The Fluxtion compiler supports functional construction of event processing logic. This allows developers to bind
functions into the processor without having to construct classes marked with Fluxtion annotations. The goal of using the
functional DSL is to have no Fluxtion api calls in the business logic, only vanilla java.
This section describes the Fluxtion functional DSL in greater depth,
exploring concepts like aggregation, windowing and groupBy in detail.
Advantages of using Fluxtion functional DSL
Business logic components are re-usable and testable outside Fluxtion
Clear separation between event notification and business logic, event logic is removed from business code
Complex library functions like windowing and aggregation are well tested and natively supported
Increased developer productivity, less code to write and support
New functionality is simple and cheap to integrate, Fluxtion pays the cost of rewiring the event flow
No vendor lock-in, business code is free from any Fluxtion library dependencies
Fluxtion offers a DSL to bind functions into the event processor using the familiar map/filter/peek operations, similar to the java
stream api. Bound functions are invoked in accordance with the dispatch rules.
An event processor is a live structure where new events trigger a set of dispatch operations. The node wrapping a function
supports both stateful and stateless functions; it is the user's choice which type of function to bind.
DataFlow
To bind a functional operation we first create a DataFlow
in the event processor. A DataFlow triggers when the event processor starts a calculation cycle and there is a matching
dispatch rule. In the imperative approach an event processor entry point is registered by annotating a method
with @OnEventHandler or an interface exported with @ExportService.
The DataFlow class provides builder methods to create and bind flows in an event processor. There is no restriction
on the number of data flows bound inside an event processor.
Subscribe to event
To create a flow for String events, call DataFlow.subscribe(String.class); any call to processor.onEvent(“myString”) will be
routed to this flow.
Once a flow has been created map, filter, groupBy, etc. functions can be applied as chained calls.
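A minimal sketch of the pattern, assuming the Fluxtion 9.x builder api (Fluxtion.interpret plus the DataFlow builder class; package names may differ in your version):

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;

public class SubscribeExample {
    public static void main(String[] args) {
        // bind a flow of String events into an interpreted event processor
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(String.class)
                .console("received -> {}"));
        processor.init();
        processor.onEvent("myString"); // routed to the String flow
        processor.onEvent(42);         // no Integer flow bound, not routed
    }
}
```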
Running the example code above logs to console
Map
A map operation takes the output from a parent node and then applies a function to it. If the return of the
function is null then the event notification no longer propagates down that path.
Map supports
Stateless functions
Stateful functions
Primitive specialisation
Method references
Inline lambdas - supported in interpreted mode only; AOT mode will not serialise the inline lambda
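A sketch showing a stateless method reference and a stateful instance function bound with map, under the same assumed api:

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;

public class MapExample {
    // stateful function - keeps a running count between events
    public static class Counter {
        private int count;
        public int increment(String s) { return ++count; }
    }

    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(String.class)
                .map(String::toUpperCase)      // stateless method reference
                .map(new Counter()::increment) // stateful instance method
                .console("event count -> {}"));
        processor.init();
        processor.onEvent("hello");
        processor.onEvent("world"); // event count -> 2
    }
}
```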
Running the example code above logs to console
BiMap
Two data flows can be mapped with a bi-map function. Both flows must have triggered at least once for the bi-map function
to be invoked.
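A bi-map sketch; the mapBiFunction method name and argument order are assumptions, check your version for the exact builder call:

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;

public class BiMapExample {
    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> {
            var strings = DataFlow.subscribe(String.class);
            // bi-map fires only after both parent flows have triggered at least once
            DataFlow.subscribe(Integer.class)
                    .mapBiFunction((i, s) -> s + ":" + i, strings)
                    .console("combined -> {}");
        });
        processor.init();
        processor.onEvent(10);      // no output - the String flow has not triggered
        processor.onEvent("price"); // both flows have fired -> "price:10"
    }
}
```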
Running the example code above logs to console
Default value
A default value can be assigned to any flow. This can be useful when calculating a bi-map function and one data flow
argument is optional.
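A sketch combining defaultValue with the bi-map above (defaultValue and mapBiFunction are assumed builder method names):

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;

public class DefaultValueExample {
    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> {
            // the String flow supplies "" before any String event is seen,
            // so the bi-map can fire on the first Integer alone
            var strings = DataFlow.subscribe(String.class).defaultValue("");
            DataFlow.subscribe(Integer.class)
                    .mapBiFunction((i, s) -> s + ":" + i, strings)
                    .console("combined -> {}");
        });
        processor.init();
        processor.onEvent(10); // outputs ":10" thanks to the default value
    }
}
```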
Running the example code above logs to console
Filter
A filter predicate can be applied to a node to control event propagation, true continues the propagation and false swallows
the notification. If the predicate returns true then the input to the predicate is passed to the next operation in the
event processor.
Filter supports
Stateless functions
Stateful functions
Primitive specialisation
Method references
Inline lambdas - supported in interpreted mode only; AOT mode will not serialise the inline lambda
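A filter sketch; the inline lambda restricts this to interpreted mode, as noted above:

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;

public class FilterExample {
    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(Integer.class)
                .filter(i -> i > 10) // false swallows the notification
                .console("passed filter -> {}"));
        processor.init();
        processor.onEvent(5);  // swallowed
        processor.onEvent(42); // propagates downstream
    }
}
```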
Running the example code above logs to console
Reduce
There is no reduce function required in Fluxtion; stateful map functions perform the role of reduce. In a classic batch
environment the reduce operation combines a collection of items into a single value. In a streaming environment
the set of values is never complete; we can view the current value of a stateful map operation, which is equivalent to the
reduce operation. The question is rather: when is the value of the stateful map published and reset?
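As an illustration, a running sum as a stateful map function (a minimal sketch under the same assumed api):

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;

public class RunningSum {
    private int sum;
    public int add(int value) { return sum += value; }

    public static void main(String[] args) {
        // a stateful map plays the role of reduce - the running sum is the
        // current value of the reduction over the unbounded stream
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(Integer.class)
                .map(new RunningSum()::add)
                .console("sum -> {}"));
        processor.init();
        processor.onEvent(10);
        processor.onEvent(20); // sum -> 30
    }
}
```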
FlatMap
A flatMap operation flattens a collection in a data flow. Any operations applied after the flatMap operation are
performed on each element in the collection.
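A flatMap sketch, assuming flatMap accepts a function that returns an Iterable:

```java
import java.util.Arrays;
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;

public class FlatMapExample {
    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(String.class)
                .flatMap(csv -> Arrays.asList(csv.split(","))) // flatten the collection
                .console("element -> {}"));
        processor.init();
        processor.onEvent("a,b,c"); // logs three separate elements
    }
}
```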
Merge flows
Flows can be merged to output a single flow that can be operated on.
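A merge sketch, assuming a merge method on the flow builder:

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;

public class MergeExample {
    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> {
            var longsAsStrings = DataFlow.subscribe(Long.class).map(String::valueOf);
            // the merged flow fires whenever either parent publishes
            DataFlow.subscribe(String.class)
                    .merge(longsAsStrings)
                    .console("merged -> {}");
        });
        processor.init();
        processor.onEvent("hello");
        processor.onEvent(42L);
    }
}
```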
Running the example code above logs to console
Merge and map flows
Merge multiple streams of different types into a single output, applying a mapping operation to combine the different types.
Only when at least one element from each required flow is received will the data flow publish. The upstream flows are
merged into a user class that is published as the output of the merge flow. The target class is specified with:
Upstream flows are set on the merge target class with a BiConsumer operation on the target class, T:
[merge and map builder]<T>.required(DataFlow<F> upstreamFlow, BiConsumer<T, F>)
Merge inputs are supported that do not have to trigger for the flow to publish downstream. The value in the merge target
could be null if such an upstream has not triggered while all the required flows have.
[merge and map builder]<T>.requiredNoTrigger(DataFlow<F> upstreamFlow, BiConsumer<T, F>)
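A sketch of the pattern; the builder name MergeAndMapFlowBuilder and its of/dataFlow() methods are assumptions inferred from the signatures above:

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.compiler.builder.dataflow.MergeAndMapFlowBuilder;

public class MergeAndMapExample {
    // merge target - one setter per upstream flow
    public static class Order {
        String customer;
        Integer quantity; // optional leg, may be null when the flow publishes
        public void setCustomer(String customer) { this.customer = customer; }
        public void setQuantity(Integer quantity) { this.quantity = quantity; }
        @Override public String toString() { return customer + " qty:" + quantity; }
    }

    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> MergeAndMapFlowBuilder.of(Order::new)
                .required(DataFlow.subscribe(String.class), Order::setCustomer)
                .requiredNoTrigger(DataFlow.subscribe(Integer.class), Order::setQuantity)
                .dataFlow()
                .console("merged -> {}"));
        processor.init();
        processor.onEvent("ACME"); // required leg fired - can publish, quantity still null
        processor.onEvent(100);    // no-trigger leg updates the target
        processor.onEvent("BETA"); // publishes "BETA qty:100"
    }
}
```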
Running the example code above logs to console
Sink
An application can register for output from the EventProcessor by supplying a consumer to addSink; a sink is removed with a
call to removeSink. Bound classes can publish to sinks during an event processing cycle, and any registered sinks see
the update as soon as the data is published, not at the end of the cycle.
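A sink sketch, assuming addSink/removeSink on the processor and a sink(String) builder method:

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;

public class SinkExample {
    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(String.class)
                .map(String::toUpperCase)
                .sink("upperCasedStrings")); // named output point
        processor.init();
        // the application registers a consumer against the named sink
        processor.addSink("upperCasedStrings",
                (String s) -> System.out.println("sink -> " + s));
        processor.onEvent("hello sink");
        processor.removeSink("upperCasedStrings"); // no further updates delivered
    }
}
```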
DataFlow node lookup
DataFlow nodes are available for lookup from an event processor instance using their name. The lookup
returns a reference to the wrapped value and not the wrapping node. The application can then use the reference to
pull data from the node without requiring an event processing cycle to push data to an output.
When building the graph with the DSL, a call to id makes that element addressable for lookup.
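A sketch of id and lookup; the accessor name getStreamed is an assumption, check your version for the exact lookup method:

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;

public class LookupExample {
    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(String.class)
                .map(String::toUpperCase)
                .id("upper")); // id makes the wrapped value addressable
        processor.init();
        processor.onEvent("pull me");
        // pull the current value without a calculation cycle -
        // getStreamed is an assumed accessor name
        String current = processor.getStreamed("upper");
        System.out.println("looked up -> " + current);
    }
}
```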
Running the example code above logs to console
Graph of functions
Fluxtion automatically wraps each function in a node (in effect a monad) and binds both the function and its wrapping node
into the event processor. The wrapping node
handles all the event notifications, invoking the user function when it is triggered. Each wrapping node can be the
head of multiple child flows, forming complex graph structures that obey the dispatch rules. This is in contrast to
classic java streams, which have a terminal operation and a pipeline structure.
This example creates a simple graph structure; multiple stateful/stateless functions are bound to a single parent DataFlow.
We are using the DataFlow.console operation to print intermediate results to the screen for illustrative purposes.
The console operation is a specialisation of DataFlow.peek.
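A sketch of one parent flow feeding several child branches:

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;

public class GraphExample {
    // stateful word counter used by one branch of the graph
    public static class WordCounter {
        private int count;
        public int count(String s) { return count += s.split(" ").length; }
    }

    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> {
            // one parent flow, multiple child branches - a graph, not a pipeline
            var strings = DataFlow.subscribe(String.class).console("input -> {}");
            strings.map(String::toLowerCase).console("lower -> {}");
            strings.map(String::toUpperCase).console("upper -> {}");
            strings.map(new WordCounter()::count).console("word count -> {}");
        });
        processor.init();
        processor.onEvent("test ME");
        processor.onEvent("and AGAIN");
    }
}
```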
Running the above with the strings ‘test ME’ and ‘and AGAIN’ outputs
Processing graph
The Fluxtion DSL only requires the developer to write functions; any wrapping nodes are automatically added to the event processor.
The compiler automatically selects stateful or stateless map functions, binding user instances if a stateful map
function is specified.
The MyFunctions class is a normal java class bound into the event processor.
Connecting DataFlow and nodes
An event processor supports bi-directional linking between flows and normal java classes, also known as nodes.
Connecting DataFlows and nodes is a powerful mechanism for joining functional and imperative programming in a streaming
environment.
Supported bindings:
Node to data flow. The node is the start of a data flow
Data flow to node. The node has runtime access to pull current value of a data flow
Data flow push to node. Data is pushed from the data flow to the node
Data flow to event processor. The data flow pushes re-entrant events to the parent event processor, triggering a new calculation cycle
Node to DataFlow
A DataFlow can be created by subscribing to a node that has been imperatively added to the event processor. When the node
triggers in a calculation cycle, the DataFlow is triggered. Create a DataFlow from a node with:
DataFlow.subscribeToNode(new MyComplexNode())
If the node referred to in the DataFlow.subscribeToNode method call is not in the event processor it will be bound
automatically.
The example below creates an instance of MyComplexNode as the head of a DataFlow. When a String event is received the
DataFlow path is executed. In this case we are aggregating into a list that holds the four most recent elements.
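A sketch, assuming the Collectors.listFactory helper for the four element list:

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.runtime.annotations.OnEventHandler;
import com.fluxtion.runtime.dataflow.helpers.Collectors;

public class SubscribeToNodeExample {
    // a normal imperative node - the head of the DataFlow
    public static class MyComplexNode {
        private String in;
        @OnEventHandler
        public boolean stringEvent(String in) {
            this.in = in;
            return true; // true propagates the notification
        }
        public String getIn() { return in; }
    }

    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> DataFlow
                .subscribeToNode(new MyComplexNode()) // node is bound automatically
                .map(MyComplexNode::getIn)
                .aggregate(Collectors.listFactory(4)) // keep the 4 most recent elements
                .console("last four -> {}"));
        processor.init();
        for (String s : new String[]{"A", "B", "C", "D", "E"}) {
            processor.onEvent(s); // after "E" the list is [B, C, D, E]
        }
    }
}
```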
Running the example code above logs to console
DataFlow to node
A data flow can be consumed by a normal java class within the event processor. The runtime handle to a data flow is a
FlowSupplier, which is a normal java Supplier; the current value can be accessed by calling get(). When the data flow triggers,
the OnTrigger callback method in the child class is called.
When building the processor, the FlowSupplier is accessed with:
[DataFlow].flowSupplier()
This example binds a data flow of Strings to a java record that has an onTrigger method annotated with @OnTrigger.
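A sketch of the pattern; FlowSupplier and flowSupplier() follow the names quoted above, import paths are assumed:

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.runtime.annotations.OnTrigger;
import com.fluxtion.runtime.dataflow.FlowSupplier;

public class FlowSupplierExample {
    // FlowSupplier is the runtime handle - get() pulls the current value
    public record MyFlowHolder(FlowSupplier<String> flowSupplier) {
        @OnTrigger
        public boolean onTrigger() {
            System.out.println("triggered -> " + flowSupplier.get().toUpperCase());
            return true;
        }
    }

    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> {
            FlowSupplier<String> stringFlow = DataFlow.subscribe(String.class).flowSupplier();
            cfg.addNode(new MyFlowHolder(stringFlow)); // bind the consuming node imperatively
        });
        processor.init();
        processor.onEvent("hello supplier");
    }
}
```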
Running the example code above logs to console
Push to node
A data flow can push a value to any normal java class.
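A push sketch, assuming a push builder method that accepts a consumer method reference:

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;

public class PushExample {
    // a plain java class that receives pushed values
    public static class Target {
        public void currentValue(String value) {
            System.out.println("pushed -> " + value);
        }
    }

    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(String.class)
                .map(String::toLowerCase)
                .push(new Target()::currentValue)); // Target is bound into the processor
        processor.init();
        processor.onEvent("PUSH Me");
    }
}
```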
Running the example code above logs to console
Re-entrant events
Events can be published from inside the graph for processing in the next available cycle. Internal events
are added to a LIFO queue for processing in the correct order. The EventProcessor instance maintains the LIFO queue; any
new input events are queued if processing is currently in progress. Support for internal event publishing is built
into the streaming api.
Maps an int signal to a String and republishes to the graph
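A sketch, assuming subscribeToIntSignal, mapToObj and a processAsNewGraphEvent republish method (names per the Fluxtion streaming api, verify for your version):

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;

public class ReEntrantExample {
    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> {
            // re-publish the mapped String as a new event in the next cycle
            DataFlow.subscribeToIntSignal("myIntSignal")
                    .mapToObj(i -> "intValue:" + i)
                    .processAsNewGraphEvent();
            DataFlow.subscribe(String.class).console("received -> {}");
        });
        processor.init();
        processor.publishIntSignal("myIntSignal", 256); // logs "received -> intValue:256"
    }
}
```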
Output
Trigger control
Fluxtion offers a way to override the triggering of a flow node in the event processor. There are four trigger controls
available for client code to customise:
Flow.publishTrigger - Notifies a child node when triggered, adds a notification to the normal publish
Flow.publishTriggerOverride - Notifies a child node when triggered, removes all other publish notifications
Flow.updateTrigger - Overrides when the flow node runs its functional operation
Flow.resetTrigger - If the functional operation is stateful calls the reset function
In the trigger examples we are using the DataFlow.subscribeToSignal and processor.publishSignal to drive the trigger
controls on the flow node.
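A sketch wiring three of the trigger controls onto one aggregate flow, as used in the sections below (Aggregates.intSumFactory is an assumed helper):

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.runtime.dataflow.helpers.Aggregates;

public class TriggerExample {
    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(Integer.class)
                .aggregate(Aggregates.intSumFactory())
                .updateTrigger(DataFlow.subscribeToSignal("update"))   // when the sum runs
                .resetTrigger(DataFlow.subscribeToSignal("reset"))     // when state clears
                .publishTrigger(DataFlow.subscribeToSignal("publish")) // extra publish
                .console("sum -> {}"));
        processor.init();
        processor.onEvent(10);              // stored in the parent, aggregate not run
        processor.onEvent(20);              // parent value now 20
        processor.publishSignal("update");  // aggregate consumes the current value
        processor.publishSignal("publish"); // republish the current sum downstream
        processor.publishSignal("reset");   // stateful sum cleared
    }
}
```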
PublishTrigger
In this example the publishTrigger control enables multiple publish calls for the flow node. Child notifications are in
addition to the normal triggering operation of the flow node. The values in the parent node are unchanged when publishing.
Child DataFlow nodes are notified when publishTrigger fires or the map function executes in a calculation cycle.
Running the example code above logs to console
PublishTriggerOverride
In this example the publishTrigger control overrides the normal triggering operation of the flow node. The child is notified
only when publishTriggerOverride fires, changes due to recalculation are swallowed and not published downstream.
The values in the parent node are unchanged when publishing.
Child DataFlow nodes are notified when publishTriggerOverride fires.
Running the example code above logs to console
UpdateTrigger
In this example the updateTrigger controls when the functional mapping operation of the flow node is invoked. The values
are only aggregated when the update trigger is called. Notifications from the parent node are ignored and do not trigger
a mapping operation.
A map operation only occurs when the update trigger fires.
Running the example code above logs to console
ResetTrigger
In this example the resetTrigger controls when the functional mapping operation of the flow node is reset. The aggregate
operation is stateful, so all the values in the list are removed when the reset trigger fires. The reset operation
triggers a notification to the children of the flow node.
The reset trigger notifies the stateful function to clear its state.
Running the example code above logs to console
Stateful function reset
Stateful functions can be reset by implementing the Stateful interface with a reset
method. Configuring the resetTrigger will automatically route calls to the reset method of the stateful function.
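A sketch of a stateful function implementing the assumed Stateful interface (import path and generic signature are assumptions):

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.runtime.dataflow.Stateful;

public class StatefulResetExample {
    // implementing Stateful lets the resetTrigger clear this function's state
    public static class Sum implements Stateful<Integer> {
        private int sum;
        public int add(int value) { return sum += value; }
        @Override
        public Integer reset() {
            sum = 0;
            return sum;
        }
    }

    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(Integer.class)
                .map(new Sum()::add)
                .resetTrigger(DataFlow.subscribeToSignal("reset"))
                .console("sum -> {}"));
        processor.init();
        processor.onEvent(10);
        processor.onEvent(20);            // sum -> 30
        processor.publishSignal("reset"); // state cleared to 0
        processor.onEvent(5);             // sum -> 5
    }
}
```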
Running the example code above logs to console
Aggregating
Aggregating extends the concept of stateful map functions, adding behaviour for use in stateful operations
like windowing and grouping. An aggregate function has these behaviours:
Stateful - defines the reset method
aggregate - aggregate a value and calculate a result
combine/deduct - combine or deduct another instance of this function, used when windowing
deduct supported - can this instance deduct another instance of this function, or is a loop required to recalculate
DataFlow.aggregate takes a Supplier of AggregateFlowFunctions, not a
single AggregateFlowFunction instance. When managing windowing and groupBy operations the event processor creates instances
of AggregateFlowFunction to partition function state.
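A sketch passing a Supplier of aggregate functions; Aggregates.intSumFactory is an assumed built-in factory:

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.runtime.dataflow.helpers.Aggregates;

public class AggregateExample {
    public static void main(String[] args) {
        // intSumFactory supplies AggregateFlowFunction instances - the processor
        // instantiates one per partition where needed
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(Integer.class)
                .aggregate(Aggregates.intSumFactory())
                .console("cumulative sum -> {}"));
        processor.init();
        processor.onEvent(10);
        processor.onEvent(20); // cumulative sum -> 30
    }
}
```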
Running the example code above logs to console
Custom aggregate function
Users can create aggregate functions that plug into the reset trigger callbacks in a DataFlow. The steps to create a
user aggregate function:
Extend AggregateFlowFunction, the type parameters define the input and output types of the function
Implement the reset, get and aggregate methods
Return null from the aggregate method to indicate no change to the aggregate output
The example below maintains a date range as a String and resets the range when the reset trigger fires. When the date range
is unaltered the aggregate operation returns null and no notifications are triggered.
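A sketch of such a function; the generic signature and import path of AggregateFlowFunction are assumed:

```java
import java.time.LocalDate;
import com.fluxtion.runtime.dataflow.aggregate.AggregateFlowFunction;

// maintains a "start - end" date range String; returns null when the range is unchanged
public class DateRangeAggregate
        implements AggregateFlowFunction<LocalDate, String, DateRangeAggregate> {
    private LocalDate startDate;
    private LocalDate endDate;
    private String message;

    @Override
    public String reset() {
        startDate = null;
        endDate = null;
        message = null;
        return message;
    }

    @Override
    public String get() { return message; }

    @Override
    public String aggregate(LocalDate input) {
        startDate = startDate == null || startDate.isAfter(input) ? input : startDate;
        endDate = endDate == null || endDate.isBefore(input) ? input : endDate;
        String newMessage = startDate + " - " + endDate;
        if (newMessage.equals(message)) {
            return null; // no change - no notification propagates
        }
        message = newMessage;
        return message;
    }
    // usage sketch:
    // DataFlow.subscribe(LocalDate.class)
    //         .aggregate(DateRangeAggregate::new)
    //         .resetTrigger(DataFlow.subscribeToSignal("reset"))
    //         .console("range -> {}");
}
```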
Running the example code above logs to console
Windowing
Fluxtion supports windowing operations in a DataFlow to aggregate data. There are four types of windows supported:
Tumbling windows with custom start/stop triggers
Tumbling time based windows, where start/stop triggers fire on a timer
Sliding time based windows, where the bucket size is time based and calculations fire on a timer
Sliding count based windows, where the bucket size is a count and calculations fire on a bucket count
Fluxtion does not run threads; it is an event driven data structure. On a calculation cycle the window monitors read
the clock time and expire windows if necessary.
To advance time in an event processor, send any event regularly; this causes the window expiry calculation to run.
Tumbling windows
Imagine tumbling windows as distinct buckets collecting data for a fixed size window. Once a bucket fills up, it’s closed and
published downstream. A new, empty bucket is created to collect the next batch of data. Tumbling windows never overlap,
ensuring all data points are processed exactly once. This is good for capturing complete snapshots of the data at regular intervals.
Sliding windows
Think of a sliding window as a constantly moving window over the data stream. The window has a fixed size, but it advances
by a set increment (called the slide). As the window slides forward, new data enters at the front, and old data falls
out the back. Unlike tumbling windows, sliding windows can overlap significantly, with data points contributing to
multiple windows. This is useful for capturing trends and changes happening over time. As each slide occurs, downstream
nodes are triggered.
Diagram comparing tumbling and sliding windows
Tumbling time window
Fluxtion supports a tumbling time window for any DataFlow node with this call:
tumblingAggregate(Supplier<AggregateFlowFunction> aggregateFunction, int bucketSizeMillis)
The lifecycle of the AggregateFlowFunction is managed by the event processor, tracking the current time and firing
notifications to child nodes when the timer expires. Reset calls to the stateful function are also handled by the event
processor.
An automatically added FixedRateTrigger monitors the tumbling
window for expiry whenever an event is received. If the window has expired, the following actions occur:
The window aggregate is calculated and cached for inspection
The aggregate function is reset
Downstream nodes are triggered with the cached value
This example publishes a random Integer every 10 milliseconds; the int sum calculates the current sum for the window.
Every 300 milliseconds the cumulative sum for the window just expired is logged to console.
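A sketch using the tumblingAggregate signature quoted above (Aggregates.intSumFactory is an assumed helper):

```java
import java.util.Random;
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.runtime.dataflow.helpers.Aggregates;

public class TumblingWindowExample {
    public static void main(String[] args) throws InterruptedException {
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(Integer.class)
                // sum each non-overlapping 300ms bucket, publish and reset on expiry
                .tumblingAggregate(Aggregates.intSumFactory(), 300)
                .console("window sum -> {}"));
        processor.init();
        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            processor.onEvent(random.nextInt(100));
            Thread.sleep(10); // incoming events also advance time and expire windows
        }
    }
}
```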
Running the example code above logs to console
Tumbling trigger based window
To create a tumbling window that is not time based, we use the trigger overrides to control resetting and publishing the
values in the tumbling window.
In this example we have a shopping cart that can hold at most three items. The cart can be cleared with a ClearCart
event. A GoToCheckout event publishes the contents of the cart downstream if the number of items > 0.
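A sketch of the cart, combining resetTrigger, publishTriggerOverride and a filter (Collectors.listFactory is an assumed helper that keeps the most recent elements):

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.runtime.dataflow.helpers.Collectors;

public class ShoppingCartExample {
    public record Item(String name) {}
    public record ClearCart() {}
    public record GoToCheckout() {}

    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(Item.class)
                .aggregate(Collectors.listFactory(3))                  // three most recent items
                .resetTrigger(DataFlow.subscribe(ClearCart.class))     // empty the cart
                .publishTriggerOverride(DataFlow.subscribe(GoToCheckout.class))
                .filter(list -> !list.isEmpty())                       // publish non-empty carts only
                .console("checkout -> {}"));
        processor.init();
        processor.onEvent(new Item("bread"));
        processor.onEvent(new Item("milk"));
        processor.onEvent(new GoToCheckout()); // publishes [bread, milk]
        processor.onEvent(new ClearCart());    // cart emptied, nothing published
    }
}
```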
Running the example code above logs to console
Sliding time window
Fluxtion supports a sliding time window for any DataFlow node with this call:
slidingAggregate(Supplier<AggregateFlowFunction> aggregateFunction, int bucketSizeMillis, int bucketsPerWindow)
The lifecycle of the AggregateFlowFunction is managed by the event processor, tracking the current time and firing
notifications to child nodes when the timer expires.
An automatically added FixedRateTrigger monitors the sliding
window for expiry whenever an event is received. If the window has expired, the following actions occur:
The aggregate for the current window is calculated and combined with the aggregate for the whole sliding window
The aggregate for the oldest window is deducted from the aggregate for the whole sliding window
The aggregate for the whole sliding window is cached and stored for inspection
Downstream nodes are triggered with the cached value
This example publishes a random Integer every 10 milliseconds; the int sum calculates the current sum for the window.
There are 4 buckets, each 300 milliseconds in size; once every 300 milliseconds the aggregate sum for the past 1.2
seconds is logged to console.
As the effective window size is 1.2 seconds the sliding window values are approximately 4 times larger than the tumbling
window example that resets the sum every 300 milliseconds.
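A sketch using the slidingAggregate signature quoted above:

```java
import java.util.Random;
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.runtime.dataflow.helpers.Aggregates;

public class SlidingWindowExample {
    public static void main(String[] args) throws InterruptedException {
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(Integer.class)
                // 4 buckets of 300ms - a 1.2 second window publishing every 300ms
                .slidingAggregate(Aggregates.intSumFactory(), 300, 4)
                .console("sliding sum -> {}"));
        processor.init();
        Random random = new Random();
        for (int i = 0; i < 200; i++) {
            processor.onEvent(random.nextInt(100));
            Thread.sleep(10);
        }
    }
}
```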
Running the example code above logs to console
GroupBy
The Fluxtion DSL offers many groupBy operations that partition based on a key function and then apply an aggregate operation
to the partition.
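A basic groupBy sketch before looking at deletes (the three argument groupBy overload and GroupBy::toMap are assumed names):

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.runtime.dataflow.groupby.GroupBy;
import com.fluxtion.runtime.dataflow.helpers.Aggregates;

public class GroupByExample {
    public record Trade(String symbol, int amount) {}

    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> DataFlow.subscribe(Trade.class)
                // partition by symbol, sum the traded amount within each partition
                .groupBy(Trade::symbol, Trade::amount, Aggregates.intSumFactory())
                .map(GroupBy::toMap)
                .console("traded totals -> {}"));
        processor.init();
        processor.onEvent(new Trade("IBM", 100));
        processor.onEvent(new Trade("MSFT", 250));
        processor.onEvent(new Trade("IBM", 50)); // IBM total now 150
    }
}
```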
Elements can be deleted from a groupBy data structure either by key or by value. When deleting by value, a stateful predicate
function is used that can be dynamically updated by the client code. Unlike filtering, the groupBy data structure is
mutated and elements are removed.
In this example we are grouping pupils by graduation year; a delete-by-value predicate function removes students if
their graduation year is too old. The predicate subscribes to live data, so when it updates, the matching elements in the
collection are removed.
Running the example code above logs to console
Dataflow shortcut groupBy methods
The DataFlow class offers a set of shortcut methods for groupBy functions that do not require the
subscription method to be declared, as it is called implicitly. Some examples are below.
All the values are passed to the aggregate function and the single scalar output is published for downstream nodes to
consume.
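A sketch contrasting the shortcut and long forms (the static DataFlow.groupBy shortcut is an assumed name):

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.runtime.dataflow.groupby.GroupBy;

public class GroupByShortcutExample {
    public record Pupil(String name, int graduationYear) {}

    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg ->
                // shortcut - the subscription to Pupil.class is implicit;
                // the long form is DataFlow.subscribe(Pupil.class).groupBy(...)
                DataFlow.groupBy(Pupil::graduationYear)
                        .map(GroupBy::toMap)
                        .console("pupils by year -> {}"));
        processor.init();
        processor.onEvent(new Pupil("Alice", 2024));
        processor.onEvent(new Pupil("Bob", 2025));
    }
}
```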
Running the example code above logs to console
Joining
Fluxtion supports join operations for groupBy data flow nodes.
Inner join
Joins are created with the data flow node of a groupBy or by using the JoinFlowBuilder:
JoinFlowBuilder.innerJoin(schools, pupils)
The value type of the joined GroupBy is a Tuple; the first value is the left join and the second value is the right join.
The utility static method Tuples.mapTuple
is used to map the School, Pupil Tuple into a pretty print String.
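A sketch of the inner join (the groupBy/groupByToList shortcuts and mapValues are assumed names):

```java
import java.util.List;
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.compiler.builder.dataflow.JoinFlowBuilder;
import com.fluxtion.runtime.dataflow.groupby.GroupBy;
import com.fluxtion.runtime.dataflow.helpers.Tuples;

public class InnerJoinExample {
    public record School(String name) {}
    public record Pupil(String name, String school) {}

    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> {
            var schools = DataFlow.groupBy(School::name);
            var pupils = DataFlow.groupByToList(Pupil::school);
            JoinFlowBuilder.innerJoin(schools, pupils)
                    // joined value is a Tuple: first = left (School), second = right (List<Pupil>)
                    .mapValues(Tuples.mapTuple((School school, List<Pupil> list) ->
                            school.name() + " pupils:" + list))
                    .map(GroupBy::toMap)
                    .console("join -> {}");
        });
        processor.init();
        processor.onEvent(new School("RGS"));
        processor.onEvent(new Pupil("Alice", "RGS")); // key "RGS" present on both sides
    }
}
```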
Running the example code above logs to console
Left outer join
Joins are created with the data flow node of a groupBy or by using the JoinFlowBuilder:
JoinFlowBuilder.leftJoin(schools, pupils)
A default value of an empty collection is assigned to the pupil groupBy so the first school can join against a non-null
value.
Running the example code above logs to console
Right outer join
Joins are created with the data flow node of a groupBy or by using the JoinFlowBuilder:
JoinFlowBuilder.rightJoin(schools, pupils)
A default value of an empty collection is assigned to the pupil groupBy so the first school can join against a non-null
value.
Running the example code above logs to console
Full outer join
Joins are created with the data flow node of a groupBy or by using the JoinFlowBuilder:
JoinFlowBuilder.outerJoin(schools, pupils)
A default value of an empty collection is assigned to the pupil groupBy so the first school can join against a non-null
value.
Running the example code above logs to console
Multi join or Co-group
Multi leg joins are supported with no limitation on the number of joins. The MultiJoinBuilder
is used to construct a multi leg join with a builder style pattern.
Legs are joined on a common key class and results are sent to a target class. Each join is added from a flow and pushed into
the target class by specifying the consumer method on the target instance.
The example joins four groupBy data flows for a person, using the String name as a key. When a matching join is found,
the individual items are set on a MergedData instance. Dependents are optional for the join, so they are not required
for a MergedData record to publish to the flow.
The MergedData instance is added to the GroupBy data flow keyed by name. The multi join data flow can be operated on
like any normal flow; in this case we are mapping the value with a
pretty printing function.
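A sketch of a two leg multi join; MultiJoinBuilder.builder, addJoin and dataFlow() are assumptions inferred from the builder pattern described above:

```java
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.compiler.builder.dataflow.MultiJoinBuilder;

public class MultiJoinExample {
    public record Person(String name, String country) {}
    public record Nationality(String person, String passport) {}

    // mutable join target - each leg pushes its value through a setter
    public static class MergedData {
        Person person;
        Nationality nationality;
        public void setPerson(Person person) { this.person = person; }
        public void setNationality(Nationality nationality) { this.nationality = nationality; }
        @Override public String toString() { return person + " " + nationality; }
    }

    public static void main(String[] args) {
        var processor = Fluxtion.interpret(cfg -> MultiJoinBuilder
                .builder(String.class, MergedData::new) // legs join on a String key
                .addJoin(DataFlow.groupBy(Person::name), MergedData::setPerson)
                .addJoin(DataFlow.groupBy(Nationality::person), MergedData::setNationality)
                .dataFlow()
                .console("multi join -> {}"));
        processor.init();
        processor.onEvent(new Person("Alice", "UK"));
        processor.onEvent(new Nationality("Alice", "UK")); // matching key publishes
    }
}
```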