Generating an event processor
The Fluxtion compiler library provides several tools for generating an event processor from a model supplied by the client. This section documents the utilities and tools available for event processor generation and how to use them. The binding of nodes to the event processor is covered in binding user classes
The Fluxtion class gives static access to the generation methods.
The source project for the examples can be found here
Generating modes
To complete building an event processor the code model and a generation mode is passed to the compiler. The final event processor binds in all user classes combined with pre-calculated event dispatch to meet the dispatch rules.
Supported generation modes
- Interpreted In memory execution of an event processor
- In memory compilation In memory execution of compiled event processor
- Ahead of time compilation Ahead of time generation of compiled event processor, execute in another process
Regardless of which generation mode is used the generated event processor will behave the same with respect to the dispatch rules.
Generation modes comparison
Mode | Runtime libraries Required |
Advantage | Disadvantages | Use case |
---|---|---|---|---|
Interpreted | Fluxtion - compiler Fluxtion - runtime Third party libs |
Supports 1000’s nodes Quick to develop |
Slower execution produces garbage More runtime libraries Harder to debug with map based dispatch |
|
In memory compilation dispatch only |
Fluxtion - compiler Fluxtion - runtime Third party libs |
Fast execution zero gc Supports 100’s nodes Compile new graphs at runtime easy to debug |
More runtime libraries Limit on generated code size inline lambda not supported |
Uses the objects provided as nodes |
In memory compilation | Fluxtion - compiler Fluxtion - runtime Third party libs |
Fast execution zero gc Supports 100’s nodes Compile new graphs at runtime easy to debug |
More runtime libraries Limit on generated code size inline lambda not supported |
Constructs new objects as nodes |
Ahead of time compilation | Fluxtion - runtime | Fast execution zero gc Supports 100’s nodes Single runtime library easy to debug |
Limit on generated code size inline lambda not supported |
Table of contents
Interpreted overview
The interpreted mode implements the event processor with a map based dispatch. Generation and execution all happen in the same process so both the runtime and compiler Fluxtion libraries are required at runtime.
Fluxtion.interpret methods are the entry point to generating an interpreted version of the event processor.
A user can either supply a varargs list of objects to include in the final event processor, or configure a supplied EventProcessorConfig and imperatively add nodes. See the interpreted section of binding user classes document for a description of binding functions into an event processor.
Interpreted code sample
Generating an in-memory event processor is achieved with a call to one of the overloaded Fluxtion.interpret methods. Imperative and DSL models are both generated through the same call to interpret.
Imperative generation example
Interpret can be called with list of nodes that are implicitly added to the event processor or the use the version that accepts an EventProcessorConfig consumer for user code to imperatively add nodes.
EventProcessor<?> processor;
//these are equivalent processors
//add a list of nodes using short cut method
processor = Fluxtion.interpret(new MyNode("node A"));
//add nodes using supplied EventProcessorConfig
processor = Fluxtion.interpret(cfg -> {
cfg.addNode(new MyNode("node A"));
});
//these are equivalent processors
processor = Fluxtion.interpret(new MyNode("node A"), new MyNode("node B"));
processor = Fluxtion.interpret(cfg -> {
cfg.addNode(new MyNode("node A"));
cfg.addNode(new MyNode("node B"));
});
//executing event processor
processor.init();
processor.onEvent("hello world");
Functional DSL generation example
To generate an interpreted version using Fluxtion DSL, the DataFlow calls must happen within the context of interpret
version that accepts an EventProcessorConfig
consumer Fluxtion.interpret(Consumer<EventProcessorConfig> configProcessor)
//DSL calls must use the Supplier<>
Fluxtion.interpret(cfg -> {
DataFlow.subscribe(String.class)
.push(new MyNode("node A")::handleStringEvent);
});
Compiling overview
The compiling mode generates a source file that represents the event processor, ready for compilation and use within an application. The source generation and compilation process can happen either in process or as part of the build stage. If the source generation happens as part of the build process the event processor is classed as ahead of time (AOT).
The generate source file is a serialized state of the event processor and all the instances it manages. This places stricter requirements on the nodes bound to the event processor than running in interpreted mode. Bound user classes are declared as fields in the generated event processor. The fields of a bound class will be serialized as well.
The objects instances in the event processor can either be created with the event processor are not shared or they can be provided by the user and the event processor only manages the dispatch and not the lifetime of the bound instances.
- compiling dispatch only The instances in the graph are provided by the user
- compiling The instances in the graph are instantiated in the processor and not shared
FluxtionCompilerConfig
There are several compile and compileAot methods in the Fluxtion, most of these are short cuts that delegate to a single compile method:
EventProcessor compile(Consumer<EventProcessorConfig> sepConfig, Consumer<FluxtionCompilerConfig> cfgBuilder)
We have previosuly seen how EventProcessorConfig controls what elements are bound into the event processor. To configure the generated source and compiler user code calls methods on the supplied FluxtionCompilerConfig.
Configurable properties for compilation
The FluxtionCompilerConfig instance allows user code to configure the compilation process via these properties
- Compilation control
- interpreted - generate an interpreted version of the event processor
- compileSource - flag to enable compilation of any generated source files
- ClassLoader - override classLoader at generation time
- Source control
- templateSep - The velocity template file to use in the source generation process
- className - class name for generated event processor
- packageName - package name for generated event processor
- addBuildTime - flag to add build time to the generated source files
- formatSource - flag enable formatting of the generated source files
- Output control
- outputDirectory - Output directory for generated event processor source files
- buildOutputDirectory - Output directory where compiled artifacts should be written
- resourcesOutputDirectory - Output directory for any resources generated with the event processor
- writeSourceToFile - Flag controlling if the generated source is written to output or is transient memory only version
- sourceWriter - if writeSourceToFile is false this writer will capture the content of the generation process
- generateDescription - Flag controlling generation of meta data description resources
Compiling in process
Compiling in process is a very similar process to generating an interpreted event processor, just replace the calls to
Fluxtion.interpret
with Fluxtion.compile
. The source code will be generated and compiled in memory, no configuration
of FluxtionCompilerConfig is required
Imperative generation example
Compile can be called with list of nodes that are implicitly added to the event processor or the use the version that accepts an EventProcessorConfig consumer for user code to imperatively add nodes.
EventProcessor<?> processor;
//these are equivalent processors
//add a list of nodes using short cut method
processor = Fluxtion.compile(new MyNode("node A"));
//add nodes using supplied EventProcessorConfig
processor = Fluxtion.compile(cfg -> {
cfg.addNode(new MyNode("node A"));
});
//these are equivalent processors
processor = Fluxtion.compile(new MyNode("node A"), new MyNode("node B"));
processor = Fluxtion.compile(cfg -> {
cfg.addNode(new MyNode("node A"));
cfg.addNode(new MyNode("node B"));
});
//executing event processor
processor.init();
processor.onEvent("hello world");
Functional DSL generation example
To generate an in process compiled version using Fluxtion DSL, the DataFlow calls must happen within the context of interpret
version that accepts an EventProcessorConfig consumer Fluxtion.compile(Consumer<EventProcessorConfig> configProcessor)
//DSL calls must use the Supplier<>
Fluxtion.compile(cfg -> {
DataFlow.subscribe(String.class)
.push(new MyNode("node A")::handleStringEvent);
});
Compiling dispatch only in process
Compiling dispatch only in process is a very similar process to generating an interpreted event processor, just replace the calls to
Fluxtion.interpret
with Fluxtion.compileDispatcher
. The source code will be generated and compiled in memory, no configuration
of FluxtionCompilerConfig is required. The instances in the event processor receiving event callbacks are the ones provided
by the user.
Imperative generation example
Compile can be called with list of nodes that are implicitly added to the event processor or the use the version that accepts an EventProcessorConfig consumer for user code to imperatively add nodes. The user can access the node from the reference passed into the compileDispatcher method
EventProcessor<?> processor;
//these are equivalent processors
//add a list of nodes using short cut method
var myNode = new MyNode("node A");
var myNodeB = new MyNode("node B");
processor = Fluxtion.compileDispatcher(myNode);
//add nodes using supplied EventProcessorConfig
processor = Fluxtion.compileDispatcher(cfg -> {
cfg.addNode(myNode);
});
//these are equivalent processors
processor = Fluxtion.compileDispatcher(myNode, myNodeB);
processor = Fluxtion.compileDispatcher(cfg -> {
cfg.addNode(myNode);
cfg.addNode(myNodeB);
});
//executing event processor
processor.init();
processor.onEvent("hello world");
Compiling AOT - programmatically
Compiling an event processor AOT is a very similar process to compiling in process, we use the method
EventProcessor.compile(Consumer<EventProcessorConfig> sepConfig, Consumer<FluxtionCompilerConfig> cfgBuilder)
How and where the source code will be generated is configured through the supplied FluxtionCompilerConfig instance.
EventProcessor.compile
can be called in a separate process to generate the event processor AOT. The generated source
file can be used in the project as a normal class.
A maven plugin is provided as part of the Fluxtion toolset that integrates AOT building into a standard part of your build.
Imperative generation
Compile can be called with list of nodes that are implicitly added to the event processor or the use the version that accepts an EventProcessorConfig consumer for user code to imperatively add nodes.
public static void main(String[] args) {
Fluxtion.compile(
//binds classes to event processor
eventProcessorConfig -> eventProcessorConfig.addNode(new MyNode("node A")),
//controls the generation
fluxtionCompilerConfig -> {
fluxtionCompilerConfig.setClassName("MyEventProcessor");
fluxtionCompilerConfig.setPackageName("com.fluxtion.example.aot.generated");
});
}
The output from the generation process is a source file MyEventProcessor.java in package com.fluxtion.example.aot.generated written to directory com/fluxtion/example/aot/generated below the current working directory of main method. A message to console is printed to confirm the generation process output
21-Apr-24 07:39:25 [main] INFO EventProcessorCompilation - generated EventProcessor file: /fluxtion-examples/src/main/java/com/fluxtion/example/aot/generated/MyEventProcessor.java
Using an AOT event processor
Once generated AOT use the event processor like any normal java class in application code.
public static void main(String[] args) {
var processor = new com.fluxtion.example.aot.generated.MyEventProcessor();
processor.init();
processor.onEvent("hello world");
}
Rules for serializing bound classes
- Bound class must have a public constructor or constructors, either:
- A zero argument constructor
- For complex construction add a custom serializer
with
EventProcessorConfig.addClassSerialize
- Final non-transient fields must be assigned in the constructor
- A constructor must exist that matches the final non-transient fields as arguments
- Transient fields are not serialised
- Fields annotated with
@FluxtionIgnore
are not serialised - Rules for serializing fields of bound classes
- Only non-transient fields are serialized
- All standard types are supported
- Java bean properties are serialized using setter
- Public fields are serialized
Serialize fields example
This example demonstrates field serialisation for standard types and passing a named constructor argument. Because there are 2 String fields the generated constructor needs a hint to which field is assigned in the constructor:
1
2
@AssignToField("cId")
private final String cId;
To ensure the consructor argument is taken from the cId field when generating the event processor source file.
The transient field transientName
is ignored during the source generation process.
public class FieldsExample {
public enum SampleEnum {VAL1, VAL2, VALX}
public static void main(String[] args) {
Fluxtion.compileAot("com.fluxtion.example.reference.generation.genoutput", "FieldsExampleProcessor",
BasicTypeHolder.builder()
.cId("cid")
.name("holder")
.myChar('$')
.longVal(2334L)
.intVal(12)
.shortVal((short) 45)
.byteVal((byte) 12)
.doubleVal(35.8)
.doubleVal2(Double.NaN)
.floatVal(898.24f)
.boolean1Val(true)
.boolean2Val(false)
.classVal(String.class)
.enumVal(SampleEnum.VAL2)
.build());
}
@Data
@Builder
@AllArgsConstructor
@RequiredArgsConstructor
public static class BasicTypeHolder implements NamedNode {
private String name;
@AssignToField("cId")
private final String cId;
private char myChar;
private long longVal;
private int intVal;
private short shortVal;
private byte byteVal;
private double doubleVal;
private double doubleVal2;
private float floatVal;
private boolean boolean1Val;
private boolean boolean2Val;
private Class<?> classVal;
private SampleEnum enumVal;
}
}
Serialised event processor
The full event processor is FieldsExampleProcessor this excerpt shows the relevant statements:
public class FieldsExampleProcessor
implements EventProcessor<FieldsExampleProcessor>,
StaticEventProcessor,
InternalEventProcessor,
BatchHandler,
Lifecycle {
//REMOVED FOR CLARITY
private final BasicTypeHolder holder = new BasicTypeHolder("cid");
//REMOVED FOR CLARITY
public FieldsExampleProcessor(Map<Object, Object> contextMap) {
context.replaceMappings(contextMap);
holder.setBoolean1Val(true);
holder.setBoolean2Val(false);
holder.setByteVal((byte) 12);
holder.setClassVal(java.lang.String.class);
holder.setDoubleVal(35.8);
holder.setDoubleVal2(Double.NaN);
holder.setEnumVal(SampleEnum.VAL2);
holder.setFloatVal(898.24f);
holder.setIntVal(12);
holder.setLongVal(2334L);
holder.setMyChar('$');
holder.setName("holder");
holder.setShortVal((short) 45);
//node auditors
initialiseAuditor(clock);
initialiseAuditor(nodeNameLookup);
subscriptionManager.setSubscribingEventProcessor(this);
context.setEventProcessorCallback(this);
}
//REMOVED FOR CLARITY
}
Custom field serialization
A custom serializer can be injected for fields that cannot be serialized with the standard algorithm. Add a custom serializer to the EventProcesserConfig instance with
EventProcesserConfig.addClassSerializer(MyThing.class, CustomSerializerExample::customSerialiser);
The custom serializer function generates a String that can be compiled in the event processor generated source
public class CustomSerializerExample {
public static void main(String[] args) {
Fluxtion.compile(
cfg -> {
//static method is not serializable
var myThing = MyThing.newThing("my instance param");
cfg.addNode(new StringHandler(myThing));
//register custom serializer for MyThing
cfg.addClassSerializer(MyThing.class, CustomSerializerExample::customSerialiser);
},
compilerConfig -> {
compilerConfig.setClassName("CustomSerializerExampleProcessor");
compilerConfig.setPackageName("com.fluxtion.example.reference.generation.genoutput");
});
}
//Implements the custom serialiser function
public static String customSerialiser(FieldContext<MyThing> fieldContext) {
fieldContext.getImportList().add(MyThing.class);
MyThing myThing = fieldContext.getInstanceToMap();
return "MyThing.newThing(\"" + myThing.getID() + "\")";
}
public static class MyThing {
private final String iD;
private MyThing(String iD) {
this.iD = iD;
}
public String getID() {
return iD;
}
public static MyThing newThing(String in) {
return new MyThing(in);
}
}
public static class StringHandler {
private final Object delegate;
public StringHandler(Object delegate) {
this.delegate = delegate;
}
@OnEventHandler
public boolean onEvent(MyThing event) {
return false;
}
}
}
Serialised event processor
The generated event processor
is CustomSerializerExampleProcessor
this excerpt shows the relevant statements that invokes the static build method MyThing.newThing("my instance param")
public class CustomSerializerExampleProcessor
implements EventProcessor<CustomSerializerExampleProcessor>,
StaticEventProcessor,
InternalEventProcessor,
BatchHandler,
Lifecycle {
//Node declarations
//REMOVED FOR CLARITY
private final StringHandler stringHandler_0 = new StringHandler(MyThing.newThing("my instance param"));
//REMOVED FOR CLARITY
}
Compile AOT - spring configuration
Spring configuration is natively supported by Fluxtion. Any beans in the spring ApplicationContext will be bound into the model and eventually the generated event processor. Pass the spring ApplicationContext into fluxtion and compile AOT with
FluxtionSpring.compileAot(ApplicationContext appContext, Consumer<FluxtionCompilerConfig> fluxtionCompilerConfig)
The generated event processor is CustomSerializerExampleProcessor
See FluxtionSpring for the various compilation methods supported for Spring integration.
public class SpringConfigAdd {
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode::received:" + stringToProcess);
return true;
}
}
public static class Root1 {
private final MyNode myNode;
public Root1(MyNode myNode) {
this.myNode = myNode;
}
@OnTrigger
public boolean trigger() {
System.out.println("Root1::triggered");
return true;
}
}
public static void main(String[] args) {
FluxtionSpring.compileAot(
new ClassPathXmlApplicationContext("com/fluxtion/example/reference/spring-example.xml"),
fluxtionCompilerConfig -> {
fluxtionCompilerConfig.setClassName("SpringExampleProcessor");
fluxtionCompilerConfig.setPackageName("com.fluxtion.example.reference.generation.genoutput");
});
}
}
The spring application config
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="myNode" class="com.fluxtion.example.reference.SpringConfigAdd.MyNode">
</bean>
<bean id="root1" class="com.fluxtion.example.reference.SpringConfigAdd.Root1">
<constructor-arg ref="myNode"/>
</bean>
</beans>
Compile AOT - scan for FluxtionGraphBuilder
Fluxtion provides a mechanism for discovering event processor builder classes and invoking each one to generate an event processor AOT.
Requirements to create a discoverable builder
- Implement the interface FluxtionGraphBuilder. The two call back methods separate graph building and compiler configuration
- Create a main method that
calls
Fluxtion.scanAndCompileFluxtionBuilders(File... builderLocations)
where file is the location of the builder classes. - Any Builder annotated with
@Disabled
is ignored by the scan function
The generated event processor is SampleAotBuilderProcessor
//This is a discoverable FluxtionGraphBuilder
public class SampleAotBuilder implements FluxtionGraphBuilder {
@Override
public void buildGraph(EventProcessorConfig eventProcessorConfig) {
DataFlow.subscribe(String.class)
.mapToInt(String::length);
}
@Override
public void configureGeneration(FluxtionCompilerConfig compilerConfig) {
compilerConfig.setClassName("SampleAotBuilderProcessor");
compilerConfig.setPackageName("com.fluxtion.example.reference.generation.genoutput");
}
//invokes the scanAndCompileFluxtionBuilders utility
public static void main(String[] args) {
Fluxtion.scanAndCompileFluxtionBuilders(Path.of("target/classes/").toFile());
}
}
Build tool scan integration
The mechanism for scan and compile is encapsulated in the maven plugin removing the need to provide the main method. Presently there is no gradle plugin, simple wrapping the discovery function in a main method in the gradle build file works.
To be documented
- Yaml support