Once the event processor has been generated it can be used by the application. An instance of an EventProcessor
is the bridge between event streams and processing logic; application integration connects the event processor to
the application's event sources.
An application can contain multiple event processor instances; they are lightweight objects designed to be
embedded in your application. The application creates instances of event processors and routes events to an instance.
The source project for the examples can be found here
EventProcessors are not thread safe: an instance should process only a single event at a time. Application code is
responsible for synchronizing thread access to an event processor instance.
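One way application code could meet this requirement is to guard every call with a lock. The sketch below is only an illustration: `Processor`, `RecordingProcessor` and `SynchronizedProcessor` are hypothetical stand-ins written for this example, not Fluxtion types. It shows several threads sharing one instance while only one event is processed at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SynchronizedAccessSketch {

    // Hypothetical stand-in for an event processor; NOT the Fluxtion interface.
    interface Processor {
        void onEvent(Object event);
    }

    // Deliberately not thread safe: it appends to an unguarded ArrayList.
    static final class RecordingProcessor implements Processor {
        final List<Object> seen = new ArrayList<>();

        @Override
        public void onEvent(Object event) {
            seen.add(event);
        }
    }

    // Wrapper that lets only one thread at a time into the delegate.
    static final class SynchronizedProcessor implements Processor {
        private final Processor delegate;
        private final Object lock = new Object();

        SynchronizedProcessor(Processor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onEvent(Object event) {
            synchronized (lock) {
                delegate.onEvent(event);
            }
        }
    }

    // Sends events from several threads and returns how many were recorded.
    static int run(int threads, int eventsPerThread) {
        RecordingProcessor target = new RecordingProcessor();
        Processor guarded = new SynchronizedProcessor(target);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                pending.add(pool.submit(() -> {
                    for (int i = 0; i < eventsPerThread; i++) {
                        guarded.onEvent(i);
                    }
                }));
            }
            for (Future<?> f : pending) {
                f.get(); // wait for the worker and make its writes visible here
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return target.seen.size();
    }

    public static void main(String[] args) {
        System.out.println("events recorded: " + run(4, 1000));
    }
}
```

A single lock per instance keeps the sketch simple. An application that already funnels events through one thread per processor would not need the wrapper at all.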
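User code interacts with an event processor instance in one of five ways: creating a processor, sending inputs to it,
receiving outputs from it, controlling it and querying it.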
Creating a new processor
Fluxtion provides several strategies for creating an event processor instance that you will use in your application:
Generate in process
Use an AOT processor as POJO
Factory method on an event processor instance
Instance per partitioned event stream
Once you have created an instance you can use it like any normal Java class. For information about generating an
event processor, see the build event processor section.
Create an instance
Create a new processor in process by calling one of the Fluxtion methods, Fluxtion.interpret or Fluxtion.compile,
or by using an AOT generated processor. An AOT processor is a normal Java class that has a constructor; use it as a
POJO in your application.
Code sample
Sample log
Factory method
An event processor instance provides a factory method that creates new instances of the same event processor class. This
can be useful when multiple instances of the same processor are required, each with different state. Applications
sometimes partition an event stream; each instance of the event processor can then be bound to one partitioned stream.
Code sample
Sample log
Compiled event processors only
Compiled event processors create bound nodes using their constructors, so each compiled event processor has no shared
references. An interpreted instance refers to the nodes it was generated with; it does not create new instances of
bound nodes, so it cannot create a segregated instance of an event processor.
The factory method processor.newInstance() is only supported for compiled event processors.
Partitioning
Fluxtion provides an automatic partitioning function that can be used to query event flow, partition it and assign a
new event processor instance to that flow. The partitioner takes a factory method to create new event processor
instances and a key function to segregate event flow. The partitioner will create and initialise a new event processor
when the key function returns a previously unseen key.
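The behaviour described above can be sketched in plain Java. This is not the Fluxtion partitioner: `Processor`, `CountingProcessor` and `Partitioner` are hypothetical names introduced for illustration. The sketch assumes only what the text states, a factory, a key function, and creation plus initialisation on the first sighting of a key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

final class PartitionerSketch {

    // Hypothetical stand-in for an event processor; NOT the Fluxtion interface.
    interface Processor {
        void init();
        void onEvent(Object event);
    }

    // Counts events so that each partition's independent state is visible.
    static final class CountingProcessor implements Processor {
        boolean initialised;
        int count;

        @Override
        public void init() {
            initialised = true;
        }

        @Override
        public void onEvent(Object event) {
            count++;
        }
    }

    // Routes each event to the processor that owns the event's key.
    static final class Partitioner<E, K, P extends Processor> {
        private final Supplier<P> factory;
        private final Function<E, K> keyFunction;
        private final Map<K, P> processors = new HashMap<>();

        Partitioner(Supplier<P> factory, Function<E, K> keyFunction) {
            this.factory = factory;
            this.keyFunction = keyFunction;
        }

        void onEvent(E event) {
            K key = keyFunction.apply(event);
            // A previously unseen key gets a new, initialised processor.
            P processor = processors.computeIfAbsent(key, k -> {
                P created = factory.get();
                created.init();
                return created;
            });
            processor.onEvent(event);
        }

        P processorFor(K key) {
            return processors.get(key);
        }

        int partitionCount() {
            return processors.size();
        }
    }

    // Partitions words by their first letter.
    static Partitioner<String, Character, CountingProcessor> demo() {
        Partitioner<String, Character, CountingProcessor> partitioner =
                new Partitioner<>(CountingProcessor::new, s -> s.charAt(0));
        for (String word : new String[]{"apple", "avocado", "banana"}) {
            partitioner.onEvent(word);
        }
        return partitioner;
    }

    public static void main(String[] args) {
        System.out.println("partitions: " + demo().partitionCount());
    }
}
```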
Code sample
Sample log
Inputs to a processor
An event processor responds to input by triggering a calculation cycle. This section covers how an input triggers a
calculation cycle. Functions are bound to the calculation cycle to meet the business requirements; see the
mark event handling section for further details. The event processor instance must be initialised before any input
is submitted.
1 - Call EventProcessor.init() before submitting any input
2 - Each new input triggers a graph calculation cycle
Events
A simple example demonstrates how an application triggers an event process cycle by calling
processor.onEvent("WORLD")
Code sample
Sample log
Exported service
The generated event processor implements any exported service interface; calling a service method triggers an
event process cycle. The application discovers the service reference with a call to the event processor instance.
An event processor provides several service discovery methods:
Lookup using type inference - ServiceController svc = processor.getExportedService()
Lookup using a specific type - var svc = processor.getExportedService(ServiceController.class)
Consuming a service if it is exported - processor.consumeServiceIfExported(ServiceController.class, s -> {})
Checking if a service is exported - processor.exportsService(MiaServiceController.class)
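To make the intent of the discovery methods concrete, here is a minimal sketch in plain Java. The method names mirror the list above, but the bodies are an assumption about their semantics: the `Processor` class and the `ServiceController` interface below are hypothetical stand-ins, and the real methods may behave differently, for example when a service is not exported.

```java
import java.util.function.Consumer;

final class ServiceLookupSketch {

    // Hypothetical service interface, named after the one used in the text.
    interface ServiceController {
        void start();
    }

    // Hypothetical processor stand-in that implements the service it exports.
    static final class Processor implements ServiceController {
        boolean started;

        @Override
        public void start() {
            started = true;
        }

        // True when this object implements the requested interface.
        boolean exportsService(Class<?> type) {
            return type.isInstance(this);
        }

        // Returns this object viewed as the service, or null when it is not exported.
        <T> T getExportedService(Class<T> type) {
            return exportsService(type) ? type.cast(this) : null;
        }

        // Runs the consumer only when the service is exported.
        <T> void consumeServiceIfExported(Class<T> type, Consumer<T> consumer) {
            T service = getExportedService(type);
            if (service != null) {
                consumer.accept(service);
            }
        }
    }

    // Calls the service through the conditional lookup and reports whether it ran.
    static boolean demo() {
        Processor processor = new Processor();
        processor.consumeServiceIfExported(ServiceController.class, ServiceController::start);
        return processor.started;
    }

    public static void main(String[] args) {
        System.out.println("service started: " + demo());
    }
}
```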
Code sample
Sample log
Signals
Fluxtion provides some prebuilt signal classes that are reusable and reduce the amount of application code to write.
The Signal classes can be sent as events, triggering an event process cycle, using the API calls built into the
event processor:
publish a signal with a filter and value
publish a signal with only a filter
publish a signal with only a value
publish primitive signals with a filter
Code sample
Sample log
Buffer and trigger
An event processor can buffer multiple events without causing any triggers to fire, and at some point in the future
cause all potentially dirty triggers to fire. This is known as buffering and triggering. It is achieved by calling
EventProcessor.bufferEvent multiple times and then calling EventProcessor.triggerCalculation.
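The idea can be illustrated with a small stand-alone sketch. `SummingProcessor` is a hypothetical class written for this example; its method names echo the ones above, but its behaviour is an assumption about the general pattern (update state, mark dirty, fire once later), not a description of Fluxtion internals.

```java
import java.util.ArrayList;
import java.util.List;

final class BufferTriggerSketch {

    // Hypothetical stand-in; method names mirror the text, behaviour is an assumption.
    static final class SummingProcessor {
        private int sum;
        private boolean dirty;
        final List<Integer> published = new ArrayList<>();

        // Updates state and marks it dirty without firing the trigger.
        void bufferEvent(int value) {
            sum += value;
            dirty = true;
        }

        // Updates state and fires immediately, as an unbuffered event would.
        void onEvent(int value) {
            bufferEvent(value);
            triggerCalculation();
        }

        // Fires the trigger once if anything changed since the last firing.
        void triggerCalculation() {
            if (dirty) {
                published.add(sum);
                dirty = false;
            }
        }
    }

    static List<Integer> demo() {
        SummingProcessor processor = new SummingProcessor();
        processor.bufferEvent(1);
        processor.bufferEvent(2);
        processor.bufferEvent(3);
        processor.triggerCalculation(); // one firing covers all three buffered events
        processor.onEvent(4);           // an unbuffered event fires straight away
        return processor.published;
    }

    public static void main(String[] args) {
        System.out.println("published values: " + demo());
    }
}
```

Three buffered events produce a single published value, which is the main reason to buffer: downstream work runs once per batch rather than once per event.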
Code sample
Sample log
Outputs from a processor
Sink
An application can register for output from the EventProcessor by supplying a consumer to addSink, and can remove it
with a call to removeSink. Bound classes can publish to sinks during an event process cycle. Any registered sink sees
the update as soon as the data is published, not at the end of the cycle.
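The timing described above, delivery at publish time rather than at the end of the cycle, can be shown with a plain Java sketch. The `Processor` class here is a hypothetical stand-in, and the assumption that sinks are registered under a string id is made for this example only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class SinkSketch {

    // Hypothetical stand-in for a processor with named sinks; NOT the Fluxtion class.
    static final class Processor {
        private final Map<String, Consumer<Object>> sinks = new HashMap<>();
        final List<String> trace = new ArrayList<>();

        void addSink(String id, Consumer<Object> consumer) {
            sinks.put(id, consumer);
        }

        void removeSink(String id) {
            sinks.remove(id);
        }

        // One event cycle: the node publishes part way through the cycle.
        void onEvent(String event) {
            trace.add("cycle start");
            publish("out", event.toUpperCase());
            trace.add("cycle end");
        }

        // Delivers to the sink immediately if one is registered under the id.
        private void publish(String id, Object value) {
            Consumer<Object> sink = sinks.get(id);
            if (sink != null) {
                sink.accept(value);
            }
        }
    }

    static List<String> demo() {
        Processor processor = new Processor();
        processor.addSink("out", value -> processor.trace.add("sink received " + value));
        processor.onEvent("hello");
        processor.removeSink("out");
        processor.onEvent("ignored"); // no sink registered, so nothing is delivered
        return processor.trace;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```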
Audit logging
Fluxtion provides an audit logging facility that captures the output when an event processing cycle is triggered.
An event processor must be generated with audit logging enabled; nodes can then optionally write key/value tuples.
Audit output includes:
The triggering event
Time of the event
The order in which nodes are invoked
A map structure that each node can write a key/value pair to
Control of the output is covered in the control section. By default the audit log writes output using the standard
java.util.logging framework.
Code sample
Sample log
Audit records are encoded as a stream of yaml documents by default.
Control of a processor
Lifecycle
User nodes that are added to the processing graph can attach to the lifecycle callbacks by annotating methods with
the relevant annotations. The event processor implements the Lifecycle interface; application code calls a lifecycle
method to trigger a lifecycle process cycle.
EventProcessor.init() must always be called before any events are submitted to an event processor instance.
Code sample
Sample log
Clocks and time
User classes bound into an event processor have access to a Clock to query the current time in the processor. The
Clock instance is driven by a strategy that supplies a long to the clock. A ClockStrategy can be changed at runtime
by the application. This is particularly useful during replay mode, or when writing unit tests that have time
dependent variables and the time needs to be data driven.
The clock has no units; the value is a long that the application can interpret in any way it wants.
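A data driven clock is easy to picture with a small sketch. The `Clock` and `TimeoutNode` classes below are hypothetical and use `java.util.function.LongSupplier` in place of a clock strategy; they are not the Fluxtion types. The sketch shows a test that moves time forward by writing to a variable instead of waiting.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

final class ClockSketch {

    // Hypothetical clock; the strategy supplies the current time as a unitless long.
    static final class Clock {
        private LongSupplier strategy = System::currentTimeMillis;

        void setStrategy(LongSupplier strategy) {
            this.strategy = strategy;
        }

        long now() {
            return strategy.getAsLong();
        }
    }

    // A node with time dependent behaviour: it expires after a fixed interval.
    static final class TimeoutNode {
        private final Clock clock;
        private final long timeout;
        private long lastSeen;

        TimeoutNode(Clock clock, long timeout) {
            this.clock = clock;
            this.timeout = timeout;
        }

        void touch() {
            lastSeen = clock.now();
        }

        boolean expired() {
            return clock.now() - lastSeen >= timeout;
        }
    }

    // Returns the expiry state before and after the timeout, driven purely by data.
    static boolean[] demo() {
        Clock clock = new Clock();
        AtomicLong time = new AtomicLong(100);
        clock.setStrategy(time::get); // swap wall clock time for test controlled time
        TimeoutNode node = new TimeoutNode(clock, 50);
        node.touch();
        time.set(120);
        boolean before = node.expired(); // 20 units elapsed
        time.set(150);
        boolean after = node.expired();  // 50 units elapsed
        return new boolean[]{before, after};
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("expired before: " + result[0] + ", expired after: " + result[1]);
    }
}
```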
Code sample
Sample log
Batch support
Batch callbacks are supported through the BatchHandler interface that the generated EventHandler implements. Any methods
that are annotated with @OnBatchPause or @OnBatchEnd will receive calls from the matching BatchHandler method.
Code sample
Sample log
Audit log control
The audit log facility can be controlled at runtime by the application. LogRecords are the output of the event
auditor; the application can customise LogRecord handling in three ways:
Change the log level - info is the default level
Log record encoding - yaml is the default encoding
Log record processor - java.util.logging is the default LogRecord processor
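When the default java.util.logging route is in use, the application can shape the output with standard JDK calls alone. The sketch below uses only `java.util.logging` classes; the logger name `audit.sketch` is a hypothetical placeholder, and `java.util.logging.LogRecord` is the JDK type, which should not be confused with the auditor's own LogRecord mentioned above. It shows a level filter and a custom handler that collects messages instead of printing them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class JulCaptureSketch {

    // Collects messages in memory instead of writing them to the console.
    static final class CollectingHandler extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            if (isLoggable(record)) {
                messages.add(record.getLevel() + " " + record.getMessage());
            }
        }

        @Override
        public void flush() {
            // nothing is buffered
        }

        @Override
        public void close() {
            // no resources to release
        }
    }

    static List<String> demo() {
        Logger logger = Logger.getLogger("audit.sketch"); // hypothetical logger name
        logger.setUseParentHandlers(false);               // keep output away from the root console handler
        logger.setLevel(Level.INFO);                      // records below INFO are dropped
        CollectingHandler handler = new CollectingHandler();
        logger.addHandler(handler);
        logger.fine("dropped by the level filter");
        logger.info("kept");
        logger.removeHandler(handler);
        return handler.messages;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```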
Code sample
Sample log
Setting context parameters
An application can dynamically inject key/value properties into a running event processor. The context parameters are
available for any bound node to look up using an injected EventProcessorContext. Setting context parameters does not
trigger an event processing cycle.
Context parameters API:
Single parameter - EventProcessor.addContextParameter(Object key, Object value)
Overwrite all parameters - EventProcessor.setContextParameterMap(Map<Object, Object> newContextMapping)
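The difference between the two calls, adding one entry versus replacing the whole map, can be sketched as follows. The `Processor` class is a hypothetical stand-in whose method names mirror the list above; the bodies are an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class ContextParameterSketch {

    // Hypothetical stand-in; method names mirror the text, bodies are an assumption.
    static final class Processor {
        private final Map<Object, Object> context = new HashMap<>();
        int cycles;

        // Adds or replaces a single entry; no event cycle is triggered.
        void addContextParameter(Object key, Object value) {
            context.put(key, value);
        }

        // Discards every existing entry and installs the new mapping.
        void setContextParameterMap(Map<Object, Object> newContextMapping) {
            context.clear();
            context.putAll(newContextMapping);
        }

        // An event cycle in which a node reads the context.
        String onEvent(String event) {
            cycles++;
            return context.getOrDefault("greeting", "hello") + " " + event;
        }
    }

    static String[] demo() {
        Processor processor = new Processor();
        processor.addContextParameter("greeting", "hi");
        int cyclesAfterSet = processor.cycles;      // still 0: setting a parameter is not an event
        String first = processor.onEvent("world");  // uses the injected greeting

        Map<Object, Object> replacement = new HashMap<>();
        replacement.put("other", 1);
        processor.setContextParameterMap(replacement); // "greeting" is gone after the overwrite
        String second = processor.onEvent("world");    // falls back to the default
        return new String[]{String.valueOf(cyclesAfterSet), first, second};
    }

    public static void main(String[] args) {
        System.out.println(String.join(" | ", demo()));
    }
}
```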
Code sample
Sample log
Runtime inject
Instances can be injected at runtime into a node using the @Inject(instanceName = "startData") annotation on an
InstanceSupplier data member. The instance has to be injected into a built event processor at runtime, before calling
init, with:
Instances can be updated once the processor is running by injecting a new instance with the same name.
Code sample
Sample log
Query into a processor
User node lookup by id
NamedNodes are available for lookup from an event processor instance using their name. The application can then use
the reference to pull data from the node without requiring an event process cycle to push data to an output.
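The pull style of access can be sketched in plain Java. `Processor`, `Counter` and the `getNodeById` method below are hypothetical stand-ins for this example; the real lookup method and its error handling may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

final class NodeLookupSketch {

    // A user node that accumulates state during event cycles.
    static final class Counter {
        int count;

        void onEvent(Object event) {
            count++;
        }
    }

    // Hypothetical stand-in for a processor that keeps its nodes addressable by name.
    static final class Processor {
        private final Map<String, Object> nodesByName = new HashMap<>();
        private final Counter counter = new Counter();

        Processor() {
            nodesByName.put("counter", counter);
        }

        void onEvent(Object event) {
            counter.onEvent(event);
        }

        // Returns the node registered under the id, cast to the caller's expected type.
        @SuppressWarnings("unchecked")
        <T> T getNodeById(String id) {
            Object node = nodesByName.get(id);
            if (node == null) {
                throw new NoSuchElementException("no node named " + id);
            }
            return (T) node;
        }
    }

    static int demo() {
        Processor processor = new Processor();
        processor.onEvent("a");
        processor.onEvent("b");
        Counter counter = processor.getNodeById("counter"); // pull, no extra event cycle needed
        return counter.count;
    }

    public static void main(String[] args) {
        System.out.println("count read from node: " + demo());
    }
}
```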
Code sample
Sample log
DataFlow node lookup by id
DataFlow nodes are available for lookup from an event processor instance using their name. In this case the lookup
returns a reference to the wrapped value and not the wrapping node. The application can then use the reference to
pull data from the node without requiring an event process cycle to push data to an output.
When building the graph with the DSL, a call to id makes that element addressable for lookup.
Code sample
Sample log
Auditor lookup by id
Auditors are available for lookup from an event processor instance using their name. The application can then use
the reference to pull data from the Auditor without requiring an event process cycle to push data to an output.