How to write a Brick
A titan flow consists of a sequence of Bricks that process Flow Packets. You can write custom Bricks if the functionality you need is not available in any existing Brick. The following describes how to write your own Brick in Python.
How a Brick processes data
A Brick can be understood as a black box with data inputs, processing logic, and data outputs. Depending on the functionality needed, the input or the output may be irrelevant for a given Brick.
Ports
The Brick can receive incoming Flow Packets on one or more Input Ports. After processing incoming data, it can emit the result as a packet to one or more Output Ports.
Brick implementation
You can write a Brick in two ways:
- as a Python file (e.g. my_brick.py)
- as a Python module (e.g. my_brick/*.py)
A Brick performing a simple task can be a single Python script file (first option). The second way, providing a Python module, may be preferable if the Brick implements a complex feature with several functions and submodules. In that case, the folder containing the code has to be named after the Brick, in lower case. Furthermore, it has to contain a file __init__.py that makes the Brick class public, e.g. if the class is defined in a file someFile.py:
from .someFile import Brick
__all__ = ["Brick"]
Installation of new Bricks to the titan platform
Custom bricks are bundled in brick packages, which are installed on the titan platform in the UI using the Package Manager. A brick package is a tar, zip or tar.gz archive that contains a folder named after the package, which includes the following files and folders:
package.yaml: a package description file
Description: "concise package description"
Package: "author name space:package name:package version" # a package URN
Author : "the package author"
Name: "package name" # identical to the one in the URN
The package version in the URN should follow semantic versioning. When a package with an identical URN is uploaded the existing package gets overwritten.
ExampleBrick1/, ExampleBrick2/, ...: brick folders, named after the bricks they contain.
Each brick folder must contain:
description.yaml: the brick description file:
Name: ExampleBrick # the brick name
Family: General # the brick family
Parameters: # list of parameters
  - Name: ExampleParameter # name of the parameter
    Value: 42.2 # default value of the parameter
    Type: float64 # type of the parameter
    Editor: Textarea # tells the UI to open the editor in a dialog box for editing
    Constraints: # everything inside Constraints is optional
      Required: true # default is false
      Validation: '^\d*\.?\d*$' # regex used to validate the parameter value; empty by default (single quotes keep the backslashes literal in YAML)
      Options: [] # empty by default; if the array has entries, the UI only lets the user select the parameter value from them
Description: "A concise description of the brick's functionality"
AutoscaleQueueLevel: 25 # number of unprocessed packets in the queue that triggers autoscaling
AutoscaleMaxInstances: 1 # maximum number of allowed instances for autoscaling
ExitAfterIdleSeconds: 10 # idle time in seconds that triggers automatic teardown
Ports:
  SchemaFile: "ports.ujs"
  Input:
    IN: Input
  Output:
    OUT: Output
The Brick family determines the symbol shown for the brick in the UI. Unsupported entries result in the symbol of a general brick. If the "Type" of a parameter is not set, the implied type is string. Parameter values, the autoscaling settings, and the automatic teardown time can be changed by the brick user for each instance of a brick in the UI.
Note that the name of the brick's code file or module folder must match the brick name (written in lower case, as described above for module folders).
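To illustrate how the Constraints entries relate to a parameter value, the following minimal sketch checks a value against Required, Validation and Options. The function check_parameter is hypothetical and only mirrors the semantics described in the comments of the example description.yaml; it is not how the platform itself validates parameters.

```python
import re
from typing import Optional, Sequence


def check_parameter(value: Optional[str],
                    required: bool = False,
                    validation: str = "",
                    options: Sequence[str] = ()) -> bool:
    """Hypothetical check mirroring the Constraints keys of description.yaml."""
    if value is None or value == "":
        # An empty value is only acceptable if the parameter is not required.
        return not required
    if options and value not in options:
        # A non-empty Options list restricts the allowed values.
        return False
    if validation and re.fullmatch(validation, value) is None:
        # A non-empty Validation regex must match the whole value.
        return False
    return True


pattern = r"^\d*\.?\d*$"  # the Validation regex from the example description.yaml
print(check_parameter("42.2", required=True, validation=pattern))  # True
print(check_parameter("abc", required=True, validation=pattern))   # False
print(check_parameter("", required=True))                          # False
```

Whether the platform matches the whole value or only searches for the pattern is not stated here; the sketch assumes a full match.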
If your brick code needs modules that are not installed within the titan platform, you can provide a requirements.txt that lists these dependencies. Note that when ControlPeer and BrickRunner run in a Docker container, packages that need to be compiled from source by gcc may not be supported.
Altogether, a possible package structure is:
example/
example/package.yaml
example/ExampleBrick1/
example/ExampleBrick1/examplebrick1.py
example/ExampleBrick1/description.yaml
example/ExampleBrick2/
example/ExampleBrick2/examplebrick2/
example/ExampleBrick2/examplebrick2/__init__.py
example/ExampleBrick2/examplebrick2/examplebrick2.py
example/ExampleBrick2/examplebrick2/someothercode.py
example/ExampleBrick2/description.yaml
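Since a brick package is an archive with a single top-level folder named after the package, it can be created with Python's standard library. The following is a minimal sketch, assuming the package folder already exists on disk; the function name build_package is hypothetical.

```python
import tarfile
from pathlib import Path


def build_package(package_dir: str, target: str) -> str:
    """Pack `package_dir` into a tar.gz whose top-level folder is the package name."""
    source = Path(package_dir)
    if not (source / "package.yaml").is_file():
        raise FileNotFoundError(f"{source} contains no package.yaml")
    with tarfile.open(target, "w:gz") as archive:
        # arcname keeps only the package folder name as the archive root
        archive.add(str(source), arcname=source.name)
    return target
```

Calling build_package("example", "example.tar.gz") would then produce an archive that can be uploaded with the Package Manager.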
Brick and package images
You can provide custom images for your bricks and packages to visualize them in the UI. They have to be provided as scalable vector graphics (SVG) and named after the package or brick, respectively, e.g.
example/
example/package.yaml
example/example.svg
example/ExampleBrick1/
example/ExampleBrick1/examplebrick1.py
example/ExampleBrick1/description.yaml
example/ExampleBrick1/examplebrick1.svg
Visualization of brick parameters in the UI
Secrets
Parameters of type secret are automatically encrypted by the platform and have to be decrypted using the brick adapter. In the UI they are hidden.
Textarea editor
For string parameters that are expected to hold a longer text, one can choose to visualize them in a textarea instead of a simple text field by setting the editor type to Textarea:
Parameters:
  - Name: Template
    Type: string
    Editor: Textarea
StringList editor
String lists can be represented by parameters of type string by choosing the StringList editor:
Parameters:
  - Name: NodeIds
    Type: string
    Editor: StringList
In the brick code, the list can be split into its elements using the separator '|':
elements = parameter['NodeIds'].split('|')
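One detail worth handling is an empty parameter value, because splitting an empty string yields a list with one empty element. The sketch below wraps the split in a small helper; the name split_string_list and the example values are made up for illustration.

```python
from typing import Dict, List


def split_string_list(parameters: Dict[str, str], name: str) -> List[str]:
    """Return the elements of a '|'-separated StringList parameter."""
    raw = parameters.get(name, "")
    # "".split("|") would yield [""], so drop empty elements explicitly
    return [element for element in raw.split("|") if element]


parameters = {"NodeIds": "ns=2;i=1|ns=2;i=2"}  # made-up example values
print(split_string_list(parameters, "NodeIds"))       # ['ns=2;i=1', 'ns=2;i=2']
print(split_string_list({"NodeIds": ""}, "NodeIds"))  # []
```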
Brick code content requirements
The Brick content implements the logic you want and is executed by a Brick Runner. The Brick Runner handles the communication with the titan Flow Engine, i.e. delivers input data and receives and sends your result data.
You can write your Brick by creating a class called Brick that inherits from the class BrickBase, which is part of the module titanfe.brick.
from typing import Type

from ujotypes import UjoBase  # Ujo base type; import path assumed
from titanfe.brick import BrickBase


class Brick(BrickBase):
    def __init__(self, adapter, parameters):
        super().__init__(adapter, parameters)

    def setup(self):
        '''your setup method'''

    def teardown(self):
        '''your teardown method'''

    def process(self, input: Type[UjoBase], port: str):
        '''your Bricks Flow Packet processing logic'''
Bricks of type Inlet should inherit from the class InletBrickBase, instead.
from typing import Type

from ujotypes import UjoBase  # Ujo base type; import path assumed
from titanfe.brick import InletBrickBase


class Brick(InletBrickBase):
    def __init__(self, adapter, parameters):
        super().__init__(adapter, parameters)

    def setup(self):
        '''your setup method'''

    def stop_processing(self):
        '''your method to stop the inlet from running'''

    def process(self, input: Type[UjoBase], port: str):
        '''your Bricks Flow Packet processing logic'''
BrickBase and InletBrickBase have the attributes:
- adapter: interface to the Brick Runner (the Brick Adapter)
- parameters: optional dictionary of Brick parameters
Note: Your Brick class must initialize the attributes of BrickBase during its initialization (see listing above).
Mandatory methods
Your Brick class must implement the following method of BrickBase:
- process: the Brick's Flow Packet processing logic
Your Brick class for an Inlet must implement the following methods of InletBrickBase:
- process: the Brick's Flow Packet processing logic
- stop_processing: a method to make the Inlet stop running
The function process has two arguments: input, which represents the incoming Flow Packet's payload, and port, which is the name of the port on which the input was received.
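A minimal sketch of a process implementation that branches on the port name follows. To keep it runnable without the platform, BrickBase is replaced by a stand-in class and plain Python values are used instead of Ujo objects; the port names "IN" and "CONFIG" are made up for the example.

```python
class BrickBase:
    """Stand-in for titanfe.brick.BrickBase, only for this illustration."""

    def __init__(self, adapter, parameters):
        self.adapter = adapter
        self.parameters = parameters


class Brick(BrickBase):
    def __init__(self, adapter, parameters):
        super().__init__(adapter, parameters)
        self.factor = 1

    def process(self, input, port):
        if port == "CONFIG":
            # packets on the (hypothetical) CONFIG port only update internal state
            self.factor = input
            return None
        # packets on any other port are scaled and returned as the result
        return input * self.factor


brick = Brick(adapter=None, parameters={})
brick.process(3, "CONFIG")
print(brick.process(14, "IN"))  # 42
```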
Optional methods
Your Brick class can implement the following methods of BrickBase/InletBrickBase:
- setup: Upon loading the Brick in the BrickRunner, the setup method is run once. It can be used e.g. to open connections that are kept open persistently.
- teardown: When unloading the Brick from the BrickRunner, the teardown method is run once. Implement it e.g. to close connections opened during setup.
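The pairing of setup and teardown can be sketched with a database connection that is opened once and closed once. This is an illustration under assumptions: BrickBase is a stand-in class, the parameter name "Database" is hypothetical, and the method calls at the end only imitate the order described above.

```python
import sqlite3


class BrickBase:
    """Stand-in for titanfe.brick.BrickBase, only for this illustration."""

    def __init__(self, adapter, parameters):
        self.adapter = adapter
        self.parameters = parameters


class Brick(BrickBase):
    def __init__(self, adapter, parameters):
        super().__init__(adapter, parameters)
        self.connection = None

    def setup(self):
        # opened once and kept for the lifetime of the Brick
        self.connection = sqlite3.connect(self.parameters["Database"])
        self.connection.execute("CREATE TABLE IF NOT EXISTS readings (value REAL)")

    def teardown(self):
        # release what setup acquired
        self.connection.close()
        self.connection = None

    def process(self, input, port):
        self.connection.execute("INSERT INTO readings VALUES (?)", (input,))
        self.connection.commit()


brick = Brick(adapter=None, parameters={"Database": ":memory:"})
brick.setup()
brick.process(21.5, "IN")
stored = brick.connection.execute("SELECT value FROM readings").fetchall()
brick.teardown()
print(stored)  # [(21.5,)]
```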
Brick Adapter (Brick Runner Interface)
The Brick Adapter provides an interface to the Brick Runner. It is an adapter class with the following attributes and methods:
Attributes
meta: access to meta information of the current brick runner:
- meta.flow.name | meta.flow.uid: the current flow's name as in the UI / unique ID in the titan platform
- meta.brick.name | meta.brick.uid: the current brick's name as set in the UI / unique ID in the titan platform
Methods
The adapter implements the method emit_new_packet, which is used to send Flow Packets to the output queue of the Brick Runner. The method can be called one or multiple times and takes two arguments, value and port:
adapter.emit_new_packet(value, port)
The mandatory argument value is the output data of the Brick, which forms the flow packet's payload, and the optional argument port [default='A'] is the output port the flow packet is using.
Packet payloads need to be provided in Ujo format.
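Calling emit_new_packet several times from one process call could look like the sketch below. It is not platform code: RecordingAdapter and BrickBase are stand-ins that only mimic the signature described above, plain Python values replace Ujo payloads, and the port name "COUNT" is made up.

```python
class RecordingAdapter:
    """Stand-in for the Brick Adapter that only records emitted packets."""

    def __init__(self):
        self.emitted = []

    def emit_new_packet(self, value, port="A"):
        self.emitted.append((value, port))


class BrickBase:
    """Stand-in for titanfe.brick.BrickBase, only for this illustration."""

    def __init__(self, adapter, parameters):
        self.adapter = adapter
        self.parameters = parameters


class Brick(BrickBase):
    def __init__(self, adapter, parameters):
        super().__init__(adapter, parameters)

    def process(self, input, port):
        for item in input:
            # one packet per element, sent on the default port
            self.adapter.emit_new_packet(item)
        # an additional packet on a (hypothetical) second output port
        self.adapter.emit_new_packet(len(input), "COUNT")


recorder = RecordingAdapter()
Brick(recorder, {}).process([1, 2, 3], "IN")
print(recorder.emitted)  # [(1, 'A'), (2, 'A'), (3, 'A'), (3, 'COUNT')]
```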
Decrypting secret brick parameters
The adapter implements a method decrypt_parameter that can be used to decrypt string parameters of type secret (set in description.yaml).
adapter.decrypt_parameter(parameter_value)
Persisting/getting state
The adapter holds an instance of the class State, which implements the methods set and get that allow you to persist state and retrieve it when necessary.
To set state within a brick use:
adapter.state.set(state)
where state denotes the state you want to persist. To retrieve the state, use:
adapter.state.get()
which returns the previously set state. A brick's state can be of any data type supported by the dataclasses_json package.
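As an illustration, the sketch below uses state to count processed packets. State, Adapter and BrickBase are in-memory stand-ins that only mimic the set/get interface described above; that get returns None before anything was set is an assumption of the stand-in, not a documented behaviour of the platform.

```python
class State:
    """Stand-in for the adapter's State object; keeps the value in memory only."""

    def __init__(self):
        self._value = None

    def set(self, state):
        self._value = state

    def get(self):
        return self._value


class Adapter:
    """Stand-in adapter exposing only the state attribute."""

    def __init__(self):
        self.state = State()


class BrickBase:
    """Stand-in for titanfe.brick.BrickBase, only for this illustration."""

    def __init__(self, adapter, parameters):
        self.adapter = adapter
        self.parameters = parameters


class Brick(BrickBase):
    def __init__(self, adapter, parameters):
        super().__init__(adapter, parameters)

    def process(self, input, port):
        # read the previous count (None in this stand-in if nothing was set yet)
        count = (self.adapter.state.get() or 0) + 1
        self.adapter.state.set(count)
        return count


brick = Brick(Adapter(), {})
print([brick.process(None, "IN") for _ in range(3)])  # [1, 2, 3]
```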
Return value of process
If your Brick only sends one output Flow Packet, you can return its payload upon exiting the function. If you want to specify the output port used by the packet, you can do so by returning a tuple:
return (payload, port)
Output packet payloads need to be provided in Ujo format.
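Both return forms can be sketched in one process method. As before, BrickBase is a stand-in, plain Python values are used instead of Ujo objects, and the parameter "Threshold" and the port name "HIGH" are hypothetical.

```python
class BrickBase:
    """Stand-in for titanfe.brick.BrickBase, only for this illustration."""

    def __init__(self, adapter, parameters):
        self.adapter = adapter
        self.parameters = parameters


class Brick(BrickBase):
    def __init__(self, adapter, parameters):
        super().__init__(adapter, parameters)

    def process(self, input, port):
        threshold = self.parameters["Threshold"]  # hypothetical parameter
        if input > threshold:
            # tuple form: payload plus the output port to use
            return input, "HIGH"
        # payload only: no port is specified
        return input


brick = Brick(adapter=None, parameters={"Threshold": 10})
print(brick.process(42, "IN"))  # (42, 'HIGH')
print(brick.process(7, "IN"))   # 7
```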
Parameters of a brick
Both ways of writing a Brick allow you to define parameters for your Brick, which can be set in the description.yaml or via the flow configuration. Parameters need to be provided as dictionaries, i.e. key: value pairs.
Logging
To emit log messages inside the brick, use the titanfe.bricks logger. For further information on how to configure logging, please refer to How to configure logging.
To emit log messages, the log package must be imported and the logger must be initialized using titanfe.log.getLogger.
import titanfe.log
from titanfe.brick import BrickBase

log = titanfe.log.getLogger(__name__)


class Brick(BrickBase):
    def __init__(self, adapter, parameters):
        super().__init__(adapter, parameters)
        # [...]

    def process(self, input, port):
        # [...]
        log.info('Your logged output')
Testing
Any local installation of the Flowengine-py (e.g. pip install titanfe) provides a mock of the BrickRunner, the TestRunner. It allows you to test the flow packets your brick returns in response to given input packets by using it in your unit tests, e.g.:
from titanfe.testing import TestRunner
runner = TestRunner(Brick, parameters={...})
runner.start()
runner.input.put(UjoInt32(42))
port, output = runner.output.get()
runner.stop()
assert port == ... # some expected value for port
assert output == ... # some expected value for output
or simpler:
from titanfe.testing import TestRunner
with TestRunner(Brick, parameters={...}) as runner:
    runner.input.put(UjoInt32(42))
    port, output = runner.output.get()
assert port == ... # some expected value for port
assert output == ... # some expected value for output
Note that the input packet and the returned output are given in Ujo format.
Brick families
The Brick you are going to write will most likely be an implementation of one of the following Brick families. Please consider which type your Brick is going to be, as corresponding images will be shown in the Graphical User Interface after the Brick is installed.
Inlet
An Inlet sources data from the outside world into the flow. It generates one or more Flow Packets from an external source and sends them into the flow. Inlets are the only type that ignores the input: because Inlets are at the beginning of a flow, they do not receive input data, so input will always be empty.
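One way an Inlet might combine process and stop_processing is a loop guarded by a threading.Event. This is only a sketch under assumptions: InletBrickBase and RecordingAdapter are stand-ins, the parameter "IntervalSeconds" is hypothetical, and running process in a separate thread merely imitates a runner that later asks the Inlet to stop.

```python
import threading


class RecordingAdapter:
    """Stand-in for the Brick Adapter that only records emitted packets."""

    def __init__(self):
        self.emitted = []

    def emit_new_packet(self, value, port="A"):
        self.emitted.append(value)


class InletBrickBase:
    """Stand-in for titanfe.brick.InletBrickBase, only for this illustration."""

    def __init__(self, adapter, parameters):
        self.adapter = adapter
        self.parameters = parameters


class Brick(InletBrickBase):
    def __init__(self, adapter, parameters):
        super().__init__(adapter, parameters)
        self.stopped = threading.Event()

    def stop_processing(self):
        self.stopped.set()

    def process(self, input, port):
        counter = 0  # input is ignored, the Inlet generates its own data
        while not self.stopped.is_set():
            self.adapter.emit_new_packet(counter)
            counter += 1
            # wait() doubles as a sleep that ends early once stop is requested
            self.stopped.wait(self.parameters["IntervalSeconds"])


adapter = RecordingAdapter()
brick = Brick(adapter, {"IntervalSeconds": 0.01})
worker = threading.Thread(target=brick.process, args=(None, None))
worker.start()
brick.stop_processing()
worker.join(timeout=1)
print(worker.is_alive())  # False
```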
Outlet
In contrast to the Inlet, the Outlet delivers data from the flow to the outside world (e.g. by writing the data into a database). Outlets do not send or return any further Flow Packets and therefore have no output ports.
Filter
A filter Brick typically forwards the input data to the output only on matching the condition of the filter.
Selector
A selector Brick typically forwards the input data to one of multiple outputs (ports), depending on given conditions.
General
This Brick type is the most generic one. It typically transforms the input into one or more output Flow Packets.
SignalIn
A SignalIn Brick (signal receiver) is a producer that starts a flow upon receiving a trigger, which may include a complex data set delivered via the signal.
SignalOut
A SignalOut Brick (signal sender) is a consumer that ends the current flow by sending out asynchronous messages to other flows.