stromx  0.8.0
Operator

In this example the stream of the previous examples is extended by an operator Add which adds a constant offset to the number currently processed:

[Figure: operator.png]

However, such an operator is not defined in the package Example, which provided all operators used so far. Thus, a custom operator Add must be implemented. It will be the only member of the new package Math.

Declaring the operator

A custom operator is implemented by defining an operator kernel. In contrast to operators, operator kernels do not have to be thread-safe. All concurrent accesses are handled by the runtime::Operator class which wraps the operator kernels. This simplifies the definition of custom operators.
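The division of labour between a kernel and its wrapper can be illustrated with a minimal sketch. The classes Kernel, CountingKernel and LockedOperator below are hypothetical and not part of stromx, and the real runtime::Operator is considerably more elaborate. The sketch only assumes that the wrapper holds a mutex while it forwards a call, so the kernel itself needs no locking:

```cpp
#include <cassert>
#include <memory>
#include <mutex>

// Hypothetical kernel interface: implementations need no locking of their own.
class Kernel
{
public:
    virtual ~Kernel() {}
    virtual void execute() = 0;
};

// A kernel with unprotected state; unsynchronized concurrent calls would race.
class CountingKernel : public Kernel
{
public:
    CountingKernel() : m_count(0) {}
    virtual void execute() { ++m_count; }
    unsigned int count() const { return m_count; }
private:
    unsigned int m_count;
};

// Hypothetical wrapper: takes ownership of the kernel and serializes
// every call which reaches it.
class LockedOperator
{
public:
    explicit LockedOperator(Kernel* kernel) : m_kernel(kernel)
    {
        assert(kernel != 0);
    }

    void execute()
    {
        // Only one thread at a time passes this point.
        std::lock_guard<std::mutex> lock(m_mutex);
        m_kernel->execute();
    }

private:
    std::mutex m_mutex;
    std::unique_ptr<Kernel> m_kernel;
};
```

With such a wrapper any number of threads could call LockedOperator::execute() while CountingKernel stays free of synchronization code.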

The new operator kernel is declared in the header file Add.h:

#ifndef MATH_ADD_H
#define MATH_ADD_H
#include <stromx/runtime/OperatorKernel.h>
#include <stromx/runtime/DataContainer.h>
#include <stromx/runtime/Primitive.h>
namespace math
{
class Add : public stromx::runtime::OperatorKernel
{

The kernel will be a member of the package Math. It is good practice to place all operators of a package in a common namespace, which in this case is called math.

All operator kernels are derived from runtime::OperatorKernel.

public:
enum DataId
{
INPUT,
OUTPUT,
OFFSET
};

Each input, output and parameter of the operator is identified by an ID. The enumeration above assigns the values 0, 1 and 2, but any other values are possible as long as each ID is unique within its category (i.e. inputs, outputs, parameters).

Add();
virtual OperatorKernel* clone() const { return new Add; }
virtual void execute(stromx::runtime::DataProvider& provider);
virtual void setParameter(const unsigned int id, const stromx::runtime::Data& value);
virtual const stromx::runtime::DataRef getParameter(const unsigned int id) const;

First the default constructor is declared. The functions runtime::OperatorKernel::clone() and runtime::OperatorKernel::execute() are abstract in runtime::OperatorKernel and must be defined in each custom operator kernel. The subsequent functions must be implemented to support reading and writing the parameter of Add.

private:
static const std::vector<const stromx::runtime::Input*> setupInputs();
static const std::vector<const stromx::runtime::Output*> setupOutputs();
static const std::vector<const stromx::runtime::Parameter*> setupParameters();
static const std::string TYPE;
static const std::string PACKAGE;
static const stromx::runtime::Version VERSION;
// Current value of the parameter OFFSET (used in Add.cpp).
stromx::runtime::UInt32 m_offset;
};
}
#endif // MATH_ADD_H

The private declarations are for internal use only and will be explained when they are used in the implementation of Add.

Implementing the operator

The operator Add is implemented in Add.cpp.

#include "Add.h"
#include <stromx/runtime/Primitive.h>
#include <stromx/runtime/OperatorException.h>
#include <stromx/runtime/DataProvider.h>
#include <stromx/runtime/Id2DataPair.h>
#include <stromx/runtime/Id2DataComposite.h>
#include <stromx/runtime/ReadAccess.h>

First all necessary headers are included. The using directive below makes the namespace stromx visible, so the members of the runtime library can be referred to as runtime:: instead of stromx::runtime::.

using namespace stromx;
namespace math
{
const std::string Add::TYPE("Add");
const std::string Add::PACKAGE("Math");
const runtime::Version Add::VERSION(1, 0, 0);

Each operator kernel exports its type, the package it belongs to and its version. This information must be the same for each instance of a specific operator class and is therefore stored in static variables. It is passed to the constructor of runtime::OperatorKernel.

Moreover, the constructor of runtime::OperatorKernel accepts vectors of descriptions of the inputs, outputs and parameters of the operator kernel. These descriptions are not required to be the same for each instance of a specific operator class and can change during the lifetime of operator objects. As a consequence they cannot be static variables but must be allocated for each class instance. The following helper functions take care of these allocations. In the case of Add the initial descriptions are the same for each class instance, so the helper functions can be static.

const std::vector<const runtime::Input*> Add::setupInputs()
{
std::vector<const runtime::Input*> inputs;
runtime::Input* input = new runtime::Input(INPUT, runtime::Variant::UINT_32);
input->setTitle("Input");
inputs.push_back(input);
return inputs;
}
const std::vector<const runtime::Output*> Add::setupOutputs()
{
std::vector<const runtime::Output*> outputs;
runtime::Output* output = new runtime::Output(OUTPUT, runtime::Variant::UINT_32);
output->setTitle("Output");
outputs.push_back(output);
return outputs;
}
const std::vector<const runtime::Parameter*> Add::setupParameters()
{
std::vector<const runtime::Parameter*> parameters;
runtime::Parameter* offset = new runtime::Parameter(OFFSET, runtime::Variant::UINT_32);
offset->setTitle("Offset");
offset->setAccessMode(runtime::Parameter::ACTIVATED_WRITE);
parameters.push_back(offset);
return parameters;
}

Each initialization function allocates a vector and fills it with pointers to runtime::Input or runtime::Output objects (for inputs and outputs) or runtime::Parameter objects (for parameters). Input and output descriptions contain only their ID, data variant and title. The data variant defines which kind of data is accepted by the input or returned at the output, respectively. In the case of parameters it determines the type of the data stored for the parameter.

Parameters additionally provide information about their access mode. Here runtime::Parameter::ACTIVATED_WRITE means that the parameter can be read and written while the operator is initialized or activated. There exist subclasses of runtime::Parameter to describe specific types of parameters in more detail. One example is runtime::NumericParameter which contains information about the minimal and maximal value a parameter can be set to.
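The idea of a parameter description which carries an admissible range can be sketched as follows. The class BoundedParameter and its member accepts() are hypothetical names chosen for illustration; the actual interface of runtime::NumericParameter may differ:

```cpp
#include <cassert>
#include <string>

// Hypothetical description of a numeric parameter with an admissible range.
// It is not the stromx class runtime::NumericParameter.
class BoundedParameter
{
public:
    BoundedParameter(unsigned int id, unsigned int min, unsigned int max)
      : m_id(id), m_min(min), m_max(max)
    {
        assert(min <= max);
    }

    unsigned int id() const { return m_id; }
    const std::string& title() const { return m_title; }
    void setTitle(const std::string& title) { m_title = title; }

    // True if value lies in the closed interval [min, max].
    bool accepts(unsigned int value) const
    {
        return m_min <= value && value <= m_max;
    }

private:
    unsigned int m_id;
    unsigned int m_min;
    unsigned int m_max;
    std::string m_title;
};
```

A client such as a graphical editor could use this kind of information to reject a value before it is ever passed to the operator.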

Add::Add()
: OperatorKernel(TYPE, PACKAGE, VERSION, setupInputs(), setupOutputs(), setupParameters())
{
}

The constructor of Add simply passes all information which was set up above to the constructor of runtime::OperatorKernel.

Note
Not all inputs, outputs and parameters have to be passed to the constructor of runtime::OperatorKernel. Further descriptions can be added during initialization of the operator (by overriding runtime::OperatorKernel::initialize()). This means that an operator can be set up in a two-stage process. After instantiation (but before initialization) some parameter descriptions are allocated and the corresponding parameters can be set. Depending on the values of these parameters the operator assigns further inputs, outputs and parameters to itself during initialization. An example is an operator which merges several inputs to one output. After instantiation the operator has no inputs but a parameter which defines the number of inputs. These inputs are allocated only during initialization.
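The two-stage setup of the merge operator mentioned in this note can be sketched as follows. MergeKernel and all of its members are hypothetical and independent of stromx; the sketch only assumes that a parameter is set before initialization and that the inputs are derived from it afterwards:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical merge kernel; none of these names belong to stromx.
class MergeKernel
{
public:
    MergeKernel() : m_numInputs(2), m_initialized(false) {}

    // Stage 1: the parameter can be set after instantiation only.
    void setNumInputs(unsigned int numInputs)
    {
        assert(!m_initialized);
        m_numInputs = numInputs;
    }

    // Stage 2: the input descriptions are created from the parameter value.
    void initialize()
    {
        m_inputTitles.clear();
        for (unsigned int i = 0; i < m_numInputs; ++i)
            m_inputTitles.push_back("Input " + std::to_string(i));
        m_initialized = true;
    }

    // Number of inputs which exist at the moment (0 before initialization).
    std::size_t numInputs() const { return m_inputTitles.size(); }

    const std::string& inputTitle(std::size_t index) const
    {
        return m_inputTitles.at(index);
    }

private:
    unsigned int m_numInputs;
    bool m_initialized;
    std::vector<std::string> m_inputTitles;
};
```

Before initialize() is called the kernel reports no inputs at all, which mirrors the behaviour described for the merging operator.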

void Add::setParameter(unsigned int id, const runtime::Data& value)
{
try
{
switch(id)
{
case OFFSET:
m_offset = runtime::data_cast<runtime::UInt32>(value);
break;
default:
throw runtime::WrongParameterId(id, *this);
}
}
catch(runtime::BadCast&)
{
throw runtime::WrongParameterType(parameter(id), *this);
}
}

The operator Add supports only one parameter. To support more parameters, further case statements have to be added to the switch statement. The value of the parameter OFFSET is stored internally in a member of type runtime::UInt32. Because the parameter value is passed as a reference to runtime::Data it must be downcast to the correct type. The function runtime::data_cast() works like dynamic_cast applied to a reference but throws a runtime::BadCast instead of std::bad_cast. An unknown ID is reported as runtime::WrongParameterId and a failed cast as runtime::WrongParameterType.
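How a cast of this kind can be built on top of dynamic_cast is shown in the following minimal sketch. The types Data, UInt32, Bool and BadCast are simplified stand-ins and data_cast_sketch() is a hypothetical name; the actual implementation of runtime::data_cast() may look different:

```cpp
#include <cassert>
#include <typeinfo>

// Simplified stand-ins for runtime::Data, runtime::UInt32 and runtime::BadCast.
class Data
{
public:
    virtual ~Data() {}
};

class UInt32 : public Data
{
public:
    explicit UInt32(unsigned int value = 0) : m_value(value) {}
    unsigned int get() const { return m_value; }
private:
    unsigned int m_value;
};

// A second data type which is unrelated to UInt32.
class Bool : public Data
{
};

class BadCast
{
};

// Casts like dynamic_cast on references but reports a failure as BadCast.
template <class Target>
const Target& data_cast_sketch(const Data& data)
{
    try
    {
        return dynamic_cast<const Target&>(data);
    }
    catch (std::bad_cast&)
    {
        throw BadCast();
    }
}

// Returns true if casting data to Target throws BadCast.
template <class Target>
bool castFails(const Data& data)
{
    try
    {
        data_cast_sketch<Target>(data);
        return false;
    }
    catch (BadCast&)
    {
        return true;
    }
}
```

Translating the exception type lets callers such as setParameter() catch a single library-specific exception instead of depending on std::bad_cast.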

const runtime::DataRef Add::getParameter(const unsigned int id) const
{
switch(id)
{
case OFFSET:
return m_offset;
default:
throw runtime::WrongParameterId(id, *this);
}
}

The function to read a parameter follows the same pattern as the one to write it.

Note
As mentioned above operator kernels do not have to be thread-safe. It is guaranteed that runtime::OperatorKernel::getParameter() and runtime::OperatorKernel::setParameter() are never called during execution of the operator.

void Add::execute(runtime::DataProvider& provider)
{
runtime::Id2DataPair inputMapper(INPUT);
provider.receiveInputData(inputMapper);

The actual work done by Add happens in execute(). There the input data is processed and the result is passed to the operator output. The input data is obtained from a runtime::DataProvider object. To request the data of a specific input, a runtime::Id2DataPair object inputMapper is instantiated with the ID of the input in question. The data provider looks up the corresponding input and waits for input data. When the data arrives it is stored in inputMapper.

Note
The above procedure to obtain the input data of the operator via data mappers might appear complicated at first sight. However, this approach allows for more flexible strategies to get input data. Consider e.g. an operator which has two inputs but requires only data from one of them at a time. If inputMapper1 and inputMapper2 map to the respective inputs, the call provider.receiveInputData(inputMapper1 || inputMapper2) returns as soon as one of the inputs received some data. On the other hand, provider.receiveInputData(inputMapper1 && inputMapper2) waits until both inputs received their data. More complicated expressions of data mappers can be built by combining runtime::operator&&, runtime::operator|| and runtime::Try.
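The way such expressions compose can be sketched independently of stromx. The class MapperExpr below is hypothetical; it does not transfer any data but only answers whether a given set of inputs holding data would be sufficient for the expression, which is the condition under which a call like receiveInputData() could return:

```cpp
#include <cassert>
#include <functional>
#include <set>

typedef std::set<unsigned int> IdSet;

// Hypothetical mapper expression: decides whether the inputs which currently
// hold data satisfy the expression. It is not a stromx class.
class MapperExpr
{
public:
    typedef std::function<bool (const IdSet&)> Predicate;

    // A single mapper is satisfied as soon as its input holds data.
    explicit MapperExpr(unsigned int id)
      : m_satisfied([id](const IdSet& available) { return available.count(id) != 0; })
    {}

    explicit MapperExpr(const Predicate& satisfied) : m_satisfied(satisfied) {}

    bool satisfied(const IdSet& available) const { return m_satisfied(available); }

private:
    Predicate m_satisfied;
};

// Both operands must be satisfied.
MapperExpr operator&&(const MapperExpr& lhs, const MapperExpr& rhs)
{
    return MapperExpr([lhs, rhs](const IdSet& available)
    {
        return lhs.satisfied(available) && rhs.satisfied(available);
    });
}

// One satisfied operand is sufficient.
MapperExpr operator||(const MapperExpr& lhs, const MapperExpr& rhs)
{
    return MapperExpr([lhs, rhs](const IdSet& available)
    {
        return lhs.satisfied(available) || rhs.satisfied(available);
    });
}
```

Because the operators return expressions again, they can be nested to any depth, e.g. (a && b) || c.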

runtime::ReadAccess input(inputMapper.data());
unsigned int sum = (unsigned int)(input.get<runtime::UInt32>()) +
(unsigned int)(m_offset);
runtime::Data* result = new runtime::UInt32(sum);

The member runtime::Id2DataPair::data() returns a runtime::DataContainer object. As in the previous examples a runtime::ReadAccess is needed to extract the actual data from the data container. In the next step the output value is computed and stored in a runtime::UInt32 object. The output data is passed to subsequent operators or to the client of the library without being copied. Therefore the object must be allocated on the heap: it has to stay alive after execution has left the scope of Add::execute().
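The lifetime argument can be made concrete with a small sketch. The classes Data, UInt32 and Container are simplified stand-ins, and the use of std::shared_ptr is an assumption made for illustration; the real runtime::DataContainer provides more functionality:

```cpp
#include <cassert>
#include <memory>

// Simplified stand-ins for runtime::Data and runtime::UInt32.
class Data
{
public:
    virtual ~Data() {}
};

class UInt32 : public Data
{
public:
    explicit UInt32(unsigned int value) : m_value(value) {}
    unsigned int get() const { return m_value; }
private:
    unsigned int m_value;
};

// Hypothetical container: takes ownership of heap-allocated data.
// Copies of the container share the same data object.
class Container
{
public:
    explicit Container(Data* data) : m_data(data) { assert(data != 0); }
    const Data& data() const { return *m_data; }
    long useCount() const { return m_data.use_count(); }
private:
    std::shared_ptr<Data> m_data;
};

// Mimics execute(): the result is created here but must outlive this scope.
Container computeSum(unsigned int input, unsigned int offset)
{
    Data* result = new UInt32(input + offset);
    return Container(result); // the container now owns result
}
```

Passing the container around copies only the handle; the data object is deleted when the last container referring to it is destroyed.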

runtime::DataContainer outContainer(result);
runtime::Id2DataPair output(OUTPUT, outContainer);
provider.sendOutputData(output);
}
}

To send the result data to the output it is packed into a runtime::DataContainer and paired with the ID of the respective output. The data provider is responsible for sending the data container to the corresponding output. As in the case of runtime::DataProvider::receiveInputData(), the function runtime::DataProvider::sendOutputData() accepts expressions built from runtime::Id2DataPair objects combined with runtime::operator&&, runtime::operator|| and runtime::Try.

Deploying the operator

Finally, the new operator should be used as a part of a stream. This is simply done by adapting the file which defines the layout of the stream.

<?xml version="1.0" encoding="UTF-8" ?>
<Stromx version="0.1.0">
<Stream>
<Operator id="0" package="runtime" type="Counter" version="0.1.0"/>
<Operator id="1" package="Math" type="Add" version="1.0.0">
<Parameter id="2">
<Data type="UInt32" package="runtime" version="0.1.0">2</Data>
</Parameter>
<Input id="0" operator="0" output="0"/>
</Operator>
<Operator id="2" package="runtime" type="PeriodicDelay" version="0.1.0">
<Parameter id="2">
<Data type="UInt32" package="runtime" version="0.1.0">1000</Data>
</Parameter>
<Input id="0" operator="1" output="1"/>
</Operator>
<Thread name="">
<InputConnector operator="1" input="0"/>
<InputConnector operator="2" input="0"/>
</Thread>
</Stream>
</Stromx>

The program which runs the stream is almost identical to the one in the previous tutorial. The only extra step which has to be taken is the registration of the operator Add with the factory.

To register a single operator one object of its class has to be instantiated and passed to the factory.

Note
If the package Math contained more operators and was built as a separate shared object, it would make sense to define a function registerMath() in the shared object which registers all operators in Math.
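The registration step and a package-wide registerMath() can be sketched with stand-in classes. Kernel, Add and Factory below are simplified, hypothetical versions written for this illustration, and the member names registerOperator() and newOperator() are assumptions; the interface of the stromx factory may differ:

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical kernel base class which exposes only what a factory needs.
class Kernel
{
public:
    Kernel(const std::string& package, const std::string& type)
      : m_package(package), m_type(type) {}
    virtual ~Kernel() {}
    virtual Kernel* clone() const = 0;
    const std::string& package() const { return m_package; }
    const std::string& type() const { return m_type; }
private:
    std::string m_package;
    std::string m_type;
};

class Add : public Kernel
{
public:
    Add() : Kernel("Math", "Add") {}
    virtual Kernel* clone() const { return new Add; }
};

// Hypothetical factory: stores one prototype per (package, type) pair
// and creates new operators by cloning the prototype.
class Factory
{
    typedef std::pair<std::string, std::string> Key;
    typedef std::map<Key, std::unique_ptr<Kernel> > Prototypes;

public:
    // Takes ownership of the prototype.
    void registerOperator(Kernel* prototype)
    {
        assert(prototype != 0);
        const Key key(prototype->package(), prototype->type());
        m_prototypes[key] = std::unique_ptr<Kernel>(prototype);
    }

    // Returns a new instance or a null pointer if the operator is unknown.
    std::unique_ptr<Kernel> newOperator(const std::string& package,
                                        const std::string& type) const
    {
        const Prototypes::const_iterator iter = m_prototypes.find(Key(package, type));
        if (iter == m_prototypes.end())
            return std::unique_ptr<Kernel>();
        return std::unique_ptr<Kernel>(iter->second->clone());
    }

private:
    Prototypes m_prototypes;
};

// Registers all operators of the package Math (here only Add).
void registerMath(Factory& factory)
{
    factory.registerOperator(new Add);
}
```

This also shows why every kernel has to implement clone(): the factory keeps a single registered object and derives all further instances from it.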

The presented technique allows single steps of a data processing pipeline to be implemented as stromx operators. These operators can then be combined arbitrarily to form new processing networks.

It is possible to define custom data types in a similar fashion by subclassing runtime::Data and runtime::VariantInterface.

The tutorial Camera shows how to process images originating from a simulated camera.