stromx  0.8.0
Simple

Streams are the main objects of the runtime library. All other objects belong either directly or indirectly to a stream. A stream consists of operators each of which can have several input and output connectors. An example of a very simple stream is given in the following image:

simple.png

The operator Counter has the single output OUTPUT, which is connected to the only input connector (fittingly called INPUT) of PeriodicDelay. This means that all data produced at the output of Counter is transported to the input of PeriodicDelay. Data transported to an input stays there and blocks this input until its operator starts to process the data.
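The blocking behaviour of an input can be pictured as a slot that holds at most one value. The following is a minimal sketch of that idea using only the C++ standard library. It is not stromx code, and the names InputSlot, put(), take() and transport() are hypothetical.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical model of an input connector: it holds at most one value.
class InputSlot
{
public:
    // Blocks while the slot is occupied, then stores the value.
    void put(int value)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_changed.wait(lock, [this] { return !m_occupied; });
        m_value = value;
        m_occupied = true;
        m_changed.notify_all();
    }

    // Blocks while the slot is empty, then frees the slot and returns the value.
    int take()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_changed.wait(lock, [this] { return m_occupied; });
        m_occupied = false;
        m_changed.notify_all();
        return m_value;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_changed;
    int m_value = 0;
    bool m_occupied = false;
};

// Sends the values 0..count-1 through one slot and returns them in the
// order in which the consuming side received them.
std::vector<int> transport(int count)
{
    InputSlot slot;
    std::vector<int> received;
    std::thread producer([&slot, count] {
        for (int i = 0; i < count; ++i)
            slot.put(i);   // waits until the previous value has been taken
    });
    for (int i = 0; i < count; ++i)
        received.push_back(slot.take());
    producer.join();
    return received;
}
```

Because the slot never holds more than one value, the producing side cannot run ahead of the consuming side, and the values arrive in the order in which they were sent.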

However, operators in the runtime library are passive objects, i.e. they do not process any data on their own but need a thread which executes them. Besides operators, threads are the second important building block of a stream. Each input connector of an operator can be assigned to a thread. When activated, the thread visits each of its input connectors sequentially and starts over with the first one when the last connector has been reached. When a thread visits an input connector of an operator it acts according to the following two steps:

In other words, the process of transporting data from output connectors to input connectors implicitly executes the operators of these connectors to make sure (1) that there is enough data at the outputs and (2) that the data at the inputs is removed in time.

Note
Not every input connector must be assigned to a thread. However, if data is required at an input which is not part of a thread, the user must feed the input manually by calling runtime::Operator::setInputData(). Something similar holds for outputs which are not connected to inputs (more accurately, to inputs which are assigned to a thread). If data is waiting at such an output (which prevents the operator from being executed), the client can remove it using runtime::Operator::clearOutputData().

The simple tutorial assembles the stream in the image above and operates it. The Counter operator in the stream pushes an integer to its output each time it is executed. With each call it increments the integer by one. The PeriodicDelay operator simply passes data at its input to its output. However, it makes sure that new data appears at the output at most once within a certain amount of time (one second in this example). In this sense it acts as a timer which delays the transport of data by a predefined time span. In the following, the source code of simple.cpp is printed with comments under each part.
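Before turning to the tutorial source, the interplay of the counter and the timer can be pictured with a small stand-alone sketch. It uses only the C++ standard library and is not the implementation of runtime::PeriodicDelay. The class PeriodicGate and the function elapsedMs() are hypothetical names chosen for this illustration.

```cpp
#include <chrono>
#include <thread>

// Hypothetical stand-in for a periodic delay: pass() returns its argument
// unchanged, but consecutive calls return at least one period apart.
class PeriodicGate
{
public:
    explicit PeriodicGate(std::chrono::milliseconds period)
      : m_period(period),
        m_next(std::chrono::steady_clock::now())
    {}

    int pass(int value)
    {
        // Returns at once if m_next already lies in the past.
        std::this_thread::sleep_until(m_next);
        m_next = std::chrono::steady_clock::now() + m_period;
        return value;
    }

private:
    std::chrono::milliseconds m_period;
    std::chrono::steady_clock::time_point m_next;
};

// Passes count consecutive integers through the gate and returns the
// elapsed time in milliseconds.
long long elapsedMs(int count, int periodMs)
{
    PeriodicGate gate{std::chrono::milliseconds(periodMs)};
    const auto start = std::chrono::steady_clock::now();
    int counter = 0;                 // plays the role of the Counter operator
    for (int i = 0; i < count; ++i)
        gate.pass(counter++);
    const auto end = std::chrono::steady_clock::now();
    return std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
}
```

The first value passes immediately and every further value waits for the period to elapse, so passing four values with a period of 20 ms takes at least 60 ms. The actual tutorial code, which uses the real operators, starts with the includes below.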

#include <stromx/runtime/Counter.h>
#include <stromx/runtime/Operator.h>
#include <stromx/runtime/PeriodicDelay.h>
#include <stromx/runtime/ReadAccess.h>
#include <stromx/runtime/Stream.h>
#include <stromx/runtime/Thread.h>
#include <iostream>

As usual the program starts by including the required header files. Stromx uses a fine-grained approach to the inclusion of headers, i.e. only the necessary headers are included by the library headers and forward declarations are used wherever possible. As a consequence the headers of all classes which are used in the code are included here.

All stromx headers included here are part of the runtime library. This library provides the infrastructure to support streams, operators and threads; the two operators used in this example, Counter and PeriodicDelay, are included from it as well. The final header is necessary to write the results of the running stream to the standard output.

using namespace stromx;

int main (int, char**)
{
runtime::Stream stream;

The program starts by creating an empty stream object which will be populated with operators and threads in the following.

runtime::Operator* source = stream.addOperator(new runtime::Counter);
stream.initializeOperator(source);
runtime::Operator* timer = stream.addOperator(new runtime::PeriodicDelay);
stream.initializeOperator(timer);

First, the source operator kernel is allocated and added to the stream. When an operator is added to a stream, the stream assumes ownership of it. It is therefore not advisable to add an operator kernel which is allocated on the stack to a stream: this would result in a crash when the stream is destroyed or when the operator goes out of scope. In other words, always use new to allocate an operator kernel before adding it to the stream and do not delete it, because the stream will take care of that. Next, the operator is initialized. Initializing can change important properties of an operator such as the number and type of its output and input connectors. For this reason, only initialized operators can be connected to each other.
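The ownership rule can be illustrated without stromx. The sketch below is a simplified model, not the library's implementation: ToyOperator, ToyStream and liveOperators() are hypothetical names, and the use of std::unique_ptr for storage is an assumption made for this example.

```cpp
#include <memory>
#include <utility>
#include <vector>

// Hypothetical operator type which counts how many instances are alive.
struct ToyOperator
{
    static int alive;
    ToyOperator() { ++alive; }
    ~ToyOperator() { --alive; }
};
int ToyOperator::alive = 0;

// Hypothetical stream which takes ownership of every operator added to it.
class ToyStream
{
public:
    // Expects a pointer obtained from new; the stream deletes it later.
    ToyOperator* addOperator(ToyOperator* op)
    {
        m_operators.emplace_back(op);
        return op;
    }

private:
    std::vector<std::unique_ptr<ToyOperator>> m_operators;
};

// Returns the number of live operators while the stream exists (first)
// and after the stream has been destroyed (second).
std::pair<int, int> liveOperators()
{
    int during = 0;
    {
        ToyStream stream;
        stream.addOperator(new ToyOperator);
        stream.addOperator(new ToyOperator);
        during = ToyOperator::alive;
    }   // the stream is destroyed here and deletes both operators
    return std::make_pair(during, ToyOperator::alive);
}
```

In this model, passing the address of a stack object to addOperator() would make the stream delete memory it never allocated, which is the kind of crash the paragraph above warns about.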

timer->setParameter(runtime::PeriodicDelay::PERIOD, runtime::UInt32(1000));

In the next step the time delay of the timer operator is set to one second. All data passed as a parameter to the function runtime::Operator::setParameter() must be derived from runtime::Data. Versions of primitive data types which are derived from runtime::Data are defined in runtime::Primitive. Here runtime::UInt32 is used, which wraps a 32-bit unsigned integer.
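To make the idea of a primitive wrapper concrete, here is one way such a type could look. This is only a sketch and not the definition used by stromx: ToyData, ToyPrimitive, ToyUInt32 and readPeriod() are hypothetical names introduced for this illustration.

```cpp
#include <cstdint>
#include <typeinfo>

// Hypothetical common base class for all data objects.
struct ToyData
{
    virtual ~ToyData() {}
};

// Hypothetical wrapper which turns a built-in type into a ToyData object.
template <typename T>
class ToyPrimitive : public ToyData
{
public:
    explicit ToyPrimitive(T value = T()) : m_value(value) {}

    // Allows the wrapper to be converted back to the built-in type.
    operator T() const { return m_value; }

private:
    T m_value;
};

typedef ToyPrimitive<std::uint32_t> ToyUInt32;

// Accepts any ToyData object, in the way a parameter setter might, and
// extracts the wrapped value. Throws std::bad_cast for other data types.
std::uint32_t readPeriod(const ToyData& data)
{
    const ToyUInt32& period = dynamic_cast<const ToyUInt32&>(data);
    return period;
}
```

With such a wrapper a function can accept every kind of data through a single base-class reference and still recover the concrete value where it is needed, for example via readPeriod(ToyUInt32(1000)).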

Note
Each parameter has an access mode which controls when the associated value can be written and read. In case of the period of runtime::PeriodicDelay the access mode is runtime::Parameter::ACTIVATED_WRITE, which means that the parameter can be written (and read) at any time as soon as the operator is initialized.

stream.connect(source, runtime::Counter::OUTPUT, timer, runtime::PeriodicDelay::INPUT);

This line connects the output of the counter to the input of the timer operator. Note that input and output connectors are identified by the operator they belong to (source and timer) and their IDs (runtime::Counter::OUTPUT and runtime::PeriodicDelay::INPUT).

runtime::Thread* thread = stream.addThread();
thread->addInput(timer, runtime::PeriodicDelay::INPUT);

Now the operating thread is added to the stream. In the next line the input connector of timer is added as the only input connector which is operated by thread. As before the input connector is addressed by its operator and its ID.

stream.start();

This step starts all threads of the stream (just one in our case). The thread begins to process its input connectors immediately. As mentioned in the introduction, it executes the operators in the stream and moves processed data to the output of timer. The following loop gets this data from the final output and writes it to the standard output.

for(unsigned int i = 0; i < 5; ++i)
{
    runtime::DataContainer data = timer->getOutputData(runtime::PeriodicDelay::OUTPUT);
    runtime::ReadAccess count(data);
    std::cout << "Received "
              << (unsigned int)(count.get<runtime::UInt32>())
              << std::endl;
    timer->clearOutputData(runtime::PeriodicDelay::OUTPUT);
}

Each iteration of the loop starts by obtaining the current data at the output of timer. If data is present at the output, the function runtime::Operator::getOutputData() returns it immediately; otherwise it blocks until data arrives. The data is packaged in a data container. Data containers are basically references to data objects and can be easily copied and therefore moved from one part of the stream to another. They are implemented via shared pointers and make sure the data inside them is deleted as soon as nobody holds any reference to it (i.e. no containers exist anymore). Like their real-world counterparts, data containers do not only facilitate the transport of data, they also protect it in the sense that they do not allow direct access to the data inside them.

To actually read the data stored in the data container, a read accessor count has to be constructed around the container. It allows reading the data, here through the get<runtime::UInt32>() call in the next line of the loop.
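The lifetime rule for data in containers follows directly from the shared pointer semantics. The following sketch shows it with std::shared_ptr. It is a simplified model rather than the stromx implementation, and Payload, ToyContainer and payloadLivesWhileReferenced() are hypothetical names.

```cpp
#include <memory>

// Hypothetical payload which records its destruction in an external flag.
struct Payload
{
    explicit Payload(bool& destroyed) : m_destroyed(destroyed) {}
    ~Payload() { m_destroyed = true; }
    bool& m_destroyed;
};

// Hypothetical container: a handle which is cheap to copy and shares
// one payload with all of its copies.
class ToyContainer
{
public:
    explicit ToyContainer(Payload* payload) : m_payload(payload) {}

    // Number of containers which currently refer to the payload.
    long holders() const { return m_payload.use_count(); }

private:
    std::shared_ptr<Payload> m_payload;
};

// Returns true if the payload stays alive while at least one container
// refers to it and is destroyed together with the last container.
bool payloadLivesWhileReferenced()
{
    bool destroyed = false;
    bool ok = true;
    {
        ToyContainer first(new Payload(destroyed));
        {
            ToyContainer second = first;   // copies the handle, not the payload
            ok = ok && second.holders() == 2;
        }                                  // second is gone, the payload remains
        ok = ok && first.holders() == 1 && !destroyed;
    }                                      // the last container is gone
    return ok && destroyed;
}
```

Copying a container therefore only increases a reference count, which is why data can be handed from one part of a stream to another without copying the data itself.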

Note
As the existence of a read accessor suggests, there is also a class runtime::WriteAccess which allows writing to data stored in a data container. While data can be read simultaneously by many threads, i.e. many runtime::ReadAccess objects can exist at the same time, only one write access can exist at a time. If a runtime::WriteAccess is constructed while other read or write access objects for the same data container exist, the constructor blocks until all other accesses are released. Once the write access has been obtained it is guaranteed to have exclusive write access rights. Other threads can only access the data after the write access goes out of scope or is deleted. This can be convenient in situations where data should be shared rather than copied, such as in the case of simultaneous processing of large image data.
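The rule of many readers or exactly one writer can be sketched with a mutex and a condition variable. This is an illustration of the rule only, not the way runtime::ReadAccess and runtime::WriteAccess are implemented. AccessGate and writerWaitsForReaders() are hypothetical names.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical gate which admits many readers or exactly one writer.
class AccessGate
{
public:
    void acquireRead()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_changed.wait(lock, [this] { return !m_writing; });
        ++m_readers;
    }

    void releaseRead()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        --m_readers;
        m_changed.notify_all();
    }

    // Blocks until no reader and no other writer holds access.
    void acquireWrite()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_changed.wait(lock, [this] { return !m_writing && m_readers == 0; });
        m_writing = true;
    }

    void releaseWrite()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_writing = false;
        m_changed.notify_all();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_changed;
    int m_readers = 0;
    bool m_writing = false;
};

// Returns true if the writer modified the value only after both readers
// had released their access.
bool writerWaitsForReaders()
{
    AccessGate gate;
    int value = 1;
    bool readersReleased = false;
    bool orderedCorrectly = false;

    gate.acquireRead();   // first reader
    gate.acquireRead();   // second reader, does not block

    std::thread writer([&] {
        gate.acquireWrite();                 // blocks until both readers are gone
        orderedCorrectly = readersReleased;
        value = 2;
        gate.releaseWrite();
    });

    const int seenByReaders = value;         // the writer cannot have access yet
    readersReleased = true;
    gate.releaseRead();
    gate.releaseRead();

    writer.join();
    return orderedCorrectly && seenByReaders == 1 && value == 2;
}
```

The two read acquisitions succeed side by side, while the writer has to wait for both of them to be released before it may change the value.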

Finally the output of timer has to be cleared. Otherwise the data there would prevent timer from being executed again and would eventually obstruct the complete stream.

stream.stop();
stream.join();
}

After the output of the stream has been received a couple of times the stream is ordered to stop. While runtime::Stream::stop() is an asynchronous method, joining the stream waits until each thread has come to a halt and the stream is inactive.
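The difference between requesting a stop and waiting for it can be shown with a plain standard library thread. The sketch below is a model of the stop and join pattern and does not reflect how runtime::Stream is implemented. ToyWorker and finishedAfterJoin() are hypothetical names.

```cpp
#include <atomic>
#include <thread>

// Hypothetical worker which loops until it is asked to stop.
class ToyWorker
{
public:
    void start()
    {
        m_stopRequested = false;
        m_finished = false;
        m_thread = std::thread([this] {
            while (!m_stopRequested)
                std::this_thread::yield();   // stands in for processing data
            m_finished = true;
        });
    }

    // Only requests the stop and returns immediately.
    void stop() { m_stopRequested = true; }

    // Waits until the worker thread has actually ended.
    void join()
    {
        if (m_thread.joinable())
            m_thread.join();
    }

    bool finished() const { return m_finished; }

private:
    std::atomic<bool> m_stopRequested{false};
    std::atomic<bool> m_finished{false};
    std::thread m_thread;
};

// Returns true if the worker has finished once join() has returned.
bool finishedAfterJoin()
{
    ToyWorker worker;
    worker.start();
    worker.stop();   // the worker may still be running at this point
    worker.join();   // after this call the worker is guaranteed to be done
    return worker.finished();
}
```

Right after stop() returns nothing can be said about the state of the worker. Only join() establishes that the thread has ended, which is why the tutorial calls both functions before leaving main().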

In this tutorial a stream was assembled by calling the respective library functions. However, it can be more convenient to write down the layout of the stream in an XML file and automatically build it from this file. This is illustrated in the next tutorial File.