Application integration of an event processor
Once the event processor has been generated it can be used by the application. An instance of an EventProcessor is the bridge between event streams and processing logic, application integration connects the event processor to the application event sources.
An application can contain multiple event processor instances, they are lightweight objects designed to be embedded in your application. The application creates instances of event processors and routes events to an instance.
The source project for the examples can be found here
EventProcessors are not thread safe a single event should be processed at one time. Application code is responsible for synchronizing thread access to an event processor instance.
User code interacts with an event processor instance in one of five ways
- Creating a new processor
- Input to a processor
- Output from a processor
- Control of a processor
- Query into a processor
Table of contents
Example project
The source project for the examples can be found here
Creating a new processor
Fluxtion provides several strategies for creating an event processor instance that you will use in your application:
- Generate in process
- Use an AOT processor as POJO
- Factory method on an event processor instance
- Instance per partitioned event stream
Once you have created an instance you can use it as any normal java class. For information about generating an event processor see build event processor section for further details
Create an instance
Creating a new processor in process by calling one of the Fluxtion methods Fluxtion.interpret
or Fluxtion.compile
,
or by using an AOT generated processor. An AOT processor is a normal java class that has a constructor, use it as a POJO
in your application.
Code sample
public class VanillaCreateEventProcessor {
public static void main(String[] args) {
//IN PROCESS CREATION
var processor = Fluxtion.interpret(new VanillaAotBuilder()::buildGraph);
processor.init();
processor.onEvent("hello world - in process");
//AOT USE AS POJO
processor = new VanillaProcessor();
processor.init();
processor.onEvent("hello world - AOT");
}
//THE BUILDER IS CALLED AS PART OF THE MAVEN BUILD TO GENERATE AOT PROCESSOR
public static class VanillaAotBuilder implements FluxtionGraphBuilder{
@Override
public void buildGraph(EventProcessorConfig eventProcessorConfig) {
DataFlow.subscribe(String.class)
.console("received -> {}");
}
@Override
public void configureGeneration(FluxtionCompilerConfig compilerConfig) {
compilerConfig.setClassName("VanillaProcessor");
compilerConfig.setPackageName("com.fluxtion.example.reference.integration.genoutput");
}
}
}
Sample log
received -> hello world - in process
received -> hello world - AOT
Factory method
An event processor instance provides a factory method that creates new instances of the same event processor class. This can be useful when multiple instances of the same processor are required but each with different state. Applications sometimes partition an event streams, each instance of the event processor can be bound to that partitioned stream.
Code sample
public class FactoryExample {
public static void main(String[] args) {
var processor = Fluxtion.compile(new MyPartitionedLogic());
var processor_A = processor.newInstance();
var processor_B = processor.newInstance();
//Factory method to create new event processor instances
processor_A.init();
processor_B.init();
//user partitioning event flow logic
processor_A.onEvent("for A");
processor_B.onEvent("for B");
}
public static class MyPartitionedLogic {
@OnEventHandler
public boolean onString(String signal) {
System.out.println(signal);
return true;
}
}
}
Sample log
for A
for B
Compiled event processor only support
The compiled event processors create bound nodes using their constructors so each compiled event processor has no shared references. As an interpreted instance refers to the nodes it has been generated with, it does create new instance of bound nodes, it cannot create a segregated instance of an event processor.
Factory method processor.newInstance() is only supported for compiled event processors
Partitioning
Fluxtion provides an automatic partitioning function that can be used to query event flow, partition it and assign a new event processor instance to that flow. The partitioner takes a factory method to create new event processor instances and a key function to segregate event flow.
The partitioner will create and initialise a new event processor when the key function returns a previously unseen key.
Code sample
public class AutomaticPartitionExample {
public record DayEvent(String day, int amount) {}
public static void main(String[] args) {
var processorFactory = Fluxtion.compile(new DayEventProcessor());
//Create a partitioner that will partition data based on a property
Partitioner<StaticEventProcessor> partitioner = new Partitioner<>(processorFactory::newInstance);
partitioner.partition(DayEvent::day);
//new processor for Monday
partitioner.onEvent(new DayEvent("Monday", 2));
partitioner.onEvent(new DayEvent("Monday", 4));
//new processor for Tuesday
partitioner.onEvent(new DayEvent("Tuesday", 14));
//new processor for Friday
partitioner.onEvent(new DayEvent("Friday", 33));
//re-use processor for Monday
System.out.println();
partitioner.onEvent(new DayEvent("Monday", 999));
}
public static class DayEventProcessor {
@Initialise
public void initialise() {
System.out.println("\nDayEventProcessor::initialise");
}
@OnEventHandler
public boolean onDayaEvent(DayEvent signal) {
System.out.println(signal);
return true;
}
}
}
Sample log
DayEventProcessor::initialise
DayEvent[day=Monday, amount=2]
DayEvent[day=Monday, amount=4]
DayEventProcessor::initialise
DayEvent[day=Tuesday, amount=14]
DayEventProcessor::initialise
DayEvent[day=Friday, amount=33]
DayEvent[day=Monday, amount=999]
Inputs to a processor
An event processor responds to input by triggering a calculation cycle, triggering a calculation cycle from an input is covered in this section. Functions are bound to the calculation cycle to meet the business requirements, see the mark event handling section for further details. To process inputs the event processor instance must be initialised before any input is submitted.
1 - Call EventProcessor.init() before submitting any input
2 - Each new input triggers a graph calculation cycle
Events
A simple example demonstrating how an application triggers an event process cycle by calling
processor.onEvent("WORLD");
Code sample
public class EventInput {
public static void main(String[] args) {
EventProcessor<?> processor = Fluxtion.interpret(c -> DataFlow.subscribe(String.class).console("Hello {}"));
//lifecycle init required
processor.init();
//send event
processor.onEvent("WORLD");
}
}
Sample log
Hello WORLD
Exported service
The generated event processor implements any exported service interface, calling a service method will trigger an event process cycle. The service reference is discovered in the application with a call to the event processor instance,
ServiceController svc = processor.getExportedService(ServiceController.class);
An event processor provides several service discovery methods:
- Lookup using type inference -
ServiceController svc = processor.getExportedService()
- Lookup using a specific type -
var svc = processor.getExportedService(ServiceController.class)
- Consuming a service if it is exported -
processor.consumeServiceIfExported(ServiceController.class, s -> {})
- Checking if a service is exported -
processor.exportsService(MiaServiceController.class))
Code sample
public class ServiceInput {
public static void main(String[] args) {
EventProcessor<?> processor = Fluxtion.interpret(new MyConsumer());
//lifecycle init required
processor.init();
//lookup service using type inference
ServiceController svc = processor.getExportedService();
svc.serviceOn(System.out::println, "WORLD");
//SUPPORTED service lookups
//lookup with explicit type
svc = processor.getExportedService(ServiceController.class);
svc.serviceOn(System.out::println, "WORLD");
//lookup with explicit type, use default value if none exported
MiaServiceController svcMissing = processor.getExportedService(MiaServiceController.class, (consumer, message) -> {
System.out.println("MiaServiceController not exported");
return false;
});
svcMissing.serviceOn(System.out::println, "WORLD");
//lookup and consume service if exported
processor.consumeServiceIfExported(ServiceController.class, s -> s.serviceOn(System.out::println, "WORLD"));
//is service exported
System.out.println("ServiceController.class exported : " + processor.exportsService(ServiceController.class));
System.out.println("MiaServiceController.class exported: " + processor.exportsService(MiaServiceController.class));
}
public interface ServiceController {
boolean serviceOn(Consumer<String> consumer, String message);
}
public interface MiaServiceController {
boolean serviceOn(Consumer<String> consumer, String message);
}
public static class MyConsumer implements @ExportService ServiceController {
@Override
public boolean serviceOn(Consumer<String> consumer, String message) {
consumer.accept("hello " + message);
return false;
}
}
}
Sample log
hello WORLD
hello WORLD
MiaServiceController not exported
hello WORLD
ServiceController.class exported : true
MiaServiceController.class exported: false
Signals
Fluxtion provides some prebuilt signal classes that are re-usable and reduce the application code to write. The Signal classes can be sent as events triggering an event process cycle using the Api calls built into the event processor:
- publish a signal with a filter and value
- publish a signal with only a filter
- publish a signal with only a value
- publish primitive signals with a filter
Code sample
public class SignalInput {
public static void main(String[] args) {
var processor = Fluxtion.interpret(new SignalInput());
processor.init();
processor.publishIntSignal("1", 200);
processor.publishSignal("ALERT_SIGNAL", "alert!!");
processor.publishSignal("WAKEUP");
processor.publishObjectSignal("WAKEUP");
processor.publishObjectSignal(new Date());
}
@OnEventHandler(filterString = "1")
public boolean intSignal(Signal.IntSignal value) {
System.out.println("intSignal [" + value.getValue() + "]");
return true;
}
@OnEventHandler(filterString = "ALERT_SIGNAL")
public boolean alertSignal(Signal<String> signalToProcess) {
System.out.println("alertStringSignal [" + signalToProcess + "]");
return true;
}
@OnEventHandler(filterStringFromClass = String.class)
public boolean anyStringSignal(Signal<String> signalToProcess) {
System.out.println("anyStringSignal [" + signalToProcess + "]");
return true;
}
@OnEventHandler(filterStringFromClass = Date.class)
public boolean anyDateSignal(Signal<Date> signalToProcess) {
System.out.println("anyDateSignal [" + signalToProcess + "]");
return true;
}
@OnEventHandler(filterString = "WAKEUP")
public boolean namedSignal(Signal<?> signalToProcess) {
System.out.println("namedSignal [" + signalToProcess.filterString() + "]");
return true;
}
}
Sample log
intSignal [200]
alertStringSignal [Signal: {filterString: ALERT_SIGNAL, value: alert!!}]
namedSignal [WAKEUP]
anyStringSignal [Signal: {filterString: java.lang.String, value: WAKEUP}]
anyDateSignal [Signal: {filterString: java.util.Date, value: Sat Apr 27 06:25:28 BST 2024}]
Buffer and trigger
An event processor can buffer multiple events without causing any triggers to fire, and at some point in the future
cause all potentially dirty trigger to fire. This is known as buffering and triggering it is achieved by call
EventProcessr.bufferEvent
multiple times and then following it with a call EventProcessor.triggerCalculation
Code sample
public static void main(String[] args) {
var processor = Fluxtion.interpret(new Child(new MyNode(), new MyNode2()));
processor.init();
processor.bufferEvent("test");
System.out.println();
processor.bufferEvent(200);
System.out.println();
processor.bufferEvent(50);
System.out.println();
processor.triggerCalculation();
}
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode event received:" + stringToProcess);
return true;
}
}
public static class MyNode2 {
@OnEventHandler
public boolean handleIntEvent(int intToProcess) {
boolean propagate = intToProcess > 100;
System.out.println("MyNode2 conditional propagate:" + propagate);
return propagate;
}
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode2 event received:" + stringToProcess);
return true;
}
}
public static class Child{
private final MyNode myNode;
private final MyNode2 myNode2;
public Child(MyNode myNode, MyNode2 myNode2) {
this.myNode = myNode;
this.myNode2 = myNode2;
}
@OnParentUpdate
public void node1Updated(MyNode myNode1){
System.out.println("1 - myNode updated");
}
@OnParentUpdate
public void node2Updated(MyNode2 myNode2){
System.out.println("2 - myNode2 updated");
}
@OnTrigger
public boolean triggered(){
System.out.println("Child:triggered");
return true;
}
}
Sample log
MyNode2 event received:test
2 - myNode2 updated
MyNode event received:test
1 - myNode updated
MyNode2 conditional propagate:true
2 - myNode2 updated
MyNode2 conditional propagate:false
Child:triggered
Outputs from a processor
Sink
An application can register for output from the EventProcessor by supplying a consumer to addSink and removed with a call to removeSink. Bound classes can publish to sinks during an event process cycle, any registered sinks will see the update as soon as the data is published, not at the end of the cycle.
- Adding sink -
processor.addSink("mySink", (Consumer<T> t) ->{})
- Removing sink -
processor.removeSink("mySink")
Code sample
public static void main(String[] args) {
var processor = Fluxtion.interpret(cfg ->
DataFlow.subscribeToIntSignal("myIntSignal")
.mapToObj(d -> "intValue:" + d)
.sink("mySink")//CREATE A SINK IN THE PROCESSOR
);
processor.init();
//ADDING A SINK
processor.addSink("mySink", (Consumer<String>) System.out::println);
processor.publishSignal("myIntSignal", 10);
processor.publishSignal("myIntSignal", 256);
//REMOVING A SINK
processor.removeSink("mySink");
processor.publishSignal("myIntSignal", 512);
}
Sample log
intValue:10
intValue:256
Audit logging
Fluxtion provides an audit logging facility that captures the output when an event processing cycle is triggered. An event processor must be generated with audit logging enabled, nodes can optionally write key/value tuples. Audit output includes:
- The triggering event
- Time of the event
- The order in which nodes are invoked
- A map structure that each node can write a key/value pair to
The control of the output is covered in the control section, by default the audit log writes output using the standard java util logging framework.
Code sample
public class AuditExample {
public static void main(String[] args) {
var processor = Fluxtion.interpret(c ->{
c.addNode(new MyAuditingNode());
c.addEventAudit();
});
processor.init();
//AUDIT IS INFO BY DEFAULT
processor.onEvent("detailed message 1");
}
public static class MyAuditingNode extends EventLogNode {
@Initialise
public void init(){
auditLog.info("MyAuditingNode", "init");
auditLog.info("MyAuditingNode_debug", "some debug message");
}
@OnEventHandler
public boolean stringEvent(String event) {
auditLog.info("event", event);
auditLog.debug("charCount", event.length());
return true;
}
}
}
Sample log
Audit records are encoded as a stream of yaml documents by default.
eventLogRecord:
eventTime: 1714197503584
logTime: 1714197503584
groupingId: null
event: LifecycleEvent
eventToString: Init
nodeLogs:
- myAuditingNode_0: { MyAuditingNode: init, MyAuditingNode_debug: some debug message}
endTime: 1714197503584
---
Apr 27, 2024 6:58:23 AM com.fluxtion.runtime.audit.EventLogManager calculationLogConfig
INFO: updating event log config:EventLogConfig{level=DEBUG, logRecordProcessor=null, sourceId=null, groupId=null}
eventLogRecord:
eventTime: 1714197503597
logTime: 1714197503598
groupingId: null
event: String
eventToString: detailed message 1
nodeLogs:
- myAuditingNode_0: { event: detailed message 1, charCount: 18}
endTime: 1714197503598
---
Control of a processor
Lifecycle
User nodes that are added to the processing graph can attach to the lifecycle callbacks by annotating methods with the relevant annotations. The event processor implements the Lifecycle interface, application code calls a lifecyle method to trigger a lifecycle process cycle.
EventProcessor.init() must always be called before any events are submitted to an event processor instance
Code sample
public static void main(String[] args) {
var processor = Fluxtion.interpret(new MyNode());
processor.init();
processor.start();
processor.stop();
processor.tearDown();
}
public static class MyNode {
@Initialise
public void myInitMethod() {
System.out.println("Initialise");
}
@Start
public void myStartMethod() {
System.out.println("Start");
}
@Stop
public void myStopMethod() {
System.out.println("Stop");
}
@TearDown
public void myTearDownMethod() {
System.out.println("TearDown");
}
}
Sample log
Initialise
Start
Stop
TearDown
Clocks and time
User classes bound into an event processor have access to a Clock to query the current time in the processor. The Clock instance is driven by a strategy the supplies a long to the clock. A ClockStrategy can be changed at runtime by the application. This is particularly useful during replay mode, or when writing unit tests that have time dependent variables and the time needs to be data driven.
Set the clock strategy with
processor.setClockStrategy(ClockStrategy customClockStrategy);
The clock has no units it is a long the application can use in any way it wants.
Code sample
public class ClockExample {
public static void main(String[] args) {
var processor = Fluxtion.interpret(new TimeLogger());
processor.init();
//PRINT CURRENT TIME
processor.onEvent(DateFormat.getDateTimeInstance());
//USE A SYNTHETIC STRATEGY TO SET TIME FOR THE PROCESSOR CLOCK
LongAdder syntheticTime = new LongAdder();
processor.setClockStrategy(syntheticTime::longValue);
//SET A NEW TIME - GOING BACK IN TIME!!
syntheticTime.add(1_000_000_000);
processor.onEvent(DateFormat.getDateTimeInstance());
//SET A NEW TIME - BACK TO THE FUTURE
syntheticTime.add(1_800_000_000_000L);
processor.onEvent(DateFormat.getDateTimeInstance());
}
public static class TimeLogger {
public Clock wallClock = Clock.DEFAULT_CLOCK;
@OnEventHandler
public boolean publishTime(DateFormat dateFormat) {
System.out.println("time " + dateFormat.format(new Date(wallClock.getWallClockTime())));
return true;
}
}
}
Sample log
time 27 Apr 2024, 09:30:31
time 12 Jan 1970, 14:46:40
time 15 Jan 2027, 08:00:02
Batch support
Batch callbacks are supported through the BatchHandler interface that the generated EventHandler implements. Any methods
that are annotated with, @OnBatchPause
or @OnBatchEnd
will receive calls from the matching BatchHandler method.
Code sample
public static void main(String[] args) {
var processor = Fluxtion.interpret(new MyNode());
processor.init();
processor.onEvent("test");
//use BatchHandler service
BatchHandler batchHandler = (BatchHandler)processor;
batchHandler.batchPause();
batchHandler.batchEnd();
}
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode event received:" + stringToProcess);
return true;
}
@OnBatchPause
public void batchPause(){
System.out.println("MyNode::batchPause");
}
@OnBatchEnd
public void batchEnd(){
System.out.println("MyNode::batchEnd");
}
}
Sample log
MyNode event received:test
MyNode::batchPause
MyNode::batchEnd
Audit log control
The audit log facility can be controlled at runtime by the application. LogRecord are the output of the event auditor, the application can customise LogRecord handling in three ways:
- Change the log level - info is the default level
- Log record encoding - yaml is the default encoding
- Log record processor - java.util.logging is the default LogRecord processor
Code sample
public class AuditControlExample {
public static void main(String[] args) {
var processor = Fluxtion.interpret(c ->{
c.addNode(new MyAuditingNode());
c.addEventAudit();
});
processor.init();
//AUDIT IS INFO BY DEFAULT
processor.onEvent("detailed message 1");
//CHANGE LOG LEVEL DYNAMICALLY
processor.setAuditLogLevel(EventLogControlEvent.LogLevel.DEBUG);
processor.onEvent("detailed message 2");
//REPLACE LOGRECORD ENCODER
processor.setAuditLogRecordEncoder(new MyLogEncoder(Clock.DEFAULT_CLOCK));
//REPLACE LOGRECORD PROCESSOR
processor.setAuditLogProcessor(logRecord -> {
System.err.println("WARNING -> "+ logRecord.toString());
});
processor.onEvent("detailed message 1");
processor.onEvent("detailed message 2");
}
public static class MyLogEncoder extends LogRecord{
public MyLogEncoder(Clock clock) {
super(clock);
}
@Override
public CharSequence asCharSequence(){
return "IGNORING ALL RECORDS!!";
}
}
public static class MyAuditingNode extends EventLogNode {
@Initialise
public void init(){
auditLog.info("MyAuditingNode", "init");
auditLog.info("MyAuditingNode_debug", "some debug message");
}
@OnEventHandler
public boolean stringEvent(String event) {
auditLog.info("event", event);
auditLog.debug("charCount", event.length());
return true;
}
}
}
Sample log
eventLogRecord:
eventTime: 1714205423167
logTime: 1714205423167
groupingId: null
event: LifecycleEvent
eventToString: Init
nodeLogs:
- myAuditingNode_0: { MyAuditingNode: init, MyAuditingNode_debug: some debug message}
endTime: 1714205423170
---
eventLogRecord:
eventTime: 1714205423170
logTime: 1714205423170
groupingId: null
event: String
eventToString: detailed message 1
nodeLogs:
- myAuditingNode_0: { event: detailed message 1}
endTime: 1714205423172
---
Apr 27, 2024 9:10:23 AM com.fluxtion.runtime.audit.EventLogManager calculationLogConfig
INFO: updating event log config:EventLogConfig{level=DEBUG, logRecordProcessor=null, sourceId=null, groupId=null}
eventLogRecord:
eventTime: 1714205423186
logTime: 1714205423186
groupingId: null
event: String
eventToString: detailed message 2
nodeLogs:
- myAuditingNode_0: { event: detailed message 2, charCount: 18}
endTime: 1714205423186
---
WARNING -> IGNORING ALL RECORDS!!
WARNING -> IGNORING ALL RECORDS!!
Setting context parameters
An application can dynamically inject key/value properties into a running event processor. The context parameters are available for any bound node to look up using an injected EventProcessorContext. Setting context parameters does not trigger an event processing cycle.
Context parameters api:
- Single parameter -
EventProcessor.addContextParameter(Object key, Object value)
- Overwrite all parameters -
EventProcessor.setContextParameterMap(Map<Object, Object> newContextMapping)
Code sample
public class ContextParamInput {
public static void main(String[] args) {
var processor = Fluxtion.interpret(new ContextParamReader());
processor.init();
processor.addContextParameter("myContextParam1", "[param1: update 1]");
processor.start();
processor.addContextParameter("myContextParam1", "[param1: update 2]");
processor.addContextParameter("myContextParam2", "[param2: update 1]");
processor.start();
}
public static class ContextParamReader {
@Inject
public EventProcessorContext context;
@Start
public void start() {
System.out.println("myContextParam1 -> " + context.getContextProperty("myContextParam1"));
System.out.println("myContextParam2 -> " + context.getContextProperty("myContextParam2"));
System.out.println();
}
}
}
Sample log
myContextParam1 -> [param1: update 1]
myContextParam2 -> null
myContextParam1 -> [param1: update 2]
myContextParam2 -> [param2: update 1]
Runtime inject
Instances can be injected at runtime to a node using the @Inject(instanceName = "startData")
annotation on a
InstanceSupplier data member. The instance has to be injected at runtime to a built event processor before calling init with:
processor.injectNamedInstance(new Date(1000000), "startData")
Instances can be updated once the processor is running by injecting a new instance with the same name.
Code sample
public static void main(String[] args) {
var processor = Fluxtion.interpret(new MyNode());
processor.injectNamedInstance(new Date(1000000), "startData");
processor.init();
processor.onEvent("TEST");
processor.injectNamedInstance(new Date(999000000), "startData");
processor.onEvent("TEST");
}
public static class MyNode{
@Inject(instanceName = "startData")
public InstanceSupplier<Date> myDate;
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("runtime injected:" + myDate.get());
return true;
}
}
Sample log
runtime injected:Thu Jan 01 01:16:40 GMT 1970
runtime injected:Mon Jan 12 14:30:00 GMT 1970
Query into a processor
User node lookup by id
NamedNodes are available for lookup from an event processor instance using their name. The application can then use the reference to pull data from the node without requiring an event process cycle to push data to an output.
Code sample
public class GetNodeByIdExample {
public static void main(String[] args) throws NoSuchFieldException {
var processor = Fluxtion.interpret(new MondayChecker());
processor.init();
processor.onEvent("Monday");
processor.onEvent("Tuesday");
processor.onEvent("Wednesday");
//LOOKUP USER NODE
MondayChecker mondayChecker = processor.getNodeById("MondayChecker");
//PULL DATA
System.out.println("PULLING Monday count:" + mondayChecker.getMondayCount());
processor.onEvent("Monday");
//PULL DATA
System.out.println("PULLING Monday count:" + mondayChecker.getMondayCount());
}
public static class MondayChecker implements NamedNode {
private int mondayCounter = 0;
@OnEventHandler
public boolean checkIsMonday(String day){
boolean isMonday = day.equalsIgnoreCase("monday");
mondayCounter += isMonday ? 1 : 0;
return isMonday;
}
public int getMondayCount() {
return mondayCounter;
}
@Override
public String getName() {
return "MondayChecker";
}
}
}
Sample log
PULLING Monday count:1
PULLING Monday count:2
DataFlow node lookup by id
DataFlow nodes are available for lookup from an event processor instance using their name. In this case the lookup returns a reference to the wrapped value and not the wrapping node. The application can then use the reference to pull data from the node without requiring an event process cycle to push data to an output.
When building the graph with DSL a call to id
makes that element addressable for lookup.
Code sample
public class GetFlowNodeByIdExample {
public static void main(String[] args) throws NoSuchFieldException {
var processor = Fluxtion.interpret(c ->{
DataFlow.subscribe(String.class)
.filter(s -> s.equalsIgnoreCase("monday"))
//ID START - this makes the wrapped value accessible via the id
.mapToInt(Mappers.count()).id("MondayChecker")
//ID END
.console("Monday is triggered");
});
processor.init();
processor.onEvent("Monday");
processor.onEvent("Tuesday");
processor.onEvent("Wednesday");
//ACCESS THE WRAPPED VALUE BY ITS ID
Integer mondayCheckerCount = processor.getStreamed("MondayChecker");
System.out.println("Monday count:" + mondayCheckerCount + "\n");
//ACCESS THE WRAPPED VALUE BY ITS ID
processor.onEvent("Monday");
mondayCheckerCount = processor.getStreamed("MondayChecker");
System.out.println("Monday count:" + mondayCheckerCount);
}
}
Sample log
Monday is triggered
Monday count:1
Monday is triggered
Monday count:2
Auditor lookup by id
Auditors are available for lookup from an event processor instance using their name. The application can then use the reference to pull data from the Auditor without requiring an event process cycle to push data to an output.
Code sample
public class AuditLookupExample {
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
var processor = Fluxtion.interpret(c -> {
//ADDING A NAMED AUDITOR
c.addAuditor(new MyAuditor(), "myAuditor");
DataFlow.subscribe(String.class);
});
processor.init();
processor.onEvent("A");
processor.onEvent("B");
processor.onEvent("C");
//LOOKUP AUDITOR BY NAME
MyAuditor myAuditor = processor.getAuditorById("myAuditor");
//PULL DATA FROM AUDITOR
System.out.println("\nPULL MyAuditor::invocationCount " + myAuditor.getInvocationCount());
}
public static class MyAuditor implements Auditor {
private int invocationCount = 0;
@Override
public void nodeRegistered(Object node, String nodeName) {
}
@Override
public void eventReceived(Object event) {
System.out.println("MyAuditor::eventReceived " + event);
invocationCount++;
}
public int getInvocationCount() {
return invocationCount;
}
}
}
Sample log
MyAuditor::eventReceived Init
MyAuditor::eventReceived A
MyAuditor::eventReceived B
MyAuditor::eventReceived C
PULL MyAuditor::invocationCount 4