Processing event streams
This section documents the runtime event processing callback api and behaviour taking an imperative approach to using Fluxtion.
Three steps to using Fluxtion
1 - Create user classes and mark event handling methods with Fluxtion annotations
2 - Build the event processor using fluxtion compiler utility
3 - Integrate the event processor in the app and feed it events
In this section we are covering the first of these Create user classes and mark event handling methods with Fluxtion annotations using an imperative approach.
Table of contents
- Event handling primer
- Examples
- Handle event input
- Handle multiple event types
- Filtering events
- Filter variables
- Handling unknown event types
- Triggering children
- Conditional triggering children
- Identify triggering parent
- Identifying parent by name
- After event callback
- After trigger callback
- Push trigger
- No propagate event handler
- No trigger reference
- Override trigger reference
- Non-dirty triggering
- Collection support
- Forking concurrent trigger methods
- Batch support
Event handling primer
User classes bound into an EventProcessor register for event callbacks with annotations. The generated EventProcessor implements the StaticEventProcessor, with the onEvent method acting as a bridge between external event streams and bound processing logic. User code reads the event streams calling onEvent with each new event received, the event processor then notifies annotated callback methods according to the dispatch rules.
Examples
The source project for the examples can be found here
To process an event stream correctly the following requirements must be met:
- Call EventProcessor.init() before first use
- EventProcessors are not thread safe a single event should be processed at one time.
Handle event input
Sends an incoming even to the EventProcessor to trigger a new stream calculation. Any method annotated with
@OnEvent
receives the event from the event processor
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("received:" + stringToProcess);
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new MyNode());
processor.init();
processor.onEvent("TEST");
}
Output
received:TEST
Handle multiple event types
An event handler class can handle multiple event types. Add as many handler methods as required and annotate each method
with an @OnEvent
annotation.
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("String received:" + stringToProcess);
return true;
}
@OnEventHandler
public boolean handleIntEvent(int intToProcess) {
System.out.println("Int received:" + intToProcess);
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new MyNode());
processor.init();
processor.onEvent("TEST");
processor.onEvent(16);
}
Output
String received:TEST
Int received:16
Filtering events
User events can implement Event, which provides an optional filtering field. Event handlers can specify the filter value, so they only see events with matching filters
public static class MyNode {
@OnEventHandler(filterString = "CLEAR_SIGNAL")
public boolean allClear(Signal<String> signalToProcess) {
System.out.println("allClear [" + signalToProcess + "]");
return true;
}
@OnEventHandler(filterString = "ALERT_SIGNAL")
public boolean alertSignal(Signal<String> signalToProcess) {
System.out.println("alertSignal [" + signalToProcess + "]");
return true;
}
@OnEventHandler()
public boolean anySignal(Signal<String> signalToProcess) {
System.out.println("anySignal [" + signalToProcess + "]");
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new MyNode());
processor.init();
processor.onEvent(new Signal<>("ALERT_SIGNAL", "power failure"));
System.out.println();
processor.onEvent(new Signal<>("CLEAR_SIGNAL", "power restored"));
System.out.println();
processor.onEvent(new Signal<>("HEARTBEAT_SIGNAL", "heartbeat message"));
}
Output
alertSignal [Signal: {filterString: ALERT_SIGNAL, value: power failure}]
anySignal [Signal: {filterString: ALERT_SIGNAL, value: power failure}]
allClear [Signal: {filterString: CLEAR_SIGNAL, value: power restored}]
anySignal [Signal: {filterString: CLEAR_SIGNAL, value: power restored}]
anySignal [Signal: {filterString: HEARTBEAT_SIGNAL, value: heartbeat message}]
Filter variables
The filter value on the event handler method can be extracted from an instance field in the class. Annotate the event
handler method with an attribute that points to the filter variable @OnEventHandler(filterVariable = "[class variable]")
public static class MyNode {
private final String name;
public MyNode(String name) {
this.name = name;
}
@OnEventHandler(filterVariable = "name")
public boolean handleIntSignal(Signal.IntSignal intSignal) {
System.out.printf("MyNode-%s::handleIntSignal - %s%n", name, intSignal.getValue());
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new MyNode("A"), new MyNode("B"));
processor.init();
processor.publishIntSignal("A", 22);
processor.publishIntSignal("B", 45);
processor.publishIntSignal("C", 100);
}
Output
MyNode-A::handleIntSignal - 22
MyNode-B::handleIntSignal - 45
Handling unknown event types
An unknown event handler can be registered at runtime with the event processor, to catch any event types that are not handled by the processor. Register the unKnownEventHandler with:
[processor].setUnKnownEventHandler(Consumer<T> consumer)
public class UnknownEventHandling {
public static void main(String[] args) {
var processor = Fluxtion.interpret(new MyNode());
processor.init();
//set an unknown event handler
processor.setUnKnownEventHandler(e -> System.out.println("Unregistered event type -> " + e.getClass().getName()));
processor.onEvent("TEST");
//handled by unKnownEventHandler
processor.onEvent(Collections.emptyList());
}
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("received:" + stringToProcess);
return true;
}
}
}
Output
received:TEST
Unregistered event type -> java.util.Collections$EmptyList
Triggering children
Event notification is propagated to child instances of event handlers. The notification is sent to any method that is
annotated with an @OnTrigger
annotation. Trigger propagation is in topological order.
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("received:" + stringToProcess);
return true;
}
}
public static class MyNode2 {
@OnEventHandler
public boolean handleStringEvent(int intToProcess) {
System.out.println("received:" + intToProcess);
return true;
}
}
public static class Child{
private final MyNode myNode;
private final MyNode2 myNode2;
public Child(MyNode myNode, MyNode2 myNode2) {
this.myNode = myNode;
this.myNode2 = myNode2;
}
@OnTrigger
public boolean triggered(){
System.out.println("Child:triggered");
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new Child(new MyNode(), new MyNode2()));
processor.init();
processor.onEvent("test");
System.out.println();
processor.onEvent(200);
}
Output
received:test
Child:triggered
received:200
Child:triggered
Conditional triggering children
Event notification is propagated to child instances of event handlers if the event handler method returns a true value. A false return value will cause the event processor to swallow the triggering notification.
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("received:" + stringToProcess);
return true;
}
}
public static class MyNode2 {
@OnEventHandler
public boolean handleStringEvent(int intToProcess) {
boolean propagate = intToProcess > 100;
System.out.println("conditional propagate:" + propagate);
return propagate;
}
}
public static class Child{
private final MyNode myNode;
private final MyNode2 myNode2;
public Child(MyNode myNode, MyNode2 myNode2) {
this.myNode = myNode;
this.myNode2 = myNode2;
}
@OnTrigger
public boolean triggered(){
System.out.println("Child:triggered");
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new Child(new MyNode(), new MyNode2()));
processor.init();
processor.onEvent("test");
System.out.println();
processor.onEvent(200);
System.out.println();
processor.onEvent(50);
}
Output
received:test
Child:triggered
conditional propagate:true
Child:triggered
conditional propagate:false
Identify triggering parent
It is possible to identify the parent that has triggered a change by adding an @OnParentUpdate
annotation to a child
instance. The method must accept a single parameter of the type of the parent to observe. The OnParent callback gives
granular detail of which parent has changed, whereas OnTrigger callbacks signify that at least one parent is triggering.
The OnParent callbacks are guaranteed to be received before the OnTrigger callback.
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode event received:" + stringToProcess);
return true;
}
}
public static class MyNode2 {
@OnEventHandler
public boolean handleIntEvent(int intToProcess) {
boolean propagate = intToProcess > 100;
System.out.println("MyNode2 conditional propagate:" + propagate);
return propagate;
}
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode2 event received:" + stringToProcess);
return true;
}
}
public static class Child{
private final MyNode myNode;
private final MyNode2 myNode2;
public Child(MyNode myNode, MyNode2 myNode2) {
this.myNode = myNode;
this.myNode2 = myNode2;
}
@OnParentUpdate
public void node1Updated(MyNode myNode1){
System.out.println("1 - myNode updated");
}
@OnParentUpdate
public void node2Updated(MyNode2 myNode2){
System.out.println("2 - myNode2 updated");
}
@OnTrigger
public boolean triggered(){
System.out.println("Child:triggered");
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new Child(new MyNode(), new MyNode2()));
processor.init();
processor.onEvent("test");
System.out.println();
processor.onEvent(200);
System.out.println();
processor.onEvent(50);
}
Output
MyNode2 event received:test
2 - myNode2 updated
MyNode event received:test
1 - myNode updated
Child:triggered
MyNode2 conditional propagate:true
2 - myNode2 updated
Child:triggered
MyNode2 conditional propagate:false
Identifying parent by name
When a child has multiple parents of the same type then name resolution can be used to identify the parent that has
triggered the update. Add the variable name to the @OnParentyUpdate
annotation to enforce name and type resolution.
The OnParent callback is invoked according to the same rules as conditional triggering.
public static class MyNode {
private final String name;
public MyNode(String name) {
this.name = name;
}
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println(name + " event received:" + stringToProcess);
return stringToProcess.equals("*") | stringToProcess.equals(name);
}
}
public static class Child{
private final MyNode myNode_a;
private final MyNode myNode_b;
public Child(MyNode myNode_a, MyNode myNode_b) {
this.myNode_a = myNode_a;
this.myNode_b = myNode_b;
}
@OnParentUpdate(value = "myNode_a")
public void node_a_Updated(MyNode myNode_a){
System.out.println("Parent A updated");
}
@OnParentUpdate("myNode_b")
public void node_b_Updated(MyNode myNode_b){
System.out.println("Parent B updated");
}
@OnTrigger
public boolean triggered(){
System.out.println("Child:triggered");
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new Child(new MyNode("A"), new MyNode("B")));
processor.init();
processor.onEvent("test");
System.out.println();
processor.onEvent("*");
System.out.println();
processor.onEvent("A");
System.out.println();
processor.onEvent("B");
}
Output
A event received:test
B event received:test
A event received:*
Parent A updated
B event received:*
Parent B updated
Child:triggered
A event received:A
Parent A updated
B event received:A
Child:triggered
A event received:B
B event received:B
Parent B updated
Child:triggered
After event callback
Register for a post event method callback with the @AfterEvent
annotation. The callback will be executed whenever
any event is sent to the event processor. Unlike the @AfterTrigger
which is only called if the containing instance has
been triggered.
public static class MyNode {
@Initialise
public void init(){
System.out.println("MyNode::init");
}
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode::handleStringEvent received:" + stringToProcess);
return true;
}
@AfterEvent
public void afterEvent(){
System.out.println("MyNode::afterEvent");
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new MyNode());
processor.init();
System.out.println();
processor.onEvent("TEST");
System.out.println();
processor.onEvent(23);
}
Output
MyNode::init
MyNode::afterEvent
MyNode::handleStringEvent received:TEST
MyNode::afterEvent
MyNode::afterEvent
After trigger callback
Register for a post trigger method callback with the @AfterTrigger
annotation. The callback will only be executed if
this class has been triggered on tby an incoming event. Unlike the @AfterEvent
which is always called on any event.
public static class MyNode {
@Initialise
public void init(){
System.out.println("MyNode::init");
}
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode::handleStringEvent received:" + stringToProcess);
return true;
}
@AfterTrigger
public void afterTrigger(){
System.out.println("MyNode::afterTrigger");
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new MyNode());
processor.init();
System.out.println();
processor.onEvent("TEST");
System.out.println();
processor.onEvent(23);
}
Output
MyNode::init
MyNode::handleStringEvent received:TEST
MyNode::afterTrigger
Push trigger
Invert the trigger order so the instance holding the reference receives the event notification before the reference target
and can push data into the target. Annotate the reference to be a push target with the @PushReference
annotation.
The normal order is to trigger the target first, which can perform internal calculations if required. Then the instance holding the reference is triggered so it can pull calculated data from the target reference.
public static class MyNode {
@PushReference
private final PushTarget pushTarget;
public MyNode(PushTarget pushTarget) {
this.pushTarget = pushTarget;
}
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode::handleStringEvent " + stringToProcess);
if (stringToProcess.startsWith("PUSH")) {
pushTarget.myValue = stringToProcess;
return true;
}
return false;
}
}
public static class PushTarget {
public String myValue;
@OnTrigger
public boolean onTrigger() {
System.out.println("PushTarget::onTrigger -> myValue:'" + myValue + "'");
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new MyNode(new PushTarget()));
processor.init();
processor.onEvent("PUSH - test 1");
System.out.println();
processor.onEvent("ignore me - XXXXX");
System.out.println();
processor.onEvent("PUSH - test 2");
}
Output
MyNode::handleStringEvent PUSH - test 1
PushTarget::onTrigger -> myValue:'PUSH - test 1'
MyNode::handleStringEvent ignore me - XXXXX
MyNode::handleStringEvent PUSH - test 2
PushTarget::onTrigger -> myValue:'PUSH - test 2'
No propagate event handler
An event handler method can prevent its method triggering a notification by setting the propagate attribute to false
on any event handler annotation, @OnEventHandler(propagate = false)
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode::handleStringEvent received:" + stringToProcess);
return true;
}
@OnEventHandler(propagate = false)
public boolean handleIntEvent(int intToProcess) {
System.out.println("MyNode::handleIntEvent received:" + intToProcess);
return true;
}
}
public static class Child {
private final MyNode myNode;
public Child(MyNode myNode) {
this.myNode = myNode;
}
@OnTrigger
public boolean triggered(){
System.out.println("Child:triggered");
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new Child(new MyNode()));
processor.init();
processor.onEvent("test");
System.out.println();
processor.onEvent(200);
}
Output
MyNode::handleStringEvent received:test
Child:triggered
MyNode::handleIntEvent received:200
No trigger reference
A child can isolate itself from a parent’s event notification by marking the reference with a @NoTriggerReference
annotation. This will stop the onTrigger method from firing even when the parent has triggered.
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode::handleStringEvent received:" + stringToProcess);
return true;
}
}
public static class MyNode2 {
@OnEventHandler
public boolean handleIntEvent(int intToProcess) {
System.out.println("MyNode2::handleIntEvent received:" + intToProcess);
return true;
}
}
public static class Child {
private final MyNode myNode;
@NoTriggerReference
private final MyNode2 myNode2;
public Child(MyNode myNode, MyNode2 myNode2) {
this.myNode = myNode;
this.myNode2 = myNode2;
}
@OnTrigger
public boolean triggered() {
System.out.println("Child:triggered");
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new Child(new MyNode(), new MyNode2()));
processor.init();
processor.onEvent("test");
System.out.println();
processor.onEvent(200);
}
Output
MyNode::handleStringEvent received:test
Child:triggered
MyNode2::handleIntEvent received:200
Override trigger reference
A child can force only a single parent to fire its trigger, all other parents will be treated as if they were annotated with
@NoTriggerReference
and removed from the event notification triggers for this class.
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode::handleStringEvent received:" + stringToProcess);
return true;
}
}
public static class MyNode2 {
@OnEventHandler
public boolean handleIntEvent(int intToProcess) {
System.out.println("MyNode2::handleIntEvent received:" + intToProcess);
return true;
}
}
public static class Child {
private final MyNode myNode;
@TriggerEventOverride
private final MyNode2 myNode2;
public Child(MyNode myNode, MyNode2 myNode2) {
this.myNode = myNode;
this.myNode2 = myNode2;
}
@OnTrigger
public boolean triggered() {
System.out.println("Child:triggered");
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new Child(new MyNode(), new MyNode2()));
processor.init();
processor.onEvent("test");
System.out.println();
processor.onEvent(200);
}
Output
MyNode::handleStringEvent received:test
MyNode2::handleIntEvent received:200
Child:triggered
Non-dirty triggering
The condition that causes a trigger callback to fire can be inverted so that an indication of no change from the parent will cause the trigger to fire.
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(int intToProcess) {
boolean propagate = intToProcess > 100;
System.out.println("conditional propagate:" + propagate);
return propagate;
}
}
public static class Child {
private final MyNode myNode;
public Child(MyNode myNode) {
this.myNode = myNode;
}
@OnTrigger
public boolean triggered() {
System.out.println("Child:triggered");
return true;
}
}
public static class NonDirtyChild {
private final MyNode myNode;
public NonDirtyChild(MyNode myNode) {
this.myNode = myNode;
}
@OnTrigger(dirty = false)
public boolean triggered() {
System.out.println("NonDirtyChild:triggered");
return true;
}
}
public static void main(String[] args) {
MyNode myNode = new MyNode();
var processor = Fluxtion.interpret(new Child(myNode), new NonDirtyChild(myNode));
processor.init();
processor.onEvent("test");
System.out.println();
processor.onEvent(200);
System.out.println();
processor.onEvent(50);
}
Output
conditional propagate:true
Child:triggered
conditional propagate:false
NonDirtyChild:triggered
Collection support
Collections or arrays of references are supported, if any element in the collection fires a change notification the trigger method will be called. The trigger method is invoked only once per event cycle whatever the number of parent’s updating.
Parent change identity can be tracked using the @OnParentUpdate
annotation.
public static class MyNode {
@FilterId
private final String filter;
private final String name;
public MyNode(String filter, String name) {
this.filter = filter;
this.name = name;
}
@OnEventHandler
public boolean handleIntSignal(IntSignal intSignal) {
System.out.printf("MyNode-%s::handleIntSignal - %s%n", filter, intSignal.getValue());
return true;
}
}
public static class Child {
private final MyNode[] nodes;
private int updateCount;
public Child(MyNode... nodes) {
this.nodes = nodes;
}
@OnParentUpdate
public void parentUpdated(MyNode updatedNode) {
updateCount++;
System.out.printf("parentUpdated '%s'%n", updatedNode.name);
}
@OnTrigger
public boolean triggered() {
System.out.printf("Child::triggered updateCount:%d%n%n", updateCount);
updateCount = 0;
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new Child(
new MyNode("A", "a_1"),
new MyNode("A", "a_2"),
new MyNode("B", "b_1")));
processor.init();
processor.publishIntSignal("A", 10);
processor.publishIntSignal("B", 25);
processor.publishIntSignal("A", 12);
processor.publishIntSignal("C", 200);
}
Output
MyNode-A::handleIntSignal - 10
parentUpdated 'a_1'
MyNode-A::handleIntSignal - 10
parentUpdated 'a_2'
Child::triggered updateCount:2
MyNode-B::handleIntSignal - 25
parentUpdated 'b_1'
Child::triggered updateCount:1
MyNode-A::handleIntSignal - 12
parentUpdated 'a_1'
MyNode-A::handleIntSignal - 12
parentUpdated 'a_2'
Child::triggered updateCount:2
Forking concurrent trigger methods
Forking trigger methods is supported. If multiple trigger methods are fired from a single parent they can be forked to run in parallel using the fork join pool. Only when all the forked trigger methods have completed will an event notification be propagated to their children.
To for a trigger callback use @OnTrigger(parallelExecution = true)
annotation on the callback method.
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.printf("%s MyNode::handleStringEvent %n", Thread.currentThread().getName());
return true;
}
}
public static class ForkedChild {
private final MyNode myNode;
private final int id;
public ForkedChild(MyNode myNode, int id) {
this.myNode = myNode;
this.id = id;
}
@OnTrigger(parallelExecution = true)
public boolean triggered() {
int millisSleep = new Random(id).nextInt(25, 200);
String threadName = Thread.currentThread().getName();
System.out.printf("%s ForkedChild[%d]::triggered - sleep:%d %n", threadName, id, millisSleep);
try {
Thread.sleep(millisSleep);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.printf("%s ForkedChild[%d]::complete %n", threadName, id);
return true;
}
}
public static class ResultJoiner {
private final ForkedChild[] forkedTasks;
public ResultJoiner(ForkedChild[] forkedTasks) {
this.forkedTasks = forkedTasks;
}
public ResultJoiner(int forkTaskNumber){
MyNode myNode = new MyNode();
forkedTasks = new ForkedChild[forkTaskNumber];
for (int i = 0; i < forkTaskNumber; i++) {
forkedTasks[i] = new ForkedChild(myNode, i);
}
}
@OnTrigger
public boolean complete(){
System.out.printf("%s ResultJoiner:complete %n%n", Thread.currentThread().getName());
return true;
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new ResultJoiner(5));
processor.init();
Instant start = Instant.now();
processor.onEvent("test");
System.out.printf("duration: %d milliseconds%n", Duration.between(start, Instant.now()).toMillis());
}
Output
main MyNode::handleStringEvent
ForkJoinPool.commonPool-worker-1 ForkedChild[0]::triggered - sleep:135
ForkJoinPool.commonPool-worker-2 ForkedChild[1]::triggered - sleep:85
ForkJoinPool.commonPool-worker-3 ForkedChild[2]::triggered - sleep:58
ForkJoinPool.commonPool-worker-4 ForkedChild[3]::triggered - sleep:184
ForkJoinPool.commonPool-worker-5 ForkedChild[4]::triggered - sleep:112
ForkJoinPool.commonPool-worker-3 ForkedChild[2]::complete
ForkJoinPool.commonPool-worker-2 ForkedChild[1]::complete
ForkJoinPool.commonPool-worker-5 ForkedChild[4]::complete
ForkJoinPool.commonPool-worker-1 ForkedChild[0]::complete
ForkJoinPool.commonPool-worker-4 ForkedChild[3]::complete
main ResultJoiner:complete
duration: 184 milliseconds
Batch support
Batch callbacks are supported through the BatchHandler interface that the generated EventHandler implements. Any methods
that are annotated with, @OnBatchPause
or @OnBatchEnd
will receive calls from the matching BatchHandler method.
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent(String stringToProcess) {
System.out.println("MyNode event received:" + stringToProcess);
return true;
}
@OnBatchPause
public void batchPause(){
System.out.println("MyNode::batchPause");
}
@OnBatchEnd
public void batchEnd(){
System.out.println("MyNode::batchEnd");
}
}
public static void main(String[] args) {
var processor = Fluxtion.interpret(new MyNode());
processor.init();
processor.onEvent("test");
//use BatchHandler service
BatchHandler batchHandler = (BatchHandler)processor;
batchHandler.batchPause();
batchHandler.batchEnd();
}
Output
MyNode event received:test
MyNode::batchPause
MyNode::batchEnd