5 minute Functional hello world

Hello world using a functional programming style with the Fluxtion DSL. Business logic resides in user functions, which removes the need to write classes and annotate event handling methods with Fluxtion annotations.

Code is available as a Maven project.

Add two numbers from different event streams and log a warning when the sum > 100. The sum is the addition of the current value from each event stream. The stream of events can be infinitely long; calculations are run whenever a new event is received.

This example creates an event processor, initialises it and fires data events at the processor. If the sum breaches 100, a warning is logged to the console. Fluxtion handles all dispatch and change notification when an event is received.

For an imperative implementation example see Hello fluxtion world

Processing graph

The functional approach has more nodes in the event processor compared to the imperative version, but the actual code written by the developer to create the algorithm is much shorter. Fluxtion DSL only requires the developer to write functions, any wrapping nodes are automatically added to the event processor.

flowchart TB

    classDef eventHandler color:#022e1f,fill:#aaa3ff,stroke:#000;
    classDef graphNode color:#022e1f,fill:#00cfff,stroke:#000;
    classDef exportedService color:#022e1f,fill:#aaa3ff,stroke:#000;
    style EventProcessor fill:#e9ebe4,stroke:#333,stroke-width:1px
    
    EventA><b>InputEvent</b>::Data1]:::eventHandler 
    EventB><b>InputEvent</b>::Data2]:::eventHandler 
    
    HandlerA[<b>Subscriber</b>::Data1]:::graphNode 
    HandlerB[<b>Subscriber</b>::Data2]:::graphNode 
    
    MapData1[<b>Map</b> -> mapToDouble]:::graphNode 
    MapData2[<b>Map</b> -> mapToDouble]:::graphNode 
    
    MapDefaultData1[<b>Map</b> -> defaultValue]:::graphNode 
    MapDefaultData2[<b>Map</b> -> defaultValue]:::graphNode 
    
    BiMapSum[<b>BiMap</b> -> Double::sum]:::graphNode 
    
    Console1[<b>Peek</b> -> console]:::graphNode 
    Filter[<b>Filter</b> -> d > 100]:::graphNode 
    Console2[<b>Peek</b> -> console]:::graphNode 
    
    EventA --> HandlerA
    EventB --> HandlerB
    
    subgraph EventProcessor
      HandlerA --> MapData1 --> MapDefaultData1 --> BiMapSum
      HandlerB --> MapData2 --> MapDefaultData2 --> BiMapSum
      BiMapSum --> Console1 --> Filter --> Console2
    end
    

Processing logic

The Fluxtion event processor manages all the event callbacks. The user logic is a set of functions that are bound into the event processor using the Fluxtion DSL.

  • An event handler is notified when an event of the matching type is received, triggering the next item in the chain
  • The event is mapped to a double using the Data1::value or Data2::value function
  • A default double value of 0 is assigned to the output, so each branch has a value before its first event arrives
  • The two event streams are merged and passed to the bi map function. Double::sum is invoked when either input stream triggers
  • A peek function logs the sum to the console
  • A filter function is bound to the graph, if the sum > 100 the filter test passes and the next node is triggered
  • A peek function logs the warning message to the console
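
The steps above can be modelled in plain Java to make the data flow concrete. This is only an illustrative sketch and does not use Fluxtion: the class name ChainSketch, its methods and the shortened warning text are hypothetical, and the real processor generates the equivalent wiring from the DSL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoubleBinaryOperator;
import java.util.function.DoublePredicate;

/** Plain-Java model of the processing chain; hypothetical, not Fluxtion code. */
public class ChainSketch {
    private double latest1 = 0;  // default value for the Data1 branch
    private double latest2 = 0;  // default value for the Data2 branch
    private final DoubleBinaryOperator biMap = Double::sum;
    private final DoublePredicate filter = d -> d > 100;
    private final List<String> log = new ArrayList<>();

    // Each handler stores the mapped value, then triggers the bi map stage
    public void onData1(double value) { latest1 = value; recalculate(); }
    public void onData2(double value) { latest2 = value; recalculate(); }

    private void recalculate() {
        double sum = biMap.applyAsDouble(latest2, latest1);
        log.add("sum:" + sum);                // first peek
        if (filter.test(sum)) {               // only a passing filter triggers the next stage
            log.add("WARNING sum = " + sum);  // second peek
        }
    }

    public List<String> log() { return log; }
}
```

In this model both branches write into shared state and either one can trigger the sum, which is the same shape as the graph in the diagram.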

Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>example.master</artifactId>
        <groupId>com.fluxtion.example</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>
    <artifactId>functional-helloworld</artifactId>
    <name>functional :: hello world</name>

    <dependencies>
        <dependency>
            <groupId>com.fluxtion</groupId>
            <artifactId>compiler</artifactId>
            <version>9.2.23</version>
        </dependency>
    </dependencies>
</project>

Gradle:

implementation 'com.fluxtion:compiler:9.2.23'

Three steps to using Fluxtion

1 - Mark event handling methods with annotations or via functional programming
2 - Build the event processor using the Fluxtion compiler utility
3 - Integrate the event processor in the app and feed it events

Step 1 - bind functions to events using Fluxtion DSL

The Fluxtion DSL is used to construct the algorithm by chaining together functions that are triggered by an incoming event. The algorithm is a graph rather than a simple pipeline: two branches, one per event type, merge at the bi map function.

By default, a bi map function is only invoked when both parents have triggered at least once. Supplying a default value removes the trigger check for the input to the bi map function.

private static void bindFunctions(EventProcessorConfig cfg) {
    var data1Stream = DataFlow.subscribe(Data1.class)
            .mapToDouble(Data1::value)
            .defaultValue(0);

    DataFlow.subscribe(Data2.class)
            .mapToDouble(Data2::value)
            .defaultValue(0)
            .mapBiFunction(Double::sum, data1Stream)
            .console("sum:{}")
            .filter(d -> d > 100)
            .console("WARNING DataSumCalculator value is greater than 100 sum = {}");
}
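
The effect of the default value on bi map triggering can be illustrated with a small plain-Java model. This is a sketch under assumptions, not Fluxtion internals: BiMapGateSketch and its methods are hypothetical names, and an empty OptionalDouble stands in for an input that has not triggered yet.

```java
import java.util.OptionalDouble;

/** Plain-Java model of bi map triggering; hypothetical, not Fluxtion internals. */
public class BiMapGateSketch {
    private OptionalDouble left;
    private OptionalDouble right;

    /** Pass OptionalDouble.empty() to model an input with no default value. */
    public BiMapGateSketch(OptionalDouble leftDefault, OptionalDouble rightDefault) {
        this.left = leftDefault;
        this.right = rightDefault;
    }

    public OptionalDouble onLeft(double value) {
        left = OptionalDouble.of(value);
        return trigger();
    }

    public OptionalDouble onRight(double value) {
        right = OptionalDouble.of(value);
        return trigger();
    }

    // The function runs only when both inputs hold a value
    private OptionalDouble trigger() {
        if (left.isPresent() && right.isPresent()) {
            return OptionalDouble.of(Double.sum(left.getAsDouble(), right.getAsDouble()));
        }
        return OptionalDouble.empty();
    }
}
```

Constructed with two empty inputs, the model produces no result until both sides have received a value. Constructed with defaults of 0, it produces a sum on the very first event, which is why the example logs a sum after a single Data1 event.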

Events

Java records are used as events.

public record Data1(double value) {}
public record Data2(double value) {}
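
A record generates an accessor method named after each component, which is what the method references Data1::value and Data2::value in the DSL code resolve to. The following standalone sketch shows this with the standard ToDoubleFunction interface; the wrapper class RecordEventSketch and its constants are hypothetical and exist only for illustration.

```java
import java.util.function.ToDoubleFunction;

public class RecordEventSketch {
    // Event types matching the names used in the DSL code
    public record Data1(double value) {}
    public record Data2(double value) {}

    // Each method reference points at the accessor the record generates for its component
    public static final ToDoubleFunction<Data1> DATA1_VALUE = Data1::value;
    public static final ToDoubleFunction<Data2> DATA2_VALUE = Data2::value;

    public static void main(String[] args) {
        System.out.println(DATA1_VALUE.applyAsDouble(new Data1(34)));   // prints 34.0
        System.out.println(DATA2_VALUE.applyAsDouble(new Data2(52.1))); // prints 52.1
    }
}
```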

Step 2 - build the event processor

The functional DSL is used within the context of the Fluxtion.interpret method to build the event processor. The DSL processor binds all the user functions and any required wrapping nodes into the event processor.

var eventProcessor = Fluxtion.interpret(Main::bindFunctions);

Step 3 - Integrate event processor and connect event stream

The example Main method instantiates an event processor in interpreted mode, initialises it and submits events for processing using the onEvent method. The init method must be called before submitting events.

Events are submitted for processing by calling eventProcessor.onEvent() with instances of Data1 or Data2.
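
Routing an event to the handler that matches its type can be pictured with a simple lookup keyed by class. The sketch below is a conceptual model only: DispatchSketch, subscribe and the boolean return value are hypothetical, and it makes no claim about how Fluxtion implements dispatch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Conceptual model of type-based event dispatch; hypothetical, not Fluxtion's implementation. */
public class DispatchSketch {
    private final Map<Class<?>, Consumer<Object>> handlers = new HashMap<>();

    // Register a handler for one event type
    public <T> void subscribe(Class<T> type, Consumer<T> handler) {
        handlers.put(type, event -> handler.accept(type.cast(event)));
    }

    // Route the event to the handler for its exact class; returns false if none is registered
    public boolean onEvent(Object event) {
        Consumer<Object> handler = handlers.get(event.getClass());
        if (handler == null) {
            return false;
        }
        handler.accept(event);
        return true;
    }
}
```

With handlers subscribed for two event types, a single onEvent entry point is enough for the caller, which matches how the example submits both kinds of event through the same method.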

The code for instantiating, initialising and sending events is:

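// Import paths assume the package layout of the com.fluxtion:compiler 9.x artifact
import com.fluxtion.compiler.EventProcessorConfig;
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;

public class Main {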
    public static void main(String[] args) {
        //build the EventProcessor and initialise it
        var eventProcessor = Fluxtion.interpret(Main::bindFunctions);
        eventProcessor.init();

        //send events
        eventProcessor.onEvent(new Data1(34));
        eventProcessor.onEvent(new Data2(52.1));
        eventProcessor.onEvent(new Data1(105));//should create a breach warning
        eventProcessor.onEvent(new Data1(12.4));
    }

    private static void bindFunctions(EventProcessorConfig cfg) {
        var data1Stream = DataFlow.subscribe(Data1.class)
                .mapToDouble(Data1::value)
                .defaultValue(0);
    
        DataFlow.subscribe(Data2.class)
                .mapToDouble(Data2::value)
                .defaultValue(0)
                .mapBiFunction(Double::sum, data1Stream)
                .console("sum:{}")
                .filter(d -> d > 100)
                .console("WARNING DataSumCalculator value is greater than 100 sum = {}");
    }
}

Example execution output

sum:34.0
sum:86.1
sum:157.1
WARNING DataSumCalculator value is greater than 100 sum = 157.1
sum:64.5