Runtime library functions
The runtime environment provides several library functions that bound classes can use. This section documents the runtime
environment and how to access the library functions.
Table of contents
Processing output
An application can register for output from the EventProcessor by supplying a consumer
to addSink. Support for publishing to a sink is built into the streaming api, [builder_type]#sink
.
A consumer has a string key to partition outputs.
public static void main ( String [] args ) {
var processor = Fluxtion . interpret ( cfg ->
DataFlow . subscribeToIntSignal ( "myIntSignal" )
. mapToObj ( d -> "intValue:" + d )
. sink ( "mySink" ));
processor . init ();
processor . addSink ( "mySink" , ( Consumer < String >) System . out :: println );
processor . publishSignal ( "myIntSignal" , 10 );
processor . publishSignal ( "myIntSignal" , 256 );
}
Output
intValue:10
intValue:256
An application can remove sink using the call EventProcessor#removeSink
Clock time
Code sample
public class ClockExample {
public static class TimeLogger {
public Clock wallClock = Clock . DEFAULT_CLOCK ;
@OnEventHandler
public boolean publishTime ( DateFormat dateFormat ) {
System . out . println ( "time " + dateFormat . format ( new Date ( wallClock . getWallClockTime ())));
return true ;
}
}
public static void main ( String [] args ) throws InterruptedException {
var processor = Fluxtion . interpret ( new TimeLogger ());
processor . init ();
//PRINT CURRENT TIME
processor . onEvent ( new SimpleDateFormat ( "HH:mm:ss.SSS" ));
//SLEEP AND PRINT TIME
Thread . sleep ( 100 );
processor . onEvent ( new SimpleDateFormat ( "HH:mm:ss.SSS" ));
}
}
Sample log
time 07:33:45.744
time 07:33:45.849
TImed alarm trigger
Code sample
public class FixedRateTriggerExample {
public static class RegularTrigger {
private final FixedRateTrigger fixedRateTrigger ;
public RegularTrigger ( FixedRateTrigger fixedRateTrigger ) {
this . fixedRateTrigger = fixedRateTrigger ;
}
public RegularTrigger ( int sleepMilliseconds ) {
fixedRateTrigger = new FixedRateTrigger ( sleepMilliseconds );
}
@OnTrigger
public boolean trigger () {
System . out . println ( "RegularTrigger::triggered" );
return true ;
}
}
public static void main ( String ... args ) throws InterruptedException {
var processor = Fluxtion . interpret ( new RegularTrigger ( 100 ));
processor . init ();
//NO TRIGGER - 10MS NEEDS TO ELAPSE
processor . onEvent ( new Object ());
processor . onEvent ( "xxx" );
//WILL TRIGGER - 10MS HAS ELAPSED
Thread . sleep ( 100 );
processor . onEvent ( "xxx" );
}
}
Sample log
RegularTrigger::triggered
Buffer and trigger calculation
An event processor can buffer multiple events without causing any triggers to fire, and at some point in the future
cause all potentially dirty trigger to fire. This is known as buffering and triggering it is achieved by call
EventProcessr.bufferEvent
multiple times and then following it with a call EventProcessor.triggerCalculation
Code sample
public static class MyNode {
@OnEventHandler
public boolean handleStringEvent ( String stringToProcess ) {
System . out . println ( "MyNode event received:" + stringToProcess );
return true ;
}
}
public static class MyNode2 {
@OnEventHandler
public boolean handleIntEvent ( int intToProcess ) {
boolean propagate = intToProcess > 100 ;
System . out . println ( "MyNode2 conditional propagate:" + propagate );
return propagate ;
}
@OnEventHandler
public boolean handleStringEvent ( String stringToProcess ) {
System . out . println ( "MyNode2 event received:" + stringToProcess );
return true ;
}
}
public static class Child {
private final MyNode myNode ;
private final MyNode2 myNode2 ;
public Child ( MyNode myNode , MyNode2 myNode2 ) {
this . myNode = myNode ;
this . myNode2 = myNode2 ;
}
@OnParentUpdate
public void node1Updated ( MyNode myNode1 ){
System . out . println ( "1 - myNode updated" );
}
@OnParentUpdate
public void node2Updated ( MyNode2 myNode2 ){
System . out . println ( "2 - myNode2 updated" );
}
@OnTrigger
public boolean triggered (){
System . out . println ( "Child:triggered" );
return true ;
}
}
public static void main ( String [] args ) {
var processor = Fluxtion . interpret ( new Child ( new MyNode (), new MyNode2 ()));
processor . init ();
processor . bufferEvent ( "test" );
System . out . println ();
processor . bufferEvent ( 200 );
System . out . println ();
processor . bufferEvent ( 50 );
System . out . println ();
processor . triggerCalculation ();
}
Sample log
MyNode2 event received:test
2 - myNode2 updated
MyNode event received:test
1 - myNode updated
MyNode2 conditional propagate:true
2 - myNode2 updated
MyNode2 conditional propagate:false
Child:triggered
Audit logging
Code sample
public class AuditExample {
public static class MyAuditingNode extends EventLogNode {
@Initialise
public void init (){
auditLog . info ( "MyAuditingNode" , "init" );
auditLog . info ( "MyAuditingNode_debug" , "some debug message" );
}
@OnEventHandler
public boolean stringEvent ( String event ) {
auditLog . info ( "event" , event );
auditLog . debug ( "charCount" , event . length ());
return true ;
}
}
public static void main ( String [] args ) {
var processor = Fluxtion . interpret ( c ->{
c . addNode ( new MyAuditingNode ());
c . addEventAudit ();
});
processor . init ();
//AUDIT IS INFO BY DEFAULT
processor . onEvent ( "detailed message 1" );
}
}
Sample log
eventLogRecord:
eventTime: 1714287142943
logTime: 1714287142943
groupingId: null
event: LifecycleEvent
eventToString: Init
nodeLogs:
- myAuditingNode_0: { MyAuditingNode: init, MyAuditingNode_debug: some debug message}
endTime: 1714287142946
---
eventLogRecord:
eventTime: 1714287142946
logTime: 1714287142946
groupingId: null
event: String
eventToString: detailed message 1
nodeLogs:
- myAuditingNode_0: { event: detailed message 1}
endTime: 1714287142949
---
EventProcessorContext - context parameters
Code sample
public class ContextParamInput {
public static class ContextParamReader {
@Inject
public EventProcessorContext context ;
@Start
public void start () {
System . out . println ( "myContextParam1 -> " + context . getContextProperty ( "myContextParam1" ));
System . out . println ( "myContextParam2 -> " + context . getContextProperty ( "myContextParam2" ));
System . out . println ();
}
}
public static void main ( String [] args ) {
var processor = Fluxtion . interpret ( new ContextParamReader ());
processor . init ();
processor . addContextParameter ( "myContextParam1" , "[param1: update 1]" );
processor . start ();
processor . addContextParameter ( "myContextParam1" , "[param1: update 2]" );
processor . addContextParameter ( "myContextParam2" , "[param2: update 1]" );
processor . start ();
}
}
Sample log
myContextParam1 -> [ param1: update 1]
myContextParam2 -> null
myContextParam1 -> [ param1: update 2]
myContextParam2 -> [ param2: update 1]
DirtyStateMonitor - node dirty flag control
Code sample
public class DirtyStateMonitorExample {
public static class TriggeredChild implements NamedNode {
@Inject
public DirtyStateMonitor dirtyStateMonitor ;
private final FlowSupplier < Integer > intDataFlow ;
public TriggeredChild ( FlowSupplier < Integer > intDataFlow ) {
this . intDataFlow = intDataFlow ;
}
@OnTrigger
public boolean triggeredChild () {
System . out . println ( "TriggeredChild -> " + intDataFlow . get ());
return true ;
}
public void printDirtyStat () {
System . out . println ( "\nintDataFlow dirtyState:" + dirtyStateMonitor . isDirty ( intDataFlow ));
}
public void markDirty () {
dirtyStateMonitor . markDirty ( intDataFlow );
System . out . println ( "\nmark dirty intDataFlow dirtyState:" + dirtyStateMonitor . isDirty ( intDataFlow ));
}
@Override
public String getName () {
return "triggeredChild" ;
}
}
public static void main ( String [] args ) throws NoSuchFieldException {
var processor = Fluxtion . interpret ( new TriggeredChild ( DataFlow . subscribe ( Integer . class ). flowSupplier ()));
processor . init ();
TriggeredChild triggeredChild = processor . getNodeById ( "triggeredChild" );
processor . onEvent ( 2 );
processor . onEvent ( 4 );
//NOTHING HAPPENS
triggeredChild . printDirtyStat ();
processor . triggerCalculation ();
//MARK DIRTY
triggeredChild . markDirty ();
processor . triggerCalculation ();
}
}
Sample log
TriggeredChild -> 2
TriggeredChild -> 4
intDataFlow dirtyState:false
mark dirty intDataFlow dirtyState:true
TriggeredChild -> 4
EventDispatcher - event re-dispatch
Code sample
public class CallBackExample {
public static class MyCallbackNode {
@Inject
public EventDispatcher eventDispatcher ;
@OnEventHandler
public boolean processString ( String event ) {
for ( String item : event . split ( "," )) {
eventDispatcher . processAsNewEventCycle ( Integer . parseInt ( item ));
}
return false ;
}
@OnEventHandler
public boolean processInteger ( Integer event ) {
System . out . println ( "received event: " + event );
return false ;
}
}
public static void main ( String [] args ) {
var processor = Fluxtion . interpret ( new MyCallbackNode ());
processor . init ();
processor . onEvent ( "20,45,89" );
}
}
Sample log
received event: 20
received event: 45
received event: 89