How to write a Brick
A titan flow consists of a sequence of Bricks that process Flow Packets. You can write custom Bricks if the functionality you need is not available in any existing Brick. The following describes how to write your own Brick in Python.
How a Brick processes data
A Brick can be understood as a black box with data inputs, processing logic, and data outputs. Depending on the required functionality, the input or the output may be irrelevant for a given Brick.
The Brick can receive incoming Flow Packets on one or more Input Ports. After processing the incoming data, it can emit the result as a packet to one or more Output Ports.
You can write a Brick in two ways:
- as a Python file (e.g.
- as a Python module (e.g.
A Brick performing a simple task comes as a single Python script file (first option). The second way, providing a Python module, may be preferred in case the Brick implements a complex feature with several functions and submodules or in case you want to equip your Brick with parameters.
Installation of new Bricks to the titan platform
Custom bricks are bundled in brick packages, which are installed on the titan platform through the Package Manager in the UI. A brick package is a tar, zip or tar.gz archive containing a folder named after the package, which includes the following files and folders:
package.yaml: A package description file

    Name: "ExamplePackage"
    Author: "author name"
    Description: "concise package description"

If this file is missing the package name is set to the base name of the archive provided.
ExampleBrick2/ .. brick folders, named by the bricks they
Each brick folder must contain:
description.yaml the brick description file:

    Name: ExampleBrick                # the brick name
    Family: General                   # the brick family
    Parameters:                       # list of parameters
      - Name: ExampleParameter        # name of the parameter
        Value: 42.2                   # default value of the parameter
        Type: float64                 # type of the parameter
    Description: "A concise description of the bricks functionality"
    Package: ExamplePackage           # package name
    AutoscaleQueueLevel: 25           # number of unprocessed packets in the queue that trigger autoscaling
    AutoscaleMaxInstances: 1          # maximum number of allowed instances for autoscaling
    ExitAfterIdleSeconds: 10          # idle time in seconds that triggers automatic teardown
    Ports:
      SchemaFile: "ports.ujs"
      Input:
        IN : Input
      Output:
        OUT: Output

The Brick family is used in the UI to select the symbol shown for the brick. Unsupported entries result in the symbol of a general brick. If the "Type" of a parameter is not set, the type defaults to string. Parameter values, autoscaling settings, and the automatic teardown time can be changed by the brick user for each instance of a brick in the UI.
Note that the name of the brick code file or module folder must be identical to the brick name.
If your brick code needs modules that are not installed within the titan platform, you can provide a requirements.txt that lists these dependencies. Note that when ControlPeer and BrickRunner run in a Docker container, packages that need to be compiled from source with gcc may not be supported.
In total a possible package structure is:

    example/
    example/package.yaml
    example/ExampleBrick1/
    example/ExampleBrick1/examplebrick1.py
    example/ExampleBrick1/description.yaml
    example/ExampleBrick2/
    example/ExampleBrick2/examplebrick2/
    example/ExampleBrick2/examplebrick2/__init__.py
    example/ExampleBrick2/examplebrick2/examplebrick2.py
    example/ExampleBrick2/examplebrick2/someothercode.py
    example/ExampleBrick2/description.yaml

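A package folder laid out as above can be packed into an installable archive with Python's standard library. The following is a minimal sketch, assuming the folder already exists locally; the helper name `build_brick_package` is hypothetical and not part of the titan platform.

```python
import tarfile
from pathlib import Path
from typing import List


def build_brick_package(package_dir: str, archive_path: str) -> List[str]:
    """Pack a package folder into a tar.gz archive and return its member names.

    Hypothetical helper: the top-level folder inside the archive keeps the
    name of the package folder.
    """
    source = Path(package_dir)
    with tarfile.open(archive_path, "w:gz") as archive:
        # arcname makes the archive contain "<folder name>/..." instead of
        # the full local path to the folder.
        archive.add(str(source), arcname=source.name)
    # Re-open the archive to list what was actually written.
    with tarfile.open(archive_path, "r:gz") as archive:
        return archive.getnames()
```

Calling `build_brick_package("example", "example.tar.gz")` would produce an archive whose single top-level folder is `example/`, which can then be uploaded through the Package Manager.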
Brick code content requirements
The Brick content implements the logic you want and is executed by a Brick Runner. The Brick Runner handles the communication with the titan Flow Engine, i.e. delivers input data and receives and sends your result data. The Brick Runner uses one of two alternative entry points when executing a Brick:
- The Brick derives from the BrickBase base class
- The Brick contains a function do_brick_processing (deprecated)
The Brick derives from the base class BrickBase
You can write your Brick by creating a class called Brick that inherits from the class BrickBase, which is part of the module titanfe.brick.

    from titanfe.brick import BrickBase

    class Brick(BrickBase):
        def __init__(self, adapter, parameters):
            super().__init__(adapter, parameters)

        def setup(self):
            '''your setup method'''

        def teardown(self):
            '''your teardown method'''

        def process(self, input: Type[UjoBase] = UJO_VARIANT_NONE):
            '''your Bricks Flow Packet processing logic'''

BrickBase has the attributes
adapter: interface to the Brick Runner Brick Adapter
parameters: optional dictionary of Brick parameters
Note: Your Brick class must initialize the attributes of BrickBase during its initialization (see listing above).
Your Brick class must implement the following method of BrickBase:
process: The Brick's Flow Packet processing logic
process has one argument, input, which represents the incoming Flow Packet's payload and is of type UjoMap.
Your Brick class can implement the following methods of BrickBase:
setup: Upon loading the Brick in the BrickRunner the setup-method is run once and can be used to e.g. open connections that will be held persistent
teardown: When unloading the Brick from the BrickRunner the teardown-method is run once, implement it to e.g. close connections opened during
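The lifecycle of setup, process and teardown can be sketched as follows. To keep the example runnable outside the titan platform, `BrickBase` here is a minimal stand-in that only mirrors the constructor from the listing above (the real class comes from `titanfe.brick`), the `run_once` driver imitating the Brick Runner is hypothetical, and plain Python values take the place of Ujo payloads.

```python
class BrickBase:
    """Stand-in for titanfe.brick.BrickBase, for illustration only."""

    def __init__(self, adapter, parameters):
        self.adapter = adapter
        self.parameters = parameters

    def setup(self):
        pass

    def teardown(self):
        pass


class Brick(BrickBase):
    def __init__(self, adapter, parameters):
        super().__init__(adapter, parameters)
        self.connection = None

    def setup(self):
        # Runs once when the Brick is loaded: acquire long-lived resources.
        # The dict is a placeholder for a real connection object.
        self.connection = {"open": True, "seen": 0}

    def process(self, input=None):
        # Runs for every incoming Flow Packet.
        self.connection["seen"] += 1
        return {"count": self.connection["seen"], "payload": input}

    def teardown(self):
        # Runs once when the Brick is unloaded: release what setup acquired.
        self.connection["open"] = False


def run_once(brick, payloads):
    """Hypothetical driver imitating the call order of a Brick Runner."""
    brick.setup()
    try:
        return [brick.process(payload) for payload in payloads]
    finally:
        brick.teardown()


brick = Brick(adapter=None, parameters={})
results = run_once(brick, ["a", "b"])
```

Keeping the resource on the instance lets every call to `process` reuse it, which is the main reason to implement setup and teardown rather than opening a connection per packet.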
The Brick contains a function do_brick_processing
Instead of deriving a class Brick from BrickBase, the Brick content script can define a function called do_brick_processing, which is called by the Brick Runner.
This way of writing Bricks is deprecated and will not be supported in future versions.

    def do_brick_processing(adapter, parameters, input):
        ...  # do something useful

do_brick_processing has three arguments:
adapter: interface to the Brick Runner
parameters: optional dictionary of Brick parameters
input: Incoming Flow Packet payload of type UjoMap
Brick Adapter (Brick Runner Interface)
The Brick Adapter provides an interface to the Brick Runner. It is an adapter class with the following attributes and methods:
meta: access to meta information of the current brick runner:
meta.flow.name | meta.flow.uid: the current flow's name as in the UI / unique ID in the titan platform
meta.brick.name | meta.brick.uid: the current brick's name as set in the UI / unique ID in the titan platform
The adapter implements the method emit_new_packet, which is used to send Flow Packets to the output queue of the Brick Runner. The method can be called one or multiple times and takes two arguments. The mandatory argument value is the output data of the Brick, which forms the flow packet's payload, and the optional argument port [default='A'] is the output port the flow packet is using.
Packet payloads need to be provided in Ujo format.
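As an illustration of emitting several packets from a single call to process, the sketch below uses a hypothetical `RecordingAdapter` that only mimics the `emit_new_packet(value, port)` call described above and records what was emitted. The port name 'B' and the plain Python payloads are assumptions made for the example; in a real Brick the payloads need to be in Ujo format.

```python
class RecordingAdapter:
    """Hypothetical stand-in for the Brick Adapter; records emitted packets."""

    def __init__(self):
        self.emitted = []

    def emit_new_packet(self, value, port="A"):
        self.emitted.append((port, value))


class Brick:
    # A real Brick would derive from titanfe.brick.BrickBase.
    def __init__(self, adapter, parameters):
        self.adapter = adapter
        self.parameters = parameters

    def process(self, input=None):
        lines = input.splitlines()
        # Emit one packet per line on the default port ...
        for line in lines:
            self.adapter.emit_new_packet(line)
        # ... and a summary packet on a second, explicitly named port.
        self.adapter.emit_new_packet(len(lines), port="B")


adapter = RecordingAdapter()
Brick(adapter, {}).process("first\nsecond")
```

After this call, `adapter.emitted` holds two packets for port 'A' followed by one for port 'B', in the order in which they were emitted.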
The adapter holds an instance of the class State which implements
get that allow to persist state and retrieve it
To set state within a brick use:
state denote the state you want to persist.
which returns the previously set state. A brick's state can be of any data type supported by the dataclasses_json package.
Return value of
If your Brick only sends one output Flow Packet, you can return its payload upon exiting the function. If you want to specify the output port used by the packet, you can do so by returning a tuple:
return (payload, port)
Output packet payloads need to be provided in Ujo format.
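The two return forms can be sketched as below. The class names and the port name 'B' are hypothetical, the classes do not derive from BrickBase so that the example runs on its own, and plain strings stand in for Ujo payloads.

```python
class DefaultPortBrick:
    # A real Brick would be named Brick and derive from titanfe.brick.BrickBase.
    def process(self, input=None):
        # Returning only the payload: a single output packet is sent.
        return input.upper()


class NamedPortBrick:
    def process(self, input=None):
        # Returning (payload, port): the packet is sent on the named port.
        return (input.upper(), "B")


default_result = DefaultPortBrick().process("ok")
named_result = NamedPortBrick().process("ok")
```

Returning a value is convenient for Bricks with exactly one output packet per input; Bricks that produce several packets per input would call `emit_new_packet` on the adapter instead.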
Parameters of a brick
Both ways of writing a Brick allow you to define parameters for your Brick that can be set in the description.yaml or via the flow configuration. Parameters need to be provided as dictionaries, i.e. key: value pairs.
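Reading a parameter inside a Brick could look like the following sketch. It reuses the parameter name ExampleParameter and its default of 42.2 from the description.yaml example; the fallback handling in code is an assumption, and the class does not derive from BrickBase so that the example runs outside the titan platform.

```python
class Brick:
    # A real Brick would derive from titanfe.brick.BrickBase, which provides
    # the adapter and parameters attributes.
    def __init__(self, adapter, parameters):
        self.adapter = adapter
        # parameters is optional, so fall back to an empty dictionary.
        self.parameters = parameters or {}

    def process(self, input=None):
        # Use a default in code when the parameter has not been provided.
        factor = float(self.parameters.get("ExampleParameter", 42.2))
        return input * factor


scaled = Brick(adapter=None, parameters={"ExampleParameter": 2.0}).process(3)
default_scaled = Brick(adapter=None, parameters=None).process(1)
```

Looking the value up with `get` keeps the Brick usable even when a flow configuration omits the parameter.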
To emit log messages inside the brick use the
For further information on how to configure logging please refer
to How to configure logging.
To emit log messages the log package must be imported and the logger must
be initialized using

    import titanfe.log
    from titanfe.brick import BrickBase

    log = titanfe.log.getLogger(__name__)

    class Brick(BrickBase):
        def __init__(self, adapter, parameters):
            super().__init__(adapter, parameters)
            # [...]

        def process(self, input):
            # [...]
            log.info('Your logged output')

The Brick you are going to write will most likely be an implementation of one of the following Brick families. Please consider which type your Brick is going to be, as corresponding images will be shown in the Graphical User Interface after the Brick is installed.
An Inlet sources data from the outside world into the flow. It generates one or more Flow Packets from an external source and sends them into the flow. Inlets are the only type that ignores the input, because Inlets are at the beginning of the flows and won't get input data - input will always be empty.
In contrast to the Inlet, the Outlet delivers data from the flow to the outside world (e.g. by writing the data into a database). Outlets do not send or return any further Flow Packets and therefore have no output ports.
A filter Brick typically forwards the input data to the output only if it matches the filter's condition.
A selector Brick typically forwards the input data to one of multiple outputs (ports) depending on given conditions.
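The difference between a filter and a selector can be sketched as follows. `RecordingAdapter` is a hypothetical stand-in that mimics only the adapter's `emit_new_packet` call, the parameter name Threshold and the port names are made up for the example, and plain numbers stand in for Ujo payloads.

```python
class RecordingAdapter:
    """Hypothetical stand-in for the Brick Adapter; records emitted packets."""

    def __init__(self):
        self.emitted = []

    def emit_new_packet(self, value, port="A"):
        self.emitted.append((port, value))


class FilterBrick:
    """Forwards only inputs at or above a threshold parameter."""

    def __init__(self, adapter, parameters):
        self.adapter = adapter
        self.threshold = parameters.get("Threshold", 0)  # hypothetical parameter

    def process(self, input=None):
        if input >= self.threshold:
            self.adapter.emit_new_packet(input)
        # Otherwise nothing is emitted for this input.


class SelectorBrick:
    """Routes each input to one of two ports depending on its sign."""

    def __init__(self, adapter, parameters):
        self.adapter = adapter

    def process(self, input=None):
        # Hypothetical port names; a real Brick declares its ports in description.yaml.
        port = "NEGATIVE" if input < 0 else "NON_NEGATIVE"
        self.adapter.emit_new_packet(input, port=port)


filter_adapter = RecordingAdapter()
filter_brick = FilterBrick(filter_adapter, {"Threshold": 10})
for value in (5, 10, 15):
    filter_brick.process(value)

selector_adapter = RecordingAdapter()
selector_brick = SelectorBrick(selector_adapter, {})
for value in (-1, 2):
    selector_brick.process(value)
```

The filter either emits on its single output or stays silent, while the selector always emits exactly once and only varies the port.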
This Brick type is the most generic one. It typically transforms the input into one or more output Flow Packets.
A signal receiver is a producer that starts a flow upon receiving a trigger, which may include a complex data set delivered via the signal.
The consumer ends the current flow as a signal sender, sending out asynchronous messages to other flows.