Communication within the titan Flow Engine

Between ControlPeer and Grid Manager

The Grid Manager provides a REST API with the endpoint address/gridmanager/controlpeers. During its setup, the ControlPeer registers at address/gridmanager/controlpeers by posting "address:port" of its own REST interface for later communication with the Grid Manager. When a flow run state is to be changed, the Grid Manager sends a request to the ControlPeers Rest API to the endpoint /api/v1/flows/.

Between ControlPeer and Flow Manager

The Flow Manager provides a REST api with the endpoint address/flowmanager/flows//config In case the Grid Manager starts a flow, the ControlPeer requests the flow configuration from this endpoint using the flow name as provided by the Grid Manager.

Between ControlPeer and Package Manager

The Package Manager provides a REST API with the endpoint address/packagemanager/controlpeers. During its setup, the ControlPeer registers at address/packagemanager/controlpeers by posting "address:port" of its own REST interface for later communication with the Package Manager. After registering it requests the list of bricks at address/packagemanager/bricks . For installation of bricks, the Package Manager provides a REST API to download brick code zip archives. The ControlPeer fetches code of not yet installed bricks from the endpoint address/packagemanager/bricks/> and unpacks it locally. In turn, when a brick package is installed, the Package Manager sends the list of installed bricks to the ControlPeers Rest API to the endpoint /api/v1/bricks/install. During shutdown, the ControlPeer deregisters at the PackageManager by sending a delete request to address/packagemanager/controlpeers.

Between ControlPeer and BrickRunner


Once the ControlPeer is running it will parse the given flow configuration and create internal structures representing the configuration, it will then continue to start each flow, which means starting a BrickRunner process for each configured brick instance within the flow.

For the purpose of communicating with each other, the ControlPeer runs a TCP/IP server, waiting for incoming connections from a BrickRunner. The server address is passed as parameter to the BrickRunner process along with a unique ID.

Once the BrickRunner is running it will then open a connection to the ControlPeer and register itself using it's unique ID.


As soon as the registration is completed, the BrickRunner will ask for an assignment. The assignment includes which brick module to load/run, from which preceding BrickRunners input is to be delivered and which subsequent BrickRunners output will be requested (if any).

This is already enough information for the BrickRunner to start working: it will wait for input from preceding bricks or simply run the brick module if it's an inlet, and output packets will get passed on to the next BrickRunner in line.

In the communication between ControlPeer and BrickRunner there are currently two other types of message implemented: the scaling request, sand the termination request. The termination allows the ControlPeer to order the BrickRunner to terminate itself asap. The scaling request is described below in the section Autoscaling.

Between BrickRunners

BrickRunners connect to each other on their Input/Output: A BrickRunner (A) creates a server for it's Output. This server address is then passed onto the ControlPeer when requesting an assignment. A successor BrickRunner (B) will get this address as an input target in the answer to it's own assignment request. The B then opens a TCP/IP connection to A on this address.

Once the connection is established B will order A to deliver a batch of packets and then process incoming packets until that batch size is reached. And as soon as the number of unprocessed packets drops below the low queue limit a new batch of packets is requested.


If the criterion for triggering autoscaling of a successor brick is met (i.e. the maximum queue level is reached and more than one instance is allowed to run) the BrickRunner sends a scaling request message to the ControlPeer. The scaling request contains the name of the successor brick. If autoscaling was successful, the new BrickRunner instances sends a Scaling Message containing the output server address as a new input target to its successor BrickRunners.

Messaging and MessageTypes

On each connection a four byte large integer (Network Byte Order) gets sent first, announcing the size of the following message. The message itself is then sent as a binary representation of a two-item UjoList. The first item always identifies the type of the message, the second item is actual content, another UjoContainer.

The following MessageTypes exist:

# ControlPeer-BrickRunner:

    Register = 1
    AssignmentRequest = 2
    Assignment = 3
    Terminate = 4
    ScalingRequest = 5

# BrickRunner-BrickRunner:

    Scaling = 6
    Packet = 20
    PacketRequest = 21