Extending worker

Warning

The API in the following section is not yet fully stable. It may be changed in the near future.

HyperLoom infrastructure offers by default a set of operations for basic manipulation with data objects and running and external programs. One of this task is also task loom/py_call (it can be used via tasks.py_call or tasks.py_task in Python client). This task allows to executed arbitrary Python codes and the user may define new tasks.

The another way is to directly extend a worker itself. The primary purpose is efficiency, since worker extensions can be written in C++. Moreover, this approach is more powerfull than py_call, since not only tasks but also new data objects may be introduced.

On the implementation level, HyperLoom contains a C++ library libloom that implements the worker in an extensible way.

New tasks

Let us assume that we want to implement a task that returns a number of a specified characters in a D-object. First, we define the code of the task itself:

 #include "libloom/threadjob.h"

 class CountJob : public loom::ThreadJob
 {
 public:
    using ThreadJob::ThreadJob;

    std::shared_ptr<loom::Data> run() {
        // Verify inputs and configuration
        if (inputs.size() != 1 || task.config.size() != 1) {
             set_error("Invalid use of the task");
             return nullptr;
        }
        char c = task.config[0]; // Get first character of config

        if (!inputs[0].has_raw_data()) {
             set_error("Input object does not contain raw data");
             return nullptr;
        }

        // Get pointer to raw data
        const char *mem = inputs[0].get_raw_data();

        // Perform the computation
        size_t size = inputs[0].get_size();
        uint64_t count = 0;
        for (size_t i = 0; i < size;i ++) {
             if (mem[i] == c) {
                  count += 1;
             }
        }

        // Create result
        auto output = std::make_shared<RawData>();
        output->init_from_mem(work_dir, &count, sizeof(count));
        return std::static_pointer_cast<Data>(output);
    }
};

loom::ThreadJob serves for defining a tasks that are executed in its own thread. The subclass has to implement run() method that is executed when the task is fired. It should return data object or nullptr when an error occurs.

The following code defines main function for the modified worker. It is actually the same code as for the worker distributed with HyperLoom except the registartion of our new task. Each task has to be registered under a symbol. Symbols for buildin tasks, data objects and resource requests starts with prefix loom/. To avoid name clashes, it is good practice to introduce new prefix, in our example, it is prefix my/.

#include "libloom/worker.h"
#include "libloom/log.h"
#include "libloom/config.h"

#include <memory>

using namespace loom;

int main(int argc, char **argv)
{
    /* Create a configuration and parse args */
    Config config;
    config.parse_args(argc, argv);

    /* Init libuv */
    uv_loop_t loop;
    uv_loop_init(&loop);

    /* Create worker */
    loom::Worker worker(&loop, config);
    worker.register_basic_tasks();

    /* --> Registration of our task <-- */
    worker.add_task_factory<ThreadTaskInstance<CountJob>>("my/count");

    /* Start loop */
    uv_run(&loop, UV_RUN_DEFAULT);
    uv_loop_close(&loop);
    return 0;
}

New data objects

TODO