Binding user classes to an event processor
The Fluxtion compiler generates an event processor from a model supplied by the client. A model represents the set of user classes that are bound into the event processor. This section documents the ways a developer can create the model. EventProcessorConfig is the class that acts as the model. Several options are available for adding user classes to an EventProcessorConfig instance:
- Imperative api
- Functional dsl api
- Spring based
- Yaml based
These examples use Fluxtion.interpret, which executes the event processor as an in-process interpretation. The available output types of the generated event processor are described in Processor generation.
Table of contents
- Example project
- Imperative model building
- Imperatively add a node
- Imperatively add multiple nodes
- Implicitly adding nodes
- Varargs adding nodes
- Adding shared references
- Adding references that are equal
- Naming nodes
- Using name as equality
- Inject a reference
- Inject a singleton
- Inject from factory
- Add an auditor
- Exclude a node from binding
- Inject runtime instance
- Functional model building
- Build with spring configuration
- To be documented
Example project
The source project for the examples can be found here
Imperative model building
Call one of the static Fluxtion build methods with a Consumer<EventProcessorConfig>. User classes are bound into the model by invoking methods on the EventProcessorConfig instance supplied to the consumer. Fluxtion implicitly creates the EventProcessorConfig instance, which is then passed into the generator.
Only nodes in the model that are annotated with a Fluxtion annotation are bound into an event processor.
Imperatively add a node
Bind an instance of MyNode into the supplied EventProcessorConfig using Fluxtion.interpret(cfg -> cfg.addNode(new MyNode())).
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("received:" + stringToProcess);
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(cfg -> cfg.addNode(new MyNode()));
processor.init();
processor.onEvent("TEST");
}
Output
received:TEST
Imperatively add multiple nodes
Bind multiple object instances into the model.
public static class MyNode {
private final String name;
public MyNode(String name) {
this.name = name;
}
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println(name + " received:" + stringToProcess);
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(cfg -> {
cfg.addNode(new MyNode("node_1"));
cfg.addNode(new MyNode("node_2"));
cfg.addNode(new MyNode("node_3"));
});
processor.init();
processor.onEvent("TEST");
}
Output
node_1 received:TEST
node_2 received:TEST
node_3 received:TEST
Implicitly adding nodes
Imperatively added instances are root nodes for the model. The references of a node are analysed recursively: if a reference points to an instance that has a Fluxtion annotation, that instance is added to the model implicitly. Each implicitly added node is analysed in the same way, so further nodes are added recursively.
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode::received:" + stringToProcess);
return true;
}
}
public static class Root1 {
private final MyNode myNode;
public Root1(MyNode myNode) {
this.myNode = myNode;
}
@OnTrigger
public boolean trigger() {
System.out.println("Root1::triggered");
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(cfg -> cfg.addNode(new Root1(new MyNode())));
processor.init();
processor.onEvent("TEST");
}
Output
MyNode::received:TEST
Root1::triggered
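The recursive analysis described above can be pictured with a small reflection walk. This is a minimal sketch in plain Java, not Fluxtion's implementation: the Callback annotation, the Leaf and Root classes and the discover method are all hypothetical names chosen for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

class ImplicitBindingSketch {

    // Hypothetical marker standing in for a Fluxtion annotation such as @OnTrigger.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Callback {}

    static class Leaf {
        @Callback
        public boolean handle() { return true; }
    }

    static class Root {
        final Leaf leaf;
        final String label = "plain field"; // String has no annotated methods, so it is never bound

        Root(Leaf leaf) { this.leaf = leaf; }

        @Callback
        public boolean trigger() { return true; }
    }

    // True when the candidate's class declares at least one annotated method.
    static boolean hasCallback(Object candidate) {
        for (Method method : candidate.getClass().getDeclaredMethods()) {
            if (method.isAnnotationPresent(Callback.class)) return true;
        }
        return false;
    }

    // Returns the root plus every annotated instance reachable through its fields.
    static List<Object> discover(Object root) {
        List<Object> bound = new ArrayList<>();
        Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        walk(root, visited, bound);
        return bound;
    }

    private static void walk(Object node, Set<Object> visited, List<Object> bound) {
        // Skip nulls, un-annotated instances and instances already seen.
        if (node == null || !hasCallback(node) || !visited.add(node)) return;
        bound.add(node);
        for (Field field : node.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers())) continue;
            try {
                field.setAccessible(true);
                walk(field.get(node), visited, bound);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        for (Object node : discover(new Root(new Leaf()))) {
            System.out.println(node.getClass().getSimpleName()); // Root, then Leaf
        }
    }
}
```

Only the root is passed in explicitly. The Leaf instance is found through the field reference, and the String field is ignored because its class has no annotated methods.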
Varargs adding nodes
Add root nodes to the model using the varargs overload of the Fluxtion build method. This is useful when no customisation of the model is needed and only nodes need to be included in the generated processor.
public static class MyNode {
private final String name;
public MyNode(String name) {
this.name = name;
}
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println(name + " received:" + stringToProcess);
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(
new MyNode("node_1"),
new MyNode("node_2"),
new MyNode("node_3"));
processor.init();
processor.onEvent("TEST");
}
Output
node_1 received:TEST
node_2 received:TEST
node_3 received:TEST
Adding shared references
If two nodes reference a shared instance, that instance is added to the model only once. The shared node triggers both children when propagating event notifications.
public static class MyNode {
private final String name;
public MyNode(String name) {
this.name = name;
}
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println(name + "::received:" + stringToProcess);
return true;
}
}
public static class Root1 {
private final String name;
private final MyNode myNode;
public Root1(String name, MyNode myNode) {
this.name = name;
this.myNode = myNode;
}
@OnTrigger
public boolean trigger() {
System.out.println(name + "::triggered");
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(cfg -> {
MyNode myNode1 = new MyNode("myNode_1");
cfg.addNode(new Root1("root_1", myNode1));
cfg.addNode(new Root1("root_2", myNode1));
});
processor.init();
processor.onEvent("TEST");
}
Output
myNode_1::received:TEST
root_1::triggered
root_2::triggered
Adding references that are equal
The model acts like a set, checking equality when a node is added imperatively or implicitly. If the instance to add is equal to a node already in the model, the existing node is used in its place, so only one instance is in the model. References to the duplicate are redirected to point at the existing node.
public static class MyNode {
private final String name;
int identifier;
public MyNode(String name, int identifier) {
this.name = name;
this.identifier = identifier;
}
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println(name + " identifier:" + identifier + " received:" + stringToProcess);
return true;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MyNode myNode = (MyNode) o;
return name.equals(myNode.name);
}
@Override
public int hashCode() {
return name.hashCode();
}
@Override
public String toString() {
return "MyNode{" +
"name='" + name + '\'' +
", identifier=" + identifier +
'}';
}
}
public static class Root1 {
private final String name;
private final MyNode myNode;
public Root1(String name, MyNode myNode) {
this.name = name;
this.myNode = myNode;
System.out.println(name + "::new " + myNode);
}
@OnTrigger
public boolean trigger() {
System.out.println(name + "::triggered " + myNode);
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(cfg -> {
MyNode myNode1 = new MyNode("myNode_1", 999);
MyNode myNode2 = new MyNode("myNode_1", 444);
cfg.addNode(new Root1("root_1", myNode1));
cfg.addNode(new Root1("root_2", myNode2));
});
processor.init();
System.out.println();
processor.onEvent("TEST");
}
Output
root_1::new MyNode{name='myNode_1', identifier=999}
root_2::new MyNode{name='myNode_1', identifier=444}
myNode_1 identifier:999 received:TEST
root_1::triggered MyNode{name='myNode_1', identifier=999}
root_2::triggered MyNode{name='myNode_1', identifier=999}
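The set-like substitution can be sketched in plain Java with a map from each node to the first equal instance that was added. This is an illustration under assumptions, not Fluxtion's implementation: the Model class and its add method are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class EqualNodeSubstitutionSketch {

    // Equality is based on name only, as in the MyNode example above.
    static final class Node {
        final String name;
        final int identifier;

        Node(String name, int identifier) {
            this.name = name;
            this.identifier = identifier;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Node && name.equals(((Node) o).name);
        }

        @Override
        public int hashCode() {
            return name.hashCode();
        }
    }

    // Hypothetical model: keeps the first instance added for each equality class.
    static final class Model {
        private final Map<Object, Object> canonical = new LinkedHashMap<>();

        // Returns the instance callers should reference from now on.
        @SuppressWarnings("unchecked")
        <T> T add(T node) {
            Object existing = canonical.putIfAbsent(node, node);
            return existing == null ? node : (T) existing;
        }

        int size() {
            return canonical.size();
        }
    }

    public static void main(String[] args) {
        Model model = new Model();
        Node first = model.add(new Node("myNode_1", 999));
        Node second = model.add(new Node("myNode_1", 444));
        System.out.println(model.size());      // 1
        System.out.println(second.identifier); // 999
        System.out.println(first == second);   // true
    }
}
```

The second add returns the instance with identifier 999, which mirrors the output above where both roots report the same MyNode.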
Naming nodes
Nodes can be given a name when they are added to the graph. If a node has previously been added with the same name, the generation process fails with a name clash. A named node can be accessed by calling EventProcessor.getNodeById. There are two ways to give a node an addressable identifier:
- Implement the NamedNode interface on a bound node
- Call EventProcessorConfig.addNode with a name as the second argument
public static class MyNode {
private final String name;
public MyNode(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
public static class MyNamedNode implements NamedNode {
private final String name;
public MyNamedNode(String name) {
this.name = name;
}
@Override
public String getName() {
return name;
}
}
public static void main(String[] args) throws NoSuchFieldException {
var processor = Fluxtion.interpret(cfg -> {
cfg.addNode(new MyNode("customName"), "overrideName");
cfg.addNode(new MyNamedNode("name1"));
});
processor.init();
System.out.println(processor.<MyNode>getNodeById("overrideName").getName());
System.out.println(processor.<MyNamedNode>getNodeById("name1").getName());
}
Output
customName
name1
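The name clash rule can be pictured as a registry keyed by name that rejects a second, different instance under an existing name. The sketch below is plain Java with hypothetical names (Registry, register, lookup) and is only meant to illustrate the rule, not to describe how Fluxtion stores nodes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

class NameRegistrySketch {

    // Hypothetical registry: one node per name, clashes are rejected.
    static final class Registry {
        private final Map<String, Object> byName = new HashMap<>();

        void register(String name, Object node) {
            Object previous = byName.putIfAbsent(name, node);
            if (previous != null && previous != node) {
                throw new IllegalStateException("name clash: " + name);
            }
        }

        // Looks a node up by name; the caller chooses the expected type.
        @SuppressWarnings("unchecked")
        <T> T lookup(String name) {
            Object node = byName.get(name);
            if (node == null) {
                throw new NoSuchElementException("no node named " + name);
            }
            return (T) node;
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register("overrideName", "first node");
        String found = registry.lookup("overrideName");
        System.out.println(found); // first node
        try {
            registry.register("overrideName", "second node");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // name clash: overrideName
        }
    }
}
```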
Using name as equality
The string name can be used as the equality test, removing the need for users to implement custom equals and hashCode methods. The utility class SingleNamedNode can be extended to simplify implementation. Because equals, hashCode and getName are all based on the same name, name clashes are avoided and only a single instance per name is in the model. This allows user building code to use the name as a key to create a shared reference when building.
public static class MyNode extends SingleNamedNode {
public MyNode(String name) {
super(name);
}
}
public static void main(String[] args) throws NoSuchFieldException {
var processor = Fluxtion.interpret(new MyNode("name1"));
processor.init();
System.out.println(processor.<MyNode>getNodeById("name1").getName());
}
Output
name1
Inject a reference
Fluxtion supports injecting a reference with the @Inject annotation. A new instance is created by the model using the default constructor and added implicitly to the model. The injected instance will be analysed for implicit nodes to add to the model.
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode::received:" + stringToProcess);
return true;
}
}
public static class Root1 {
@Inject
private final MyNode myNode;
public Root1() {
myNode = null;
}
public Root1(MyNode myNode) {
this.myNode = myNode;
}
@OnTrigger
public boolean trigger() {
System.out.println("Root1::triggered");
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(cfg -> cfg.addNode(new Root1()));
processor.init();
processor.onEvent("TEST");
}
Output
MyNode::received:TEST
Root1::triggered
Inject a singleton
Fluxtion supports injecting a singleton reference with the @Inject(singleton = true) annotation. The same reference is used throughout the model, so only one instance is present in the generated processor.
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode::received:" + stringToProcess);
return true;
}
}
public static class Root1{
@Inject(singleton = true)
private final MyNode myNode;
private final String id;
public Root1(String id) {
this(null, id);
}
public Root1(MyNode myNode, String id) {
this.myNode = myNode;
this.id = id;
}
@OnTrigger
public boolean trigger() {
System.out.println(id + "::triggered");
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new Root1("r1"), new Root1("r2"), new Root1("r3"));
processor.init();
processor.onEvent("TEST");
}
Output
MyNode::received:TEST
r1::triggered
r2::triggered
r3::triggered
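The difference between plain injection and singleton injection can be sketched with a small reflective injector. This is a minimal illustration in plain Java, not Fluxtion's implementation: the InjectSketch annotation, Injector, Holder and Shared are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

class SingletonInjectSketch {

    // Hypothetical stand-in for an injection annotation with a singleton flag.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface InjectSketch {
        boolean singleton() default false;
    }

    static class Shared {}

    static class Holder {
        @InjectSketch(singleton = true)
        Shared shared; // same instance in every Holder

        @InjectSketch
        Shared fresh;  // new instance per injection site
    }

    static final class Injector {
        private final Map<Class<?>, Object> singletons = new HashMap<>();

        // Fills every annotated field of the target.
        void inject(Object target) {
            try {
                for (Field field : target.getClass().getDeclaredFields()) {
                    InjectSketch inject = field.getAnnotation(InjectSketch.class);
                    if (inject == null) continue;
                    Class<?> type = field.getType();
                    field.setAccessible(true);
                    field.set(target, inject.singleton() ? singleton(type) : create(type));
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }

        // Reuses one instance per type.
        private Object singleton(Class<?> type) throws ReflectiveOperationException {
            Object instance = singletons.get(type);
            if (instance == null) {
                instance = create(type);
                singletons.put(type, instance);
            }
            return instance;
        }

        // Creates an instance with the default constructor.
        private static Object create(Class<?> type) throws ReflectiveOperationException {
            Constructor<?> constructor = type.getDeclaredConstructor();
            constructor.setAccessible(true);
            return constructor.newInstance();
        }
    }

    public static void main(String[] args) {
        Injector injector = new Injector();
        Holder a = new Holder();
        Holder b = new Holder();
        injector.inject(a);
        injector.inject(b);
        System.out.println(a.shared == b.shared); // true
        System.out.println(a.fresh == b.fresh);   // false
    }
}
```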
Inject from factory
A factory can supply the injected instance to the model. A user implements the NodeFactory interface and returns the instance to the model. Factories can be registered programmatically with EventProcessorConfig, but the easiest method is to use the ServiceLoader pattern to register factories. This example uses the Google AutoService annotation, @AutoService(NodeFactory.class), to remove the boilerplate code.
Configuration key/values can be supplied at the inject site, e.g.
@Config(key = "filter", value = "red")
The key/value tuples are wrapped in a map and passed to NodeFactory.createNode as an argument. The factory creates custom instances using the map as required.
public static class MyNode {
private final String filter;
public MyNode(String filter) {
this.filter = filter;
}
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
boolean match = stringToProcess.equals(filter);
System.out.println(toString() + " match:" + match + " for:" + stringToProcess);
return match;
}
@Override
public String toString() {
return "MyNode{" +
"filter='" + filter + '\'' +
'}';
}
}
public static class Root1 {
@Inject
@Config(key = "filter", value = "red")
public MyNode myNodeRed;
@Inject
@Config(key = "filter", value = "blue")
public MyNode myNodeBlue;
@OnParentUpdate
public void parentUpdated(Object parent){
System.out.println("Root1::parentUpdated " + parent);
}
@OnTrigger
public boolean trigger() {
System.out.println("Root1::triggered");
return true;
}
}
@AutoService(NodeFactory.class)
public static class MyNodeFactory implements NodeFactory<MyNode>{
@Override
public MyNode createNode(Map<String, Object> config, NodeRegistry registry) {
return new MyNode((String) config.get("filter"));
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(cfg -> {
cfg.addNode(new Root1());
});
processor.init();
processor.onEvent("red");
System.out.println();
processor.onEvent("ignored");
System.out.println();
processor.onEvent("blue");
}
Output
MyNode{filter='red'} match:true for:red
Root1::parentUpdated MyNode{filter='red'}
MyNode{filter='blue'} match:false for:red
Root1::triggered
MyNode{filter='red'} match:false for:ignored
MyNode{filter='blue'} match:false for:ignored
MyNode{filter='red'} match:false for:blue
MyNode{filter='blue'} match:true for:blue
Root1::parentUpdated MyNode{filter='blue'}
Root1::triggered
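The path from a key/value annotation to a factory call can be sketched in plain Java. The sketch assumes a simplified single-argument factory interface and a hypothetical ConfigSketch annotation. It does not use the real NodeFactory signature shown above and is not Fluxtion's implementation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

class ConfigFactorySketch {

    // Hypothetical stand-in for a key/value configuration annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface ConfigSketch {
        String key();
        String value();
    }

    // Hypothetical, simplified factory contract.
    interface SketchFactory<T> {
        T createNode(Map<String, Object> config);
    }

    static class Filter {
        final String colour;

        Filter(String colour) {
            this.colour = colour;
        }
    }

    static class Holder {
        @ConfigSketch(key = "filter", value = "red")
        Filter red;

        @ConfigSketch(key = "filter", value = "blue")
        Filter blue;
    }

    // Wraps each field's key/value pair in a map and asks the factory for an instance.
    static void inject(Object target, SketchFactory<?> factory) {
        try {
            for (Field field : target.getClass().getDeclaredFields()) {
                ConfigSketch cfg = field.getAnnotation(ConfigSketch.class);
                if (cfg == null) continue;
                Map<String, Object> config = new HashMap<>();
                config.put(cfg.key(), cfg.value());
                field.setAccessible(true);
                field.set(target, factory.createNode(config));
            }
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SketchFactory<Filter> factory = config -> new Filter((String) config.get("filter"));
        Holder holder = new Holder();
        inject(holder, factory);
        System.out.println(holder.red.colour);  // red
        System.out.println(holder.blue.colour); // blue
    }
}
```

Each inject site receives its own instance, built from its own map, which matches the two differently configured MyNode instances in the output above.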
Add an auditor
An Auditor can be bound into the generated event processor. An auditor receives metadata callbacks that allow tracking of event processing as notifications propagate through the event processor. Implement the Auditor interface and bind it into the processor with:
cfg.addAuditor(new MyAuditor(), "myAuditor")
public static class MyNode extends SingleNamedNode {
public MyNode(String name) {
super(name);
}
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
return true;
}
@Override
public String toString() {
return "MyNode{}";
}
}
public static class Root1 {
private final MyNode myNode;
public Root1(MyNode myNode) {
this.myNode = myNode;
}
@OnTrigger
public boolean trigger() {
return true;
}
@Override
public String toString() {
return "Root1{" +
"myNode=" + myNode +
'}';
}
}
public static class MyAuditor implements Auditor{
@Override
public void nodeRegistered(Object node, String nodeName) {
System.out.printf("nodeRegistered nodeName:'%s' node:'%s'%n", nodeName, node);
}
@Override
public void eventReceived(Object event) {
System.out.println("eventReceived " + event);
}
@Override
public void nodeInvoked(Object node, String nodeName, String methodName, Object event) {
System.out.printf("nodeInvoked nodeName:'%s' invoked:'%s' node:'%s'%n", nodeName, methodName, node);
}
@Override
public boolean auditInvocations() {
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(cfg -> {
cfg.addNode(new MyNode("unlinked"), new Root1(new MyNode("linked")));
cfg.addAuditor(new MyAuditor(), "myAuditor");
});
processor.init();
processor.onEvent("TEST");
}
Output
nodeRegistered nodeName:'linked' node:'MyNode{}'
nodeRegistered nodeName:'unlinked' node:'MyNode{}'
nodeRegistered nodeName:'root1_0' node:'Root1{myNode=MyNode{}}'
nodeRegistered nodeName:'callbackDispatcher' node:'CallbackDispatcherImpl(eventProcessor=com.fluxtion.compiler.generation.targets.InMemoryEventProcessor@79ad8b2f, myStack=[], dispatching=false)'
nodeRegistered nodeName:'subscriptionManager' node:'com.fluxtion.runtime.input.SubscriptionManagerNode@59fa1d9b'
nodeRegistered nodeName:'context' node:'MutableEventProcessorContext{map={}}'
eventReceived Init
eventReceived TEST
nodeInvoked nodeName:'linked' invoked:'handleStringEvent' node:'MyNode{}'
nodeInvoked nodeName:'root1_0' invoked:'trigger' node:'Root1{myNode=MyNode{}}'
nodeInvoked nodeName:'unlinked' invoked:'handleStringEvent' node:'MyNode{}'
Exclude a node from binding
A user node can be excluded from binding into the model by adding the @ExcludeNode annotation to a user class. An excluded class can be used as a holder for complex construction logic when the user does not want to use a NodeFactory.
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode::received:" + stringToProcess);
return true;
}
}
@ExcludeNode
public static class Root1 {
private final MyNode myNode;
public Root1(MyNode myNode) {
this.myNode = myNode;
}
@OnTrigger
public boolean trigger() {
System.out.println("Root1::triggered");
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new Root1(new MyNode()));
processor.init();
processor.onEvent("TEST");
}
Output
MyNode::received:TEST
Inject runtime instance
Instances can be injected into a node at runtime using the @Inject(instanceName = "startData") annotation on an InstanceSupplier data member. The instance must be injected into a built event processor before calling init with:
processor.injectNamedInstance(new Date(1000000), "startData")
Instances can be updated once the processor is running by injecting a new instance with the same name.
public static class MyNode{
@Inject(instanceName = "startData")
public InstanceSupplier<Date> myDate;
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("runtime injected:" + myDate.get());
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new MyNode());
processor.injectNamedInstance(new Date(1000000), "startData");
processor.init();
processor.onEvent("TEST");
processor.injectNamedInstance(new Date(999000000), "startData");
processor.onEvent("TEST");
}
Output
runtime injected:Thu Jan 01 01:16:40 GMT 1970
runtime injected:Mon Jan 12 14:30:00 GMT 1970
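One way to picture why re-injecting under the same name updates a running node is a supplier that looks the instance up by name on every call. The sketch below is plain Java with hypothetical names (Registry, supplierFor). It only borrows the injectNamedInstance method name from the example above and is not Fluxtion's implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class NamedInstanceSketch {

    // Hypothetical registry of runtime instances keyed by name.
    static final class Registry {
        private final Map<String, Object> instances = new ConcurrentHashMap<>();

        void injectNamedInstance(Object instance, String name) {
            instances.put(name, instance);
        }

        // The supplier resolves the name at call time, so later injections are visible.
        @SuppressWarnings("unchecked")
        <T> Supplier<T> supplierFor(String name) {
            return () -> (T) instances.get(name);
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Supplier<String> startData = registry.supplierFor("startData");

        registry.injectNamedInstance("first", "startData");
        System.out.println(startData.get()); // first

        registry.injectNamedInstance("second", "startData");
        System.out.println(startData.get()); // second
    }
}
```

The node holds the supplier rather than the instance, so it never needs to be rebuilt when the instance changes.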
Functional model building
Fluxtion supports binding functions into the event processor using functional programming. The DataFlow class provides static methods that subscribe to events. Once a flow has been built, map/filter/grouping functions can be applied as chained calls. The flow must be built within the Fluxtion build method; DataFlow adds all the functions and classes to the model automatically.
Bind functions to events
To bind functions to a flow of events the subscription must first be created with:
DataFlow.subscribe([event class])
A lambda or a method reference can be bound as the next item in the function flow. A full description of the functional api is in Functional programming.
public static String toUpper(String incoming){
return incoming.toUpperCase();
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(cfg -> {
DataFlow.subscribe(String.class)
.console("input: '{}'")
.map(FunctionalStatic::toUpper)
.console("transformed: '{}'");
});
processor.init();
processor.onEvent("hello world");
}
Output
input: 'hello world'
transformed: 'HELLO WORLD'
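The chained style can be pictured as a push-based pipeline in which each stage forwards its result to the next. The sketch below is a toy analogue in plain Java: the Flow class with its publish, peek and map methods is hypothetical and is not the DataFlow API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class PushFlowSketch {

    // Hypothetical push-based flow: values are pushed to every downstream stage.
    static final class Flow<T> {
        private final List<Consumer<? super T>> downstream = new ArrayList<>();

        void publish(T value) {
            for (Consumer<? super T> stage : downstream) {
                stage.accept(value);
            }
        }

        // Observes values without changing them, similar in spirit to console(...).
        Flow<T> peek(Consumer<? super T> action) {
            downstream.add(action);
            return this;
        }

        // Transforms each value and pushes the result into a new flow.
        <R> Flow<R> map(Function<? super T, ? extends R> fn) {
            Flow<R> next = new Flow<>();
            downstream.add(value -> next.publish(fn.apply(value)));
            return next;
        }
    }

    // Builds the pipeline, pushes one event and returns what the stages logged.
    static List<String> run(String event) {
        List<String> log = new ArrayList<>();
        Flow<String> source = new Flow<>();
        source.peek(v -> log.add("input: '" + v + "'"))
              .map(String::toUpperCase)
              .peek(v -> log.add("transformed: '" + v + "'"));
        source.publish(event);
        return log;
    }

    public static void main(String[] args) {
        run("hello world").forEach(System.out::println);
        // input: 'hello world'
        // transformed: 'HELLO WORLD'
    }
}
```

The pipeline is assembled once and events are pushed through it afterwards, which is the same build-then-process split used by the Fluxtion examples.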
Bind instance functions
Instance functions can be bound into the event processor using method references.
public static class PrefixString{
private final String prefix;
public PrefixString(String prefix) {
this.prefix = prefix;
}
public String addPrefix(String input){
return prefix + input;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(cfg -> {
DataFlow.subscribe(String.class)
.console("input: '{}'")
.map(new PrefixString("XXXX")::addPrefix)
.console("transformed: '{}'");
});
processor.init();
processor.onEvent("hello world");
}
Output
input: 'hello world'
transformed: 'XXXXhello world'
Combining imperative and functional binding
Both imperative and functional binding can be used in the same build consumer. All the user classes and functions will be added to the model for generation.
public static String toUpper(String incoming){
return incoming.toUpperCase();
}
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("IMPERATIVE received:" + stringToProcess);
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(cfg -> {
DataFlow.subscribe(String.class)
.console("FUNCTIONAL input: '{}'")
.map(CombineFunctionalAndImperative::toUpper)
.console("FUNCTIONAL transformed: '{}'");
cfg.addNode(new MyNode());
});
processor.init();
processor.onEvent("hello world");
}
Output
FUNCTIONAL input: 'hello world'
FUNCTIONAL transformed: 'HELLO WORLD'
IMPERATIVE received:hello world
Build with spring configuration
Spring configuration is natively supported by Fluxtion. Any beans in the Spring ApplicationContext are bound into the model and eventually into the generated event processor. Pass the Spring ApplicationContext into Fluxtion with:
FluxtionSpring.interpret(ApplicationContext)
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode::received:" + stringToProcess);
return true;
}
}
public static class Root1 {
private final MyNode myNode;
public Root1(MyNode myNode) {
this.myNode = myNode;
}
@OnTrigger
public boolean trigger() {
System.out.println("Root1::triggered");
return true;
}
}
public static void main(String[] args) {
var context = new ClassPathXmlApplicationContext("com/fluxtion/example/reference/spring-example.xml");
var processor = FluxtionSpring.interpret(context);
processor.init();
processor.onEvent("TEST");
}
The spring application config
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="myNode" class="com.fluxtion.example.reference.SpringConfigAdd.MyNode">
</bean>
<bean id="root1" class="com.fluxtion.example.reference.SpringConfigAdd.Root1">
<constructor-arg ref="myNode"/>
</bean>
</beans>
Output
MyNode::received:TEST
Root1::triggered
To be documented
- yaml