How to write a Brick
A titan flow consists of a sequence of Bricks that process Flow Packets. You can write custom Bricks if the desired functionality is not available in any existing Brick. The following describes how to write your own Brick in Python.
How a Brick processes data
A Brick can be understood as a black box with data inputs, processing logic and data outputs. Depending on the functionality needed, the input or the output may be irrelevant for a given Brick.
Ports
A Brick can receive incoming Flow Packets on one or more Input Ports. After processing the incoming data, it can emit the result as a packet to one or more Output Ports.
Brick implementation
You can write a Brick in two ways:
- as a Python file (e.g. my_brick.py)
- as a Python module (e.g. my_brick/*.py)
A Brick performing a simple task comes as a single Python script file (first option). The second way, providing a Python module, may be preferred in case the Brick implements a complex feature with several functions and submodules or in case you want to equip your Brick with parameters.
Installation of new Bricks to the titan platform
Custom bricks are grouped into brick packages, which are installed on the titan platform through the Package Manager in the UI. A brick package is a tar, zip or tar.gz archive containing a folder named after the package, which includes the following files and folders:
- package.yaml: A package description file

    Name: "ExamplePackage"
    Author: "author name"
    Description: "concise package description"

  If this file is missing, the package name is set to the base name of the archive provided.
- ExampleBrick1/, ExampleBrick2/, ...: brick folders, named after the bricks they contain.
Each brick folder must contain:
- description.yaml: the brick description file

    Name: ExampleBrick                 # the brick name
    Family: General                    # the brick family
    Parameters:                        # list of parameters
      - Name: ExampleParameter         # name of the parameter
        Value: 42.2                    # default value of the parameter
        Type: float64                  # type of the parameter
        Editor: Textarea               # tells the UI to open the editor in a dialog box
        Constraints:                   # everything inside Constraints is optional
          Required: true               # default is false
          Validation: '^\d*\.?\d*$'    # regex used to validate the parameter value; default is empty
          Options: []                  # empty by default; if it has entries, the UI only offers these as parameter values
    Description: "A concise description of the brick's functionality"
    Package: ExamplePackage            # package name
    AutoscaleQueueLevel: 25            # number of unprocessed packets in the queue that triggers autoscaling
    AutoscaleMaxInstances: 1           # maximum number of instances allowed for autoscaling
    ExitAfterIdleSeconds: 10           # idle time in seconds that triggers automatic teardown
    Ports:
      SchemaFile: "ports.ujs"
      Input:
        IN: Input
      Output:
        OUT: Output
The Brick family is used in the UI to select the symbol shown for the brick. Unsupported entries result in the symbol of a general brick. If the "Type" of a parameter is not set, the implied type is string. Parameter values, autoscaling settings and the automatic teardown time can be changed by the brick user for each instance of a brick in the UI.
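The Constraints entries can be read as simple checks on a candidate parameter value. The following is a minimal sketch of that reading, not the platform's actual validation logic (which is not shown here). The helper name check_parameter and the way empty values are treated are assumptions made for illustration.

```python
import re


def check_parameter(value, required=False, validation="", options=()):
    """Hypothetical helper: apply description.yaml constraints to a value."""
    text = "" if value is None else str(value)
    if not text:
        # Assumption: an empty value is acceptable only for optional parameters.
        return not required
    if options and text not in [str(option) for option in options]:
        # A non-empty Options array restricts the value to its entries.
        return False
    if validation and re.fullmatch(validation, text) is None:
        # The Validation regex has to match the whole value.
        return False
    return True


# The regex from the example description accepts plain decimal numbers.
DECIMAL = r"^\d*\.?\d*$"
print(check_parameter(42.2, required=True, validation=DECIMAL))   # True
print(check_parameter("4x2", required=True, validation=DECIMAL))  # False
```

Running a candidate default value through such a check before packaging a brick can catch a regex that rejects its own default.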
Note that the name of the brick's code file or module folder must be identical to the brick name.
If your brick code needs modules that are not installed within the titan platform, you can provide a requirements.txt that lists these dependencies. Note that when ControlPeer and BrickRunner run in a Docker container, packages that need to be compiled from source with gcc may not be supported.
Altogether, a possible package structure is:
example/
example/package.yaml
example/ExampleBrick1/
example/ExampleBrick1/examplebrick1.py
example/ExampleBrick1/description.yaml
example/ExampleBrick2/
example/ExampleBrick2/examplebrick2/
example/ExampleBrick2/examplebrick2/__init__.py
example/ExampleBrick2/examplebrick2/examplebrick2.py
example/ExampleBrick2/examplebrick2/someothercode.py
example/ExampleBrick2/description.yaml
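A package folder laid out like this can be turned into an installable archive with the Python standard library. The sketch below creates a small throwaway package in a temporary directory and packs it as tar.gz. The helper name build_package and the placeholder file contents are assumptions for illustration, and only the single-file brick variant is shown.

```python
import tarfile
import tempfile
from pathlib import Path

PACKAGE_YAML = (
    'Name: "ExamplePackage"\n'
    'Author: "author name"\n'
    'Description: "concise package description"\n'
)


def build_package(source_dir: Path, archive_path: Path) -> list:
    """Pack source_dir into a tar.gz whose only root entry is the package folder."""
    with tarfile.open(str(archive_path), "w:gz") as archive:
        # arcname keeps the package folder name as the top-level entry.
        archive.add(str(source_dir), arcname=source_dir.name)
    with tarfile.open(str(archive_path), "r:gz") as archive:
        return sorted(archive.getnames())


with tempfile.TemporaryDirectory() as tmp:
    package = Path(tmp) / "example"
    brick = package / "ExampleBrick1"
    brick.mkdir(parents=True)
    (package / "package.yaml").write_text(PACKAGE_YAML)
    (brick / "examplebrick1.py").write_text("# brick code goes here\n")
    (brick / "description.yaml").write_text("Name: ExampleBrick1\n")
    names = build_package(package, Path(tmp) / "example.tar.gz")
    print("\n".join(names))
```

The archive is written next to the package folder rather than inside it, so it does not end up packing itself.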
Brick code content requirements
The Brick content implements the logic you want and is executed by a Brick Runner. The Brick Runner handles the communication with the titan Flow Engine, i.e. delivers input data and receives and sends your result data.
You can write your Brick by creating a class called Brick that inherits from the class BrickBase which is part of the module brick.
    from typing import Type

    from ujotypes import UjoBase  # assumption: UjoBase is provided by the ujotypes package
    from titanfe.brick import BrickBase


    class Brick(BrickBase):
        def __init__(self, adapter, parameters):
            super().__init__(adapter, parameters)

        def setup(self):
            '''your setup method'''

        def teardown(self):
            '''your teardown method'''

        def process(self, input: Type[UjoBase], port: str):
            '''your Bricks Flow Packet processing logic'''
BrickBase has the attributes
- adapter: interface to the Brick Runner (the Brick Adapter)
- parameters: optional dictionary of Brick parameters

Note: Your Brick class must initialize the attributes of BrickBase during its initialization (see listing above).
Mandatory methods
Your Brick class must implement the following method of BrickBase:
- process: The Brick's Flow Packet processing logic

The function process has two arguments: input, which represents the incoming Flow Packet's payload, and port, which is the name of the port on which the input was received.
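To illustrate how the port argument can be used, here is a minimal sketch of a process method that handles two Input Ports differently. So that it runs without titan installed, BrickBase is replaced by a small stand-in class. The port names NUMBERS and TEXT and the parameter factor are hypothetical, and plain Python values stand in for Ujo payloads.

```python
class BrickBase:
    """Stand-in for titanfe.brick.BrickBase so the sketch runs on its own."""

    def __init__(self, adapter, parameters=None):
        self.adapter = adapter
        self.parameters = parameters or {}


class Brick(BrickBase):
    def process(self, input, port):
        # The port name tells us which Input Port delivered the payload.
        if port == "NUMBERS":
            return input * self.parameters.get("factor", 1)
        if port == "TEXT":
            return str(input).upper()
        raise ValueError(f"unexpected input port: {port}")


brick = Brick(adapter=None, parameters={"factor": 2})
print(brick.process(21, "NUMBERS"))    # 42
print(brick.process("hello", "TEXT"))  # HELLO
```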
Optional methods
Your Brick class can implement the following methods of BrickBase:
- setup: Upon loading the Brick in the BrickRunner, the setup method is run once. It can be used, e.g., to open connections that will be kept open.
- teardown: When unloading the Brick from the BrickRunner, the teardown method is run once. Implement it, e.g., to close connections opened during setup.
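The interplay of setup, process and teardown can be sketched with a connection that is opened once and reused for every packet. In this sketch an in-memory SQLite database stands in for an external system, BrickBase is a local stand-in class, and the Brick Runner's call order is simulated by hand. All of these are assumptions made so the example runs on its own.

```python
import sqlite3


class BrickBase:
    """Stand-in for titanfe.brick.BrickBase so the sketch runs on its own."""

    def __init__(self, adapter, parameters=None):
        self.adapter = adapter
        self.parameters = parameters or {}


class Brick(BrickBase):
    def setup(self):
        # Runs once after loading: open the connection and keep it.
        self.connection = sqlite3.connect(":memory:")
        self.connection.execute("CREATE TABLE packets (payload TEXT)")

    def teardown(self):
        # Runs once before unloading: release what setup acquired.
        self.connection.close()

    def process(self, input, port):
        # Every packet reuses the connection opened in setup.
        self.connection.execute("INSERT INTO packets VALUES (?)", (str(input),))
        row = self.connection.execute("SELECT COUNT(*) FROM packets").fetchone()
        return row[0]


# Simulated life cycle: setup once, process per packet, teardown once.
brick = Brick(adapter=None)
brick.setup()
counts = [brick.process(value, "IN") for value in ("a", "b", "c")]
brick.teardown()
print(counts)  # [1, 2, 3]
```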
Brick Adapter (Brick Runner Interface)
The Brick Adapter provides an interface to the Brick Runner. It is an adapter class with the following attributes and methods:
Attributes
meta: access to meta information of the current brick runner:
- meta.flow.name | meta.flow.uid: the current flow's name as set in the UI / its unique ID in the titan platform
- meta.brick.name | meta.brick.uid: the current brick's name as set in the UI / its unique ID in the titan platform
Methods
The adapter implements the method emit_new_packet, which is used to send Flow Packets to the output queue of the Brick Runner. The method can be called one or multiple times and takes two arguments, value and port:

    adapter.emit_new_packet(value, port)

The mandatory argument value is the output data of the Brick, which forms the flow packet's payload. The optional argument port (default: 'A') is the output port the flow packet is sent to.
Packet payloads need to be provided in Ujo format.
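As an illustration of calling emit_new_packet several times from one process call, the sketch below splits one incoming text into one packet per line and sends a summary to a second port. The class RecordingAdapter is a hypothetical stand-in that only records what would be emitted, the port name STATS is made up, and plain Python values are used where a real Brick would have to provide Ujo payloads.

```python
class RecordingAdapter:
    """Hypothetical stand-in for the Brick Adapter; it only records packets."""

    def __init__(self):
        self.emitted = []

    def emit_new_packet(self, value, port="A"):
        self.emitted.append((value, port))


class Brick:
    # A real Brick would inherit from titanfe.brick.BrickBase instead.
    def __init__(self, adapter, parameters=None):
        self.adapter = adapter
        self.parameters = parameters or {}

    def process(self, input, port):
        # One incoming packet fans out into one outgoing packet per line.
        for line in input.splitlines():
            if line.strip():
                self.adapter.emit_new_packet(line.strip())  # default port 'A'
        # A second, explicitly named port carries the input length.
        self.adapter.emit_new_packet(len(input), port="STATS")


adapter = RecordingAdapter()
Brick(adapter).process("first\n\nsecond\n", "IN")
print(adapter.emitted)  # [('first', 'A'), ('second', 'A'), (14, 'STATS')]
```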
Decrypting secret brick parameters
The adapter implements a method decrypt_parameter that can be used to decrypt string parameters of type secret (set in description.yaml):

    adapter.decrypt_parameter(parameter_value)
Persisting/getting state
The adapter holds an instance of the class State, which implements the methods set and get. These allow you to persist state and retrieve it when necessary.

To set state within a brick use:

    adapter.state.set(state)

where state denotes the state you want to persist. To retrieve it use:

    adapter.state.get()

which returns the previously set state. A brick's state can be of any data type supported by the dataclasses_json package.
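A typical use of state is a value that has to survive between packets, such as a running total. The sketch below uses hypothetical in-memory stand-ins for State and the adapter so that it runs without titan. That get returns None before anything was set is an assumption of this sketch, not documented behaviour.

```python
class State:
    """Hypothetical in-memory stand-in for the adapter's State instance."""

    def __init__(self):
        self._value = None

    def set(self, state):
        self._value = state

    def get(self):
        # Assumption: nothing stored yet is reported as None.
        return self._value


class Adapter:
    """Hypothetical stand-in exposing only the state attribute."""

    def __init__(self):
        self.state = State()


class Brick:
    # A real Brick would inherit from titanfe.brick.BrickBase instead.
    def __init__(self, adapter, parameters=None):
        self.adapter = adapter

    def process(self, input, port):
        # Resume from the persisted total, starting at 0 on first use.
        total = (self.adapter.state.get() or 0) + input
        self.adapter.state.set(total)
        return total


adapter = Adapter()
totals = [Brick(adapter).process(value, "IN") for value in (5, 7, 1)]
print(totals)  # [5, 12, 13]
```

Each packet is handled by a fresh Brick object here to show that the total lives in the adapter's state and not in the Brick instance.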
Return value of process and do_brick_processing
If your Brick only sends one output Flow Packet, you can return its payload upon exiting the function. If you want to specify the output port used by the packet, you can do so by returning a tuple:
return (payload, port)
Output packet payloads need to be provided in Ujo format.
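The two return forms can be pictured as being normalized into (payload, port) pairs before sending. The helper below is a hypothetical illustration of that idea and not the Brick Runner's actual code. It assumes that returning None means no packet is sent and that a bare payload goes to the default port 'A' named for emit_new_packet.

```python
def packets_from_result(result, default_port="A"):
    """Hypothetical helper: map a process() return value to (payload, port) pairs."""
    if result is None:
        # Assumption: returning nothing means no packet is sent.
        return []
    if isinstance(result, tuple) and len(result) == 2:
        # return (payload, port): the port is chosen explicitly.
        payload, port = result
        return [(payload, port)]
    # return payload: the packet goes to the default output port.
    return [(result, default_port)]


print(packets_from_result(None))                  # []
print(packets_from_result("payload"))             # [('payload', 'A')]
print(packets_from_result(("payload", "OUT")))    # [('payload', 'OUT')]
```

With plain Python values a two-element tuple payload would be ambiguous in this sketch. Real payloads are Ujo objects, so the tuple form is only used for the (payload, port) pair.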
Parameters of a brick
Both ways of writing a Brick allow you to define parameters for your Brick, which can be set in the description.yaml or via the flow configuration. Parameters need to be provided as dictionaries, i.e. key: value pairs.
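The relation between defaults from description.yaml and values from the flow configuration can be sketched as a dictionary overlay. The helper effective_parameters and the second parameter Label are hypothetical. That flow-level values take precedence over the defaults is an assumption of this sketch.

```python
def effective_parameters(description_parameters, flow_parameters=None):
    """Hypothetical helper: overlay flow-level values on description.yaml defaults."""
    defaults = {entry["Name"]: entry.get("Value") for entry in description_parameters}
    # Assumption: values from the flow configuration win over the defaults.
    return {**defaults, **(flow_parameters or {})}


# The Parameters list from description.yaml, as it could look once parsed.
description_parameters = [
    {"Name": "ExampleParameter", "Value": 42.2, "Type": "float64"},
    {"Name": "Label", "Value": "demo"},  # hypothetical second parameter
]

parameters = effective_parameters(description_parameters, {"ExampleParameter": 1.5})
print(parameters)  # {'ExampleParameter': 1.5, 'Label': 'demo'}
```

Inside a Brick, the resulting dictionary is what would be read as self.parameters["ExampleParameter"].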
Logging
To emit log messages inside the brick, use the titanfe.bricks logger. For further information on how to configure logging, please refer to How to configure logging.

To emit log messages, the log package must be imported and the logger must be initialized using titanfe.log.getLogger.
    import titanfe.log
    from titanfe.brick import BrickBase

    log = titanfe.log.getLogger(__name__)


    class Brick(BrickBase):
        def __init__(self, adapter, parameters):
            super().__init__(adapter, parameters)
            # [...]

        def process(self, input, port):
            # [...]
            log.info('Your logged output')
Brick families
The Brick you are going to write will most likely be an implementation of one of the following Brick families. Please consider which type your Brick is going to be, as corresponding images will be shown in the Graphical User Interface after the Brick is installed.
Inlet
An Inlet sources data from the outside world into the flow. It generates one or more Flow Packets from an external source and sends them into the flow. Inlets are the only type that ignores the input: they sit at the beginning of a flow and do not receive input data, so input will always be empty.
Outlet
In contrast to the Inlet, the Outlet delivers data from the flow to the outside world (e.g. by writing the data into a database). Outlets do not send or return any further Flow Packets and therefore have no output ports.
Filter
A filter Brick typically forwards the input data to the output only on matching the condition of the filter.
Selector
A selector Brick typically forwards the input data to one of multiple outputs (ports), depending on given conditions.
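The difference between a filter and a selector can be sketched side by side. In this sketch the classes are named FilterBrick and SelectorBrick only so that both fit into one listing (an installed Brick class has to be called Brick). RecordingAdapter is a hypothetical stand-in for the Brick Adapter, and the port names and the threshold parameter are made up for illustration.

```python
class RecordingAdapter:
    """Hypothetical stand-in for the Brick Adapter; it only records packets."""

    def __init__(self):
        self.emitted = []

    def emit_new_packet(self, value, port="A"):
        self.emitted.append((value, port))


class FilterBrick:
    def __init__(self, adapter, parameters=None):
        self.adapter = adapter
        self.parameters = parameters or {}

    def process(self, input, port):
        # Forward the input only if it matches the filter condition.
        if input >= self.parameters["threshold"]:
            self.adapter.emit_new_packet(input, "OUT")


class SelectorBrick:
    def __init__(self, adapter, parameters=None):
        self.adapter = adapter
        self.parameters = parameters or {}

    def process(self, input, port):
        # Always forward, but choose the output port by condition.
        target = "EVEN" if input % 2 == 0 else "ODD"
        self.adapter.emit_new_packet(input, target)


filtered, selected = RecordingAdapter(), RecordingAdapter()
for value in (1, 4, 7):
    FilterBrick(filtered, {"threshold": 3}).process(value, "IN")
    SelectorBrick(selected).process(value, "IN")

print(filtered.emitted)  # [(4, 'OUT'), (7, 'OUT')]
print(selected.emitted)  # [(1, 'ODD'), (4, 'EVEN'), (7, 'ODD')]
```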
General
This Brick type is the most generic one. It typically transforms the input into one or more output Flow Packets.
SignalIn
A SignalIn Brick is a signal receiver: a producer that starts a flow upon receiving a trigger, which may include a complex data set delivered via the signal.
SignalOut
A SignalOut Brick is a signal sender: a consumer that ends the current flow by sending out asynchronous messages to other flows.