5 minute AOT hello world
Use Fluxtion to add two numbers from different event streams and log when the sum > 100. The sum is the addition of the current value from each event stream. The stream of events can be infinitely long; calculations are run whenever a new event is received.
Code is available as a maven project
This example creates an event processor ahead of time using the Fluxtion Maven plugin. Once generated, the event processor is used as a normal user class. The main method instantiates and initialises the event processor, then fires data events at it. If a breach occurs, a warning is logged to the console. All dispatch and change notification is handled by Fluxtion when an event is received. Business logic resides in the user functions/classes.
For an event based interpreted example see Hello fluxtion world
Processing graph
flowchart TB
classDef eventHandler color:#022e1f,fill:#aaa3ff,stroke:#000;
classDef graphNode color:#022e1f,fill:#00cfff,stroke:#000;
classDef exportedService color:#022e1f,fill:#aaa3ff,stroke:#000;
style EventProcessor fill:#e9ebe4,stroke:#333,stroke-width:1px
EventA><b>InputEvent</b>::Event_A]:::eventHandler
EventB><b>InputEvent</b>::Event_B]:::eventHandler
HandlerA[Event_A_Handler\n<b>EventHandler</b>::Event_A]:::graphNode
HandlerB[Event_B_Handler\n<b>EventHandler</b>::Event_B]:::graphNode
DataSumCalculator:::graphNode
BreachNotifier:::graphNode
EventA --> HandlerA
EventB --> HandlerB
subgraph EventProcessor
HandlerA --> DataSumCalculator
HandlerB --> DataSumCalculator
DataSumCalculator --> BreachNotifier
end
Processing logic
The Fluxtion event processor manages all the event callbacks; the user code handles the business logic.
- An event handler is notified when an event of the matching type is received.
- This in turn invokes the DataSumCalculator annotated trigger method, which calculates the current sum by extracting values from Event_A_Handler and Event_B_Handler.
- If the sum > 100 the DataSumCalculator returns true which propagates a notification to the BreachNotifier annotated trigger method.
- The BreachNotifier trigger method prints a message to the console.
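The flow described in the list above can be sketched in plain Java. This is only an illustration of the handler, sum and notifier sequence: it uses no Fluxtion classes, and the class name DispatchSketch, its fields and its log list are assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only: no Fluxtion classes are used here.
public class DispatchSketch {
    record Event_A(double value) {}
    record Event_B(double value) {}

    private double valueA;
    private double valueB;
    private double sum;
    private final List<String> log = new ArrayList<>();

    // Mirrors the described flow: handler -> sum calculation -> breach notification.
    public void onEvent(Object event) {
        if (event instanceof Event_A a) {
            valueA = a.value();
        } else if (event instanceof Event_B b) {
            valueB = b.value();
        } else {
            return; // unknown event type, nothing is triggered
        }
        // The notifier step only runs when the calculation reports a breach.
        if (calculate()) {
            log.add("WARNING sum = " + sum);
        }
    }

    // Recomputes the sum from the latest values and reports whether it exceeds 100.
    private boolean calculate() {
        sum = valueA + valueB;
        log.add("sum:" + sum);
        return sum > 100;
    }

    public List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        DispatchSketch sketch = new DispatchSketch();
        sketch.onEvent(new Event_A(60));
        sketch.onEvent(new Event_B(50));
        sketch.log().forEach(System.out::println);
    }
}
```

In the real example this hand-written dispatch is replaced by the generated event processor, so the user classes contain only the business logic.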
Dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<version>1.0.0-SNAPSHOT</version>
<parent>
<artifactId>example.master</artifactId>
<groupId>com.fluxtion.example</groupId>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>imperative-helloworld</artifactId>
<name>imperative :: hello world</name>
<build>
<plugins>
<plugin>
<groupId>com.fluxtion</groupId>
<artifactId>fluxtion-maven-plugin</artifactId>
<version>3.0.14</version>
<executions>
<execution>
<goals>
<goal>scan</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.fluxtion</groupId>
<artifactId>compiler</artifactId>
<version>9.7.4</version>
</dependency>
</dependencies>
</project>
implementation 'com.fluxtion:compiler:9.7.4'
Three steps to using Fluxtion
1 - Mark event handling methods with annotations or via functional programming
2 - Build the event processor using fluxtion compiler utility
3 - Integrate the event processor in the app and feed it events
Step 1 - annotate event handling methods
There are two types of user classes employed at runtime. First, POJOs with event processing methods that are bound into the generated event processor. Second, record classes that define the event types that are fed into the BreachNotifierProcessor. The event processor routes events to event handler methods on bound instances.
Annotated callback methods
- @OnEventHandler annotation declares the entry point of an execution path, triggered by an external event.
- @OnTrigger annotation marks callback methods that are invoked if a parent propagates a change.
The boolean value returned from a trigger or event handler method indicates whether the event notification should be propagated.
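To make the role of the returned flag concrete, here is a minimal plain-Java sketch. It simplifies the graph to a linear chain of triggers and stops at the first one that returns false. The names PropagationFlagSketch, runChain and chainFor are hypothetical and not part of Fluxtion.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

// Plain-Java illustration only: a linear chain stands in for the node graph.
public class PropagationFlagSketch {

    // Invokes the triggers in order and stops after the first one that returns false.
    // Returns the number of triggers that were actually invoked.
    static int runChain(List<BooleanSupplier> triggers) {
        int invoked = 0;
        for (BooleanSupplier trigger : triggers) {
            invoked++;
            if (!trigger.getAsBoolean()) {
                break; // false means: do not notify the child
            }
        }
        return invoked;
    }

    // Builds a two-step chain: a sum check followed by a warning printer.
    static List<BooleanSupplier> chainFor(double sum) {
        BooleanSupplier sumCheck = () -> sum > 100;
        BooleanSupplier warning = () -> {
            System.out.println("WARNING sum = " + sum);
            return true;
        };
        return List.of(sumCheck, warning);
    }

    public static void main(String[] args) {
        System.out.println("invoked: " + runChain(chainFor(86.5)));
        System.out.println("invoked: " + runChain(chainFor(157.1)));
    }
}
```

With a sum of 86.5 only the first step runs; with 157.1 both steps run and the warning is printed.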
Event handlers
Name | Event handler | Trigger handler | Description |
---|---|---|---|
Event_A_Handler | yes | no | Handles incoming events of type Event_A |
Event_B_Handler | yes | no | Handles incoming events of type Event_B |
DataSumCalculator | no | yes | References the Event_A_Handler and Event_B_Handler nodes and calculates the current sum |
BreachNotifier | no | yes | References the DataSumCalculator and logs a warning if sum > 100 |
The event handler method is called when a matching event type is published to the container. The trigger handler is called when a parent dependency has been triggered or a parent event handler method has been called.
Event_A_Handler is an entry point for processing events of type Event_A and stores the latest value as a member variable.
Annotate the event handler method with @OnEventHandler as follows:
public class Event_A_Handler {
private double value;
@OnEventHandler
public boolean data1Update(Event_A data1) {
value = data1.value();
return true;
}
public double getValue() {
return value;
}
}
Event_B_Handler is an entry point for processing events of type Event_B and stores the latest value as a member variable.
Annotate the event handler method with @OnEventHandler as follows:
public class Event_B_Handler {
private double value;
@OnEventHandler
public boolean data1Update(Event_B data2) {
value = data2.value();
return true;
}
public double getValue() {
return value;
}
}
DataSumCalculator calculates the current sum by adding the values of Event_A_Handler and Event_B_Handler. It is triggered when the event handler method of either handler is invoked. Annotate the trigger method with @OnTrigger as follows:
public class DataSumCalculator {
private final Event_A_Handler event_A_Handler;
private final Event_B_Handler event_B_Handler;
private double sum;
public DataSumCalculator(Event_A_Handler event_A_Handler, Event_B_Handler event_B_Handler) {
this.event_A_Handler = event_A_Handler;
this.event_B_Handler = event_B_Handler;
}
public DataSumCalculator() {
this(new Event_A_Handler(), new Event_B_Handler());
}
@OnTrigger
public boolean calculate() {
sum = event_A_Handler.getValue() + event_B_Handler.getValue();
System.out.println("sum:" + sum);
return sum > 100;
}
public double getSum() {
return sum;
}
}
The return flag indicates whether the event notification should be propagated and the trigger methods of any child nodes invoked.
BreachNotifier logs to the console when the sum breaches a value. It holds a reference to the DataSumCalculator instance. The trigger method is only invoked if the DataSumCalculator propagates the notification by returning true from its trigger method. Annotate the trigger method with @OnTrigger as follows:
public class BreachNotifier {
private final DataSumCalculator dataAddition;
public BreachNotifier(DataSumCalculator dataAddition) {
this.dataAddition = dataAddition;
}
public BreachNotifier() {
this(new DataSumCalculator());
}
@OnTrigger
public boolean printWarning() {
System.out.println("WARNING DataSumCalculator value is greater than 100 sum = " + dataAddition.getSum());
return true;
}
}
Events
Java records are used as events.
public record Event_A(double value) {}
public record Event_B(double value) {}
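As a small aside on why records suit event types, the sketch below shows what the compiler generates for a record: an accessor named after the component, value-based equality and a readable toString. The wrapper class RecordEventSketch is hypothetical and exists only to make the example runnable.

```java
// Plain-Java illustration of the members a record provides automatically.
public class RecordEventSketch {
    record Event_A(double value) {}

    public static void main(String[] args) {
        Event_A first = new Event_A(34.4);
        Event_A second = new Event_A(34.4);

        // Accessor generated from the record component name
        System.out.println(first.value());
        // Records compare by component values, not by identity
        System.out.println(first.equals(second));
        // Generated toString lists the component names and values
        System.out.println(first);
    }
}
```

The value() accessor is the method the handlers call when they store the latest value from an event.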
Step 2 - build the event processor
All the pojo classes required for processing are linked together using an imperative style in our AotBuilder. The maven plugin interrogates the builder to generate an event processor that binds in all the user pojos.
The Fluxtion generator binds all objects supplied in the buildGraph(EventProcessorConfig eventProcessorConfig) method. Any connected instance will be automatically discovered and added to the final event processor. Due to discovery, only BreachNotifier needs to be added with eventProcessorConfig.addNode(new BreachNotifier()) to bind the whole user object graph into the event processor.
The configuration for the generated source file is set in the builder method configureGeneration(FluxtionCompilerConfig fluxtionCompilerConfig).
The AotBuilder adds user classes imperatively to the EventProcessorConfig in the buildGraph method. Source generation configuration is handled in the configureGeneration method.
public class AotBuilder implements FluxtionGraphBuilder {
@Override
public void buildGraph(EventProcessorConfig eventProcessorConfig) {
eventProcessorConfig.addNode(new BreachNotifier());
}
@Override
public void configureGeneration(FluxtionCompilerConfig fluxtionCompilerConfig) {
fluxtionCompilerConfig.setClassName("BreachNotifierProcessor");
fluxtionCompilerConfig.setPackageName("com.fluxtion.example.imperative.helloworld.generated");
}
}
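The idea of discovering connected instances from a single root can be illustrated with a small reflection walk. This is a sketch under assumptions, not a description of how the Fluxtion generator is implemented: the class DiscoverySketch, its nested stand-in classes and the discover method are all hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Plain-Java illustration only: walks object references starting from one root node.
public class DiscoverySketch {
    static class HandlerA {}
    static class HandlerB {}

    static class Calculator {
        private final HandlerA handlerA = new HandlerA();
        private final HandlerB handlerB = new HandlerB();
    }

    static class Notifier {
        private final Calculator calculator = new Calculator();
    }

    // Returns every instance reachable from the root through instance fields.
    static Set<Object> discover(Object root) {
        Set<Object> found = Collections.newSetFromMap(new IdentityHashMap<>());
        collect(root, found);
        return found;
    }

    private static void collect(Object node, Set<Object> found) {
        // Skip nulls, JDK types and instances that were already visited.
        if (node == null || node.getClass().getName().startsWith("java.") || !found.add(node)) {
            return;
        }
        for (Field field : node.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.getType().isPrimitive()) {
                continue;
            }
            try {
                field.setAccessible(true);
                collect(field.get(node), found);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("cannot read field " + field.getName(), e);
            }
        }
    }

    public static void main(String[] args) {
        // Only the root is supplied; the other three instances are found by traversal.
        System.out.println("discovered nodes: " + discover(new Notifier()).size());
    }
}
```

Supplying only the Notifier root yields four instances, which parallels adding only BreachNotifier in buildGraph.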
The AOT generated event processor source file is here BreachNotifierProcessor.java
/**
*
*
* <pre>
* generation time : Not available
* eventProcessorGenerator version : 9.2.23
* api version : 9.2.23
* </pre>
*
* Event classes supported:
*
* <ul>
* <li>com.fluxtion.example.imperative.helloworld.Event_A
* <li>com.fluxtion.example.imperative.helloworld.Event_B
* <li>com.fluxtion.runtime.time.ClockStrategy.ClockStrategyEvent
* </ul>
*
* @author Greg Higgins
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public class BreachNotifierProcessor
implements EventProcessor<BreachNotifierProcessor>,
StaticEventProcessor,
InternalEventProcessor,
BatchHandler,
Lifecycle {
//Node declarations
private final CallbackDispatcherImpl callbackDispatcher = new CallbackDispatcherImpl();
private final Event_A_Handler event_A_Handler_2 = new Event_A_Handler();
private final Event_B_Handler event_B_Handler_3 = new Event_B_Handler();
private final DataSumCalculator dataSumCalculator_1 =
new DataSumCalculator(event_A_Handler_2, event_B_Handler_3);
private final BreachNotifier breachNotifier_0 = new BreachNotifier(dataSumCalculator_1);
public final NodeNameAuditor nodeNameLookup = new NodeNameAuditor();
private final SubscriptionManagerNode subscriptionManager = new SubscriptionManagerNode();
private final MutableEventProcessorContext context =
new MutableEventProcessorContext(
nodeNameLookup, callbackDispatcher, subscriptionManager, callbackDispatcher);
public final Clock clock = new Clock();
private final ExportFunctionAuditEvent functionAudit = new ExportFunctionAuditEvent();
//Dirty flags
private boolean initCalled = false;
private boolean processing = false;
private boolean buffering = false;
private final IdentityHashMap<Object, BooleanSupplier> dirtyFlagSupplierMap =
new IdentityHashMap<>(3);
private final IdentityHashMap<Object, Consumer<Boolean>> dirtyFlagUpdateMap =
new IdentityHashMap<>(3);
private boolean isDirty_dataSumCalculator_1 = false;
private boolean isDirty_event_A_Handler_2 = false;
private boolean isDirty_event_B_Handler_3 = false;
//Forked declarations
//Filter constants
public BreachNotifierProcessor(Map<Object, Object> contextMap) {
context.replaceMappings(contextMap);
//node auditors
initialiseAuditor(clock);
initialiseAuditor(nodeNameLookup);
subscriptionManager.setSubscribingEventProcessor(this);
context.setEventProcessorCallback(this);
}
public BreachNotifierProcessor() {
this(null);
}
@Override
public void init() {
initCalled = true;
auditEvent(Lifecycle.LifecycleEvent.Init);
//initialise dirty lookup map
isDirty("test");
clock.init();
afterEvent();
}
@Override
public void start() {
if (!initCalled) {
throw new RuntimeException("init() must be called before start()");
}
processing = true;
auditEvent(Lifecycle.LifecycleEvent.Start);
afterEvent();
callbackDispatcher.dispatchQueuedCallbacks();
processing = false;
}
@Override
public void stop() {
if (!initCalled) {
throw new RuntimeException("init() must be called before stop()");
}
processing = true;
auditEvent(Lifecycle.LifecycleEvent.Stop);
afterEvent();
callbackDispatcher.dispatchQueuedCallbacks();
processing = false;
}
@Override
public void tearDown() {
initCalled = false;
auditEvent(Lifecycle.LifecycleEvent.TearDown);
nodeNameLookup.tearDown();
clock.tearDown();
subscriptionManager.tearDown();
afterEvent();
}
@Override
public void setContextParameterMap(Map<Object, Object> newContextMapping) {
context.replaceMappings(newContextMapping);
}
@Override
public void addContextParameter(Object key, Object value) {
context.addMapping(key, value);
}
//EVENT DISPATCH - START
@Override
public void onEvent(Object event) {
if (buffering) {
triggerCalculation();
}
if (processing) {
callbackDispatcher.processReentrantEvent(event);
} else {
processing = true;
onEventInternal(event);
callbackDispatcher.dispatchQueuedCallbacks();
processing = false;
}
}
@Override
public void onEventInternal(Object event) {
if (event instanceof com.fluxtion.example.imperative.helloworld.Event_A) {
Event_A typedEvent = (Event_A) event;
handleEvent(typedEvent);
} else if (event instanceof com.fluxtion.example.imperative.helloworld.Event_B) {
Event_B typedEvent = (Event_B) event;
handleEvent(typedEvent);
} else if (event instanceof com.fluxtion.runtime.time.ClockStrategy.ClockStrategyEvent) {
ClockStrategyEvent typedEvent = (ClockStrategyEvent) event;
handleEvent(typedEvent);
}
}
public void handleEvent(Event_A typedEvent) {
auditEvent(typedEvent);
//Default, no filter methods
isDirty_event_A_Handler_2 = event_A_Handler_2.data1Update(typedEvent);
if (guardCheck_dataSumCalculator_1()) {
isDirty_dataSumCalculator_1 = dataSumCalculator_1.calculate();
}
if (guardCheck_breachNotifier_0()) {
breachNotifier_0.printWarning();
}
afterEvent();
}
public void handleEvent(Event_B typedEvent) {
auditEvent(typedEvent);
//Default, no filter methods
isDirty_event_B_Handler_3 = event_B_Handler_3.data1Update(typedEvent);
if (guardCheck_dataSumCalculator_1()) {
isDirty_dataSumCalculator_1 = dataSumCalculator_1.calculate();
}
if (guardCheck_breachNotifier_0()) {
breachNotifier_0.printWarning();
}
afterEvent();
}
public void handleEvent(ClockStrategyEvent typedEvent) {
auditEvent(typedEvent);
//Default, no filter methods
clock.setClockStrategy(typedEvent);
afterEvent();
}
//EVENT DISPATCH - END
public void bufferEvent(Object event) {
buffering = true;
if (event instanceof com.fluxtion.example.imperative.helloworld.Event_A) {
Event_A typedEvent = (Event_A) event;
auditEvent(typedEvent);
isDirty_event_A_Handler_2 = event_A_Handler_2.data1Update(typedEvent);
} else if (event instanceof com.fluxtion.example.imperative.helloworld.Event_B) {
Event_B typedEvent = (Event_B) event;
auditEvent(typedEvent);
isDirty_event_B_Handler_3 = event_B_Handler_3.data1Update(typedEvent);
} else if (event instanceof com.fluxtion.runtime.time.ClockStrategy.ClockStrategyEvent) {
ClockStrategyEvent typedEvent = (ClockStrategyEvent) event;
auditEvent(typedEvent);
clock.setClockStrategy(typedEvent);
}
}
public void triggerCalculation() {
buffering = false;
String typedEvent = "No event information - buffered dispatch";
if (guardCheck_dataSumCalculator_1()) {
isDirty_dataSumCalculator_1 = dataSumCalculator_1.calculate();
}
if (guardCheck_breachNotifier_0()) {
breachNotifier_0.printWarning();
}
afterEvent();
}
private void auditEvent(Object typedEvent) {
clock.eventReceived(typedEvent);
nodeNameLookup.eventReceived(typedEvent);
}
private void auditEvent(Event typedEvent) {
clock.eventReceived(typedEvent);
nodeNameLookup.eventReceived(typedEvent);
}
private void initialiseAuditor(Auditor auditor) {
auditor.init();
auditor.nodeRegistered(breachNotifier_0, "breachNotifier_0");
auditor.nodeRegistered(dataSumCalculator_1, "dataSumCalculator_1");
auditor.nodeRegistered(event_A_Handler_2, "event_A_Handler_2");
auditor.nodeRegistered(event_B_Handler_3, "event_B_Handler_3");
auditor.nodeRegistered(callbackDispatcher, "callbackDispatcher");
auditor.nodeRegistered(subscriptionManager, "subscriptionManager");
auditor.nodeRegistered(context, "context");
}
private void beforeServiceCall(String functionDescription) {
functionAudit.setFunctionDescription(functionDescription);
auditEvent(functionAudit);
if (buffering) {
triggerCalculation();
}
processing = true;
}
private void afterServiceCall() {
afterEvent();
callbackDispatcher.dispatchQueuedCallbacks();
processing = false;
}
private void afterEvent() {
clock.processingComplete();
nodeNameLookup.processingComplete();
isDirty_dataSumCalculator_1 = false;
isDirty_event_A_Handler_2 = false;
isDirty_event_B_Handler_3 = false;
}
@Override
public void batchPause() {
auditEvent(Lifecycle.LifecycleEvent.BatchPause);
processing = true;
afterEvent();
callbackDispatcher.dispatchQueuedCallbacks();
processing = false;
}
@Override
public void batchEnd() {
auditEvent(Lifecycle.LifecycleEvent.BatchEnd);
processing = true;
afterEvent();
callbackDispatcher.dispatchQueuedCallbacks();
processing = false;
}
@Override
public boolean isDirty(Object node) {
return dirtySupplier(node).getAsBoolean();
}
@Override
public BooleanSupplier dirtySupplier(Object node) {
if (dirtyFlagSupplierMap.isEmpty()) {
dirtyFlagSupplierMap.put(dataSumCalculator_1, () -> isDirty_dataSumCalculator_1);
dirtyFlagSupplierMap.put(event_A_Handler_2, () -> isDirty_event_A_Handler_2);
dirtyFlagSupplierMap.put(event_B_Handler_3, () -> isDirty_event_B_Handler_3);
}
return dirtyFlagSupplierMap.getOrDefault(node, StaticEventProcessor.ALWAYS_FALSE);
}
@Override
public void setDirty(Object node, boolean dirtyFlag) {
if (dirtyFlagUpdateMap.isEmpty()) {
dirtyFlagUpdateMap.put(dataSumCalculator_1, (b) -> isDirty_dataSumCalculator_1 = b);
dirtyFlagUpdateMap.put(event_A_Handler_2, (b) -> isDirty_event_A_Handler_2 = b);
dirtyFlagUpdateMap.put(event_B_Handler_3, (b) -> isDirty_event_B_Handler_3 = b);
}
dirtyFlagUpdateMap.get(node).accept(dirtyFlag);
}
private boolean guardCheck_breachNotifier_0() {
return isDirty_dataSumCalculator_1;
}
private boolean guardCheck_dataSumCalculator_1() {
return isDirty_event_A_Handler_2 | isDirty_event_B_Handler_3;
}
@Override
public <T> T getNodeById(String id) throws NoSuchFieldException {
return nodeNameLookup.getInstanceById(id);
}
@Override
public <A extends Auditor> A getAuditorById(String id)
throws NoSuchFieldException, IllegalAccessException {
return (A) this.getClass().getField(id).get(this);
}
@Override
public void addEventFeed(EventFeed eventProcessorFeed) {
subscriptionManager.addEventProcessorFeed(eventProcessorFeed);
}
@Override
public void removeEventFeed(EventFeed eventProcessorFeed) {
subscriptionManager.removeEventProcessorFeed(eventProcessorFeed);
}
@Override
public BreachNotifierProcessor newInstance() {
return new BreachNotifierProcessor();
}
@Override
public BreachNotifierProcessor newInstance(Map<Object, Object> contextMap) {
return new BreachNotifierProcessor();
}
@Override
public String getLastAuditLogRecord() {
try {
EventLogManager eventLogManager =
(EventLogManager) this.getClass().getField(EventLogManager.NODE_NAME).get(this);
return eventLogManager.lastRecordAsString();
} catch (Throwable e) {
return "";
}
}
}
Step 3 - Integrate event processor and connect event stream
The example Main method instantiates the BreachNotifierProcessor, initialises it and submits events for processing using the onEvent method. The init method must be called before submitting events.
Events are submitted for processing by calling eventProcessor.onEvent() with instances of Event_A or Event_B.
The code for instantiating, initializing and sending events:
public class Main {
public static void main(String[] args) {
var eventProcessor = new BreachNotifierProcessor();
eventProcessor.init();
eventProcessor.onEvent(new Event_A(34.4));
eventProcessor.onEvent(new Event_B(52.1));
eventProcessor.onEvent(new Event_A(105));//should create a breach warning
eventProcessor.onEvent(new Event_A(12.4));
}
}
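When events arrive from a collection or another source rather than hard-coded calls, a small helper can forward them to the processor. The sketch below is plain Java with a stand-in sink so that it runs without Fluxtion; the names EventFeedSketch and publishAll are hypothetical. In the real application the sink would be eventProcessor::onEvent, which fits because the generated onEvent method accepts an Object.

```java
import java.util.List;
import java.util.function.Consumer;

// Plain-Java illustration only: forwards events to any sink that accepts an Object.
public class EventFeedSketch {
    record Event_A(double value) {}
    record Event_B(double value) {}

    // Publishes each event to the sink in order and returns how many were sent.
    static int publishAll(List<?> events, Consumer<Object> sink) {
        int published = 0;
        for (Object event : events) {
            sink.accept(event);
            published++;
        }
        return published;
    }

    public static void main(String[] args) {
        List<Object> events = List.of(new Event_A(34.4), new Event_B(52.1), new Event_A(105));
        // Stand-in sink; in the real application this would be eventProcessor::onEvent
        int count = publishAll(events, event -> System.out.println("publishing " + event));
        System.out.println("published " + count + " events");
    }
}
```

As in the Main class above, init must still be called on the processor before any events are forwarded to it.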
Example execution output
sum:34.4
sum:86.5
sum:157.1
WARNING DataSumCalculator value is greater than 100 sum = 157.1
sum:64.5