5 minute hello world
Use Fluxtion to add two numbers from different event streams and log when the sum > 100. The sum is the addition of the current value from each event stream. The stream of events can be infinitely long, calculations are run whenever a new event is received.
Code is available as a maven project
This example creates an event processor, initialises it and fires data events at the processor. If a breach occurs a warning will be logged to console. All dispatch and change notification is handled by Fluxtion when an event is received. Business logic resides in the user functions/classes.
For a functional implementation example see Hello functional fluxtion world
Processing graph
flowchart TB
classDef eventHandler color:#022e1f,fill:#aaa3ff,stroke:#000;
classDef graphNode color:#022e1f,fill:#00cfff,stroke:#000;
classDef exportedService color:#022e1f,fill:#aaa3ff,stroke:#000;
style EventProcessor fill:#e9ebe4,stroke:#333,stroke-width:1px
EventA><b>InputEvent</b>::Event_A]:::eventHandler
EventB><b>InputEvent</b>::Event_B]:::eventHandler
HandlerA[Event_A_Handler\n<b>EventHandler</b>::Event_A]:::graphNode
HandlerB[Event_B_Handler\n<b>EventHandler</b>::Event_A]:::graphNode
DataSumCalculator:::graphNode
BreachNotifier:::graphNode
EventA --> HandlerA
EventB --> HandlerB
subgraph EventProcessor
HandlerA --> DataSumCalculator
HandlerB --> DataSumCalculator
DataSumCalculator --> BreachNotifier
end
Processing logic
The Fluxtion event processor manages all the event call backs, the user code handles the business logic.
- An event handlers is notified when an event of the matching type is received.
- This in turn invokes the DataSumCalculator annotated trigger method which calculates the current sum extracting values from handler_A and handler_B.
- If the sum > 100 the DataSumCalculator returns true which propagates a notification to the BreachNotifier annotated trigger method.
- The BreachNotifier trigger method prints a message to the console.
Dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>example.master</artifactId>
<groupId>com.fluxtion.example</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>imperative-helloworld</artifactId>
<name>imperative :: hello world</name>
<dependencies>
<dependency>
<groupId>com.fluxtion</groupId>
<artifactId>compiler</artifactId>
<version>9.7.4</version>
</dependency>
</dependencies>
</project>
<dependencies>
<dependency>
<groupId>com.fluxtion</groupId>
<artifactId>compiler</artifactId>
<version>9.7.4</version>
</dependency>
</dependencies>
implementation 'com.fluxtion:compiler:9.7.4'
Three steps to using Fluxtion
1 - Mark event handling methods with annotations or via functional programming
2 - Build the event processor using fluxtion compiler utility
3 - Integrate the event processor in the app and feed it events
Step 1 - annotate event handling methods
There are two types of user classes employed at runtime. First, pojo’s with event processing methods that are bound into the generated event processor. Secondly, record classes that defines the event types that are fed into the BreachNotifierProcessor. The event processor routes events to event handler methods on bound instances.
Annotated callback methods
- @OnEventHandler annotation declares the entry point of an execution path, triggered by an external event.
- @OnTrigger annotated methods indicate call back methods to be invoked if a parent propagates a change.
The return boolean flag from a trigger or event handler method indicates if event notification should be propagated.
Event handlers
Name | Event handler | Trigger handler | Description |
---|---|---|---|
Event_A_Handler | yes | no | Handles incoming events of type Event_A |
Event_B_Handler | yes | no | Handles incoming events of type Event_B |
DataSumCalculator | no | yes | References DataHandler nodes and calculates the current sum |
BreachNotifier | no | yes | References the DataSumCalculator and logs a warning if sum > 100 |
The event handler method is called when a matching event type is published to the container, the trigger handler is called when a parent dependency haa been trigger or a parent event handler method has been called.
All classes in one source file, use the copy button and run the single file in your ide. Make sure to add the Fluxtion compiler dependency to your project
public class MainWithNestedClasses {
public static void main(String[] args) {
var eventProcessor = Fluxtion.interpret(new BreachNotifier());
eventProcessor.init();
eventProcessor.onEvent(new Event_A(34.4));
eventProcessor.onEvent(new Event_B(52.1));
eventProcessor.onEvent(new Event_A(105));//should create a breach warning
eventProcessor.onEvent(new Event_A(12.4));
}
public record Event_A(double value) {
}
public record Event_B(double value) {
}
/**
* An entry point to {@link com.fluxtion.runtime.EventProcessor} for {@link Event_A} events. The {@link OnEventHandler}
* defines a method as an entry point.
*
*/
@Getter
public static class Event_A_Handler {
private double value;
/**
* The {@link OnEventHandler} annotation marks this method as the start of an execution path with a {@link Event_A}.
* Invoked when the {@link com.fluxtion.runtime.EventProcessor} receives a {@link Event_A} event.
*
* @param eventA the input event
* @return flag indicating a change and a propagation of the event wave to child dependencies
*/
@OnEventHandler
public boolean data1Update(Event_A eventA) {
value = eventA.value();
return true;
}
}
/**
* An entry point to {@link com.fluxtion.runtime.EventProcessor} for {@link Event_B} events. The {@link OnEventHandler}
* defines a method as an entry point.
*
*/
@Getter
public static class Event_B_Handler {
private double value;
/**
* The {@link OnEventHandler} annotation marks this method as the start of an execution path with a {@link Event_B}.
* Invoked when the {@link com.fluxtion.runtime.EventProcessor} receives a {@link Event_B} event.
*
* @param eventB the input event
* @return flag indicating a change and a propagation of the event wave to child dependencies
*/
@OnEventHandler
public boolean data1Update(Event_B eventB) {
value = eventB.value();
return true;
}
}
/**
* Aggregates two event sources and calculates the sum of their values, whenever either changes. The calculate method
* notifies a propagation of a change when the sum is > 100
*/
public static class DataSumCalculator {
private final Event_A_Handler eventAHandler;
private final Event_B_Handler eventBHandler;
@Getter
private double sum;
public DataSumCalculator(Event_A_Handler eventAHandler, Event_B_Handler eventBHandler) {
this.eventAHandler = eventAHandler;
this.eventBHandler = eventBHandler;
}
public DataSumCalculator() {
this(new Event_A_Handler(), new Event_B_Handler());
}
/**
* The {@link OnTrigger} annotation marks this method to be called if any parents have changed
*
* @return flag indicating a change and a propagation of the event wave to child dependencies if the sum > 100
*/
@OnTrigger
public boolean calculate() {
sum = eventAHandler.getValue() + eventBHandler.getValue();
System.out.println("sum:" + sum);
return sum > 100;
}
}
/**
* The trigger method, printWarning on this class is invoked when a change is propagated from the parent node
*/
public static class BreachNotifier {
private final DataSumCalculator dataAddition;
public BreachNotifier(DataSumCalculator dataAddition) {
this.dataAddition = dataAddition;
}
public BreachNotifier() {
this(new DataSumCalculator());
}
@OnTrigger
public boolean printWarning() {
System.out.println("WARNING DataSumCalculator value is greater than 100 sum = " + dataAddition.getSum());
return true;
}
}
}
An entry point for processing events of type Event_A and stores the latest value as a member variable.
Annotate the event handler method with @OnEventHandler
as follows:
public class Event_A_Handler {
private double value;
@OnEventHandler
public boolean data1Update(Event_A data1) {
value = data1.value();
return true;
}
public double getValue() {
return value;
}
}
An entry point for processing events of type Event_B and stores the latest value as a member variable.
Annotate the event handler method with @OnEventHandler
as follows:
public class Event_B_Handler {
private double value;
@OnEventHandler
public boolean data1Update(Event_B data2) {
value = data2.value();
return true;
}
public double getValue() {
return value;
}
}
Calculates the current sum adding the values of Event_A_Handler and Event_B_Handler. Will be triggered when either handler has its updated method invoked. Annotate the trigger method with @OnTrigger as follows:
public class DataSumCalculator {
private final Event_A_Handler event_A_Handler;
private final Event_B_Handler event_B_Handler;
private double sum;
public DataSumCalculator(Event_A_Handler event_A_Handler, Event_B_Handler event_B_Handler) {
this.event_A_Handler = event_A_Handler;
this.event_B_Handler = event_B_Handler;
}
public DataSumCalculator() {
this(new Event_A_Handler(), new Event_B_Handler());
}
/**
* The {@link OnTrigger} annotation marks this method to be called if any parents have changed
*
* @return flag indicating a change and a propagation of the event wave to child dependencies if the sum > 100
*/
@OnTrigger
public boolean calculate() {
sum = event_A_Handler.getValue() + event_B_Handler.getValue();
System.out.println("sum:" + sum);
return sum > 100;
}
public double getSum() {
return sum;
}
}
The return flag indicates that the event notification should be propagated and any child nodes trigger methods should be invoked.
Logs to console when the sum breaches a value, BreachNotifier holds a reference to the DataSumCalculator instance. The trigger method is only invoked if the DataSumCalculator propagates the notification, by returning true from its trigger method. Annotate the trigger method with @OnTrigger as follows:
public class BreachNotifier {
private final DataSumCalculator dataAddition;
public BreachNotifier(DataSumCalculator dataAddition) {
this.dataAddition = dataAddition;
}
public BreachNotifier() {
this(new DataSumCalculator());
}
@OnTrigger
public boolean printWarning() {
System.out.println("WARNING DataSumCalculator value is greater than 100 sum = " + dataAddition.getSum());
return true;
}
}
Events
Java records are used as events.
public record Event_A(double value) {}
public record Event_B(double value) {}
Step 2 - build the event processor
All the pojo classes required for processing are linked together using an imperative style in our main method and supplied
to the Fluxtion.interpreted
method to build the event processor. The Fluxtion interpreter interrogates the supplied instances
and binds all the user pojos into the event processor.
Any connected instance will be automatically discovered and added to the final event processor binding the whole user object graph into the event processor.
var eventProcessor = Fluxtion.interpret(new BreachNotifier());
Step 3 - Integrate event processor and connect event stream
The example Main method instantiates an event processor in interpreted mode, initialises it and submits events for processing using the onEvent method. The init method must be called before submitting events.
Events are submitted for processing by calling eventProcessor.onEvent()
with instances of Event_A or Event_B.
The code for instantiating, initializing and sending events:
public class Main {
public static void main(String[] args) {
var eventProcessor = Fluxtion.interpret(new BreachNotifier());
eventProcessor.init();
eventProcessor.onEvent(new Event_A(34.4));
eventProcessor.onEvent(new Event_B(52.1));
eventProcessor.onEvent(new Event_A(105));//should create a breach warning
eventProcessor.onEvent(new Event_A(12.4));
}
}
Example execution output
sum:34.4
sum:86.5
sum:157.1
WARNING DataSumCalculator value is greater than 100 sum = 157.1
sum:64.5