How to write a Brick

A titan flow consists of a sequence of Bricks that process Flow Packets. You can write custom Bricks if the desired functionality is not available in any other Brick. In the following describes how to write your own Brick in Python.

How a Brick processes data

A Brick can be understood as a blackbox with data inputs a processing logic and data outputs. Depending on the needed functionality the input or output could be irrelevant for a given Brick.

Ports

The Brick can receive incoming Flow Packets on a single or multiple Input Ports. After processing incoming data it can emit the result as a packet to a single or multiple Output Ports.

Brick implementation

You can write a Brick in two ways:

  1. as a Python file (e.g. my_brick.py)
  2. as a Python module (e.g. my_brick/*.py )

A Brick performing a simple task comes as a single Python script file (first option). The second way, providing a Python module, may be preferred in case the Brick implements a complex feature with several functions and submodules or in case you want to equip your Brick with parameters.

Installation of new Bricks to the titan platform

Custom bricks are clustered in brick packages and are installed to the titan platform in the UI using the Package Manager. A brick package is a tar, zip or tar.gz archive that contains a folder named by package name that includes the following files and folders:

package.yaml : A package description file

Name: "ExamplePackage"
Author: "author name"
Description: "concise package description"

If this file is missing the package name is set to the base name of the archive provided.

ExampleBrick1/, ExampleBrick2/ .. brick folders, named by the bricks they contain.

Each brick folder must contain:

description.yaml the brick description file:

Name: ExampleBrick # the brick name
Family: General   # the brick family
Parameters:      # list of parameters
  - Name: ExampleParameter  # name of the parameter
    Value: 42.2             # default value of the parameter
    Type: float64          # type of the parameter
Description: "A concise description of the bricks functionality"
Package: ExamplePackage   # package name
AutoscaleQueueLevel: 25   # number of unprocessed packets in the queue that trigger autoscaling
AutoscaleMaxInstances: 1  # maximum number of allowed instances for autoscaling
ExitAfterIdleSeconds: 10  # idle time in seconds that triggers automatic teardown
Ports:
  SchemaFile: "ports.ujs"
  Input:
    IN : Input
  Output:
    OUT: Output

The Brick familiy is used in the UI to identify the symbol used for brick. Unsupported entries will result in the symbol of a general brick. If "Type" of a parameter is not set, the implied type will be string. Parameter values and autoscaling features, and the automatic teardown time can be reset by the brick user for each instance of a brick in the UI.

Note, the name of the brick code, module folder, respectively, must be identical to the brick name.

If your brick code needs modules that are not installed within the titan platform you can provide a requirements.txt that lists these dependencies. Note, when ControlPeer and BrickRunner are run in a docker container usage of packages that need to be compiled from sources by gcc may not be supported.

In total a possible package structure is:

example/
example/package.yaml
example/ExampleBrick1/
example/ExampleBrick1/examplebrick1.py
example/ExampleBrick1/description.yaml
example/ExampleBrick2/
example/ExampleBrick2/examplebrick2/
example/ExampleBrick2/examplebrick2/__init__.py
example/ExampleBrick2/examplebrick2/examplebrick2.py
example/ExampleBrick2/examplebrick2/someothercode.py
example/ExampleBrick2/description.yaml

Brick code content requirements

The Brick content implements the logic you want and is executed by a Brick Runner. The Brick Runner handles the communication with the titan Flow Engine, i.e. delivers input data and receives and sends your result data. The Brick Runner uses one of two alternative entry points when executing a Brick:

  1. The Brick derives from the BrickBase base class
  2. The Brick contains a function do_brick_processing (deprecated)

The Brick derives from the base class BrickBase

You can write you Brick by creating a class called Brick that inherits from the class BrickBase which is part of the module brick.

from titanfe.brick import BrickBase

class Brick(BrickBase):
    def __init__(self, adapter, parameters):
        super().__init__(adapter, parameters)

    def setup(self):
       '''your setup method'''

    def teardown(self):
       '''your teardown method'''

    def process(self, input: Type[UjoBase] = UJO_VARIANT_NONE):
       '''your Bricks Flow Packet processing logic'''

BrickBase has the attributes

  • adapter : interface to the Brick Runner Brick Adapter
  • parameters : optional dictionary of Brick parameters Note: Your Brick class must initialize the attributes of BrickBase during its initialization (see listing above).

Mandatory methods

Your Brick class must implement the following method of BrickBase:

  • process : The Bricks Flow Packet processing logic

The function process has one argument, input, which represents the incoming Flow Packet's payload and is of type UjoMap.

Optional methods

Your Brick class can implement the following methods of BrickBase:

  • setup : Upon loading the Brick in the BrickRunner the setup-method is run once and can be used to e.g. open connections that will be held persistent

  • teardown : When unloading the Brick from the BrickRunner the teardown-method is run once, implement it to e.g. close connections opened during setup

The Brick contains a function do_brick_processing

Instead of deriving a class Brick form Brick base, the Brick content script can define a function called do_brick_processing called by the Brick Runner. This way of writing Bricks is deprecated and will not be supported in future versions.

    def do_brick_processing(adapter, parameters, input):
        ... do something useful ...

The function do_brick_processing has three arguments:

  1. adapter : interface to the Brick Runner
  2. parameters : optional dictionary of Brick parameters
  3. input : Incoming Flow Packet payload of type UjoMap

Brick Adapter (Brick Runner Interface)

The Brick Adapter provides an interface to the Brick Runner. It is an adapter class with the following attributes and methods:

Attributes

  • meta: access to meta information of the current brick runner:

meta.flow.name | meta.flow.uid: the current flow's name as in the UI / unique ID in the titan platform

meta.brick.name | meta.brick.uid: the current brick's name as set in the UI / unique ID in the titan platform

Methods

The adapter implements the method, emit_new_packet, that is used to send Flow Packets to the output queue of the Brick Runner. The method can be called one or multiple times and takes two arguments, value and port:

      adapter.emit_new_packet(value, port)

The mandatory argument value is the output data of the Brick, which forms the flow packets' payload and the optional argument, port [default='A'] , is the output port the flow packet is using. Packet payloads need to be provided in Ujo format.

Persisting/getting state

The adapter holds an instance of the class State which implements the methods set and get that allow to persist state and retrieve it when necessary.

To set state within a brick use:

      adapter.state.set(state)

where state denote the state you want to persist.

      adapter.state.get()

which returns the previously set state. A bricks state can be of any data type supported by the dataclasses_json package.

Return value of process and do_brick_processing

If your Brick only sends one output Flow Packet, you can return its payload upon exiting the function. If you want to specify the output port used by the packet, you can do so by returning a tuple:

   return (payload, port)

Output packet payloads need to be provided in Ujo format.

Parameters of a brick

Both ways of writing a Brick allow you to define parameters for you Brick that can be set from in the description.yaml or via the flow configuration. Parameters need to be provided as dictionaries, i.e. key: value pairs.

Logging

To emit log messages inside the brick use the titanfe.bricks logger. For further information on how to configure logging please refer to How to configure logging.

To emit log messages the log package must be imported and the logger must be initialized using titanfe.log.getLogger.

import titanfe.log

log = titanfe.log.getLogger(__name__)

class Brick(BrickBase):
    def __init__(self, adapter, parameters):
      # [...]

    def process(self, input):
      # [...]
      log.info('Your logged output')

Brick families

The Brick you are going to write will most likely be an implementation of one of the following Brick families. Please consider which type your Brick is going to be, as corresponding images will be shown in the Graphical User Interface after the Brick is installed.

Inlet

An Inlet sources data from the outside world into the flow. It will generate one or more Flow Packets from an external source and sends them into the flow. Inlets are the only type that ignores the input, because Inlets are at the beginning of the flows and won't get input data - input will always be empty.

inlet

Outlet

In contrast to the Inlet, the Outlet sources data from the flow to the outside world (e.g. writing the data into a database). Outlets do not send or return any further Flow Packets and therefore have no output ports.

outlet

Filter

A filter Brick typically forwards the input data to the output only on matching the condition of the filter.

filter

Selector

A selector Brick typically forwards the input data to one of the multiple outputs (ports) depending of given conditions.

General

This Brick type is the most generic one. It typically transforms the input into one or more output Flow Packets.

general

SignalIn

Signal receiver is a producer starting a flow by receiving a trigger possibly including a complex data set delivered via the signal.

signalIn

SignalOut

The consumer will end the current flow as a signal sender, sending out asynchronous messages to other flows.

signalIn