Fluxtion embedded stream processing


Why fluxtion explains the value proposition

Fluxtion is a Java library and code generation utility designed for building high-performance, low-latency streaming applications. It provides a lightweight framework for event-driven programming, particularly suited for applications such as financial trading systems, real-time analytics, and sensor data processing. Fluxtion emphasizes simplicity, efficiency, and ease of use in handling streaming data.

  • Streaming event processing
  • AOT compiled for fast cloud start
  • Spring integration
  • Optimised to reduce processing costs
  • Low latency nanosecond response with zero gc
  • 30 million events per second per core

Overall, Fluxtion provides a powerful framework for building real-time streaming applications in Java, offering high performance, flexibility, and ease of use for developers working with streaming data. Processing more than 30 million events per second on a single thread

Application integration


Embed a Fluxtion stream processing engine in your application, freeing your business logic from messaging vendor lock-in.

Fluxtion technology


Event-driven Programming: Fluxtion is based on the concept of event-driven programming, where components react to incoming events in real-time. Events can represent changes in data, user actions, or any other relevant triggers.

Dependency Graphs: Fluxtion models the processing logic of the application as a directed acyclic graph (DAG) of nodes, where each node represents a computation or transformation operation. Nodes can subscribe to input events, perform calculations, and publish output events.

Event Processing Pipelines: Fluxtion allows developers to define event processing pipelines, where events flow through a series of interconnected nodes. This allows for complex data transformations and aggregations to be performed efficiently.

Annotation-Based Configuration: Fluxtion provides an annotation-based configuration mechanism, allowing developers to define event processing logic using simple annotations. This simplifies the development process and reduces boilerplate code.

Code Generation: Fluxtion employs code generation techniques to optimize the performance of event processing pipelines. During compilation, Fluxtion generates highly optimized Java code tailored to the specific event processing logic defined by the developer.

Low Latency: Fluxtion is designed to achieve low latency and high throughput, making it suitable for applications where real-time responsiveness is critical, such as financial trading systems.

Support for Complex Event Processing (CEP): Fluxtion includes support for complex event processing, allowing developers to define complex event patterns and rules using a high-level DSL (Domain-Specific Language).

Instantaneous Startup: Our innovative Ahead-of-Time (AOT) compiled streaming engine minimizes startup times to near zero.

Integration with External Systems: Fluxtion can easily integrate with external systems and libraries, allowing developers to incorporate Fluxtion-based event processing logic into existing applications or frameworks.

Developer Productivity: Fluxtion has been designed to increases developer productivity when building and supporting event driven applications

Getting started


Developers

For a quick introduction to programming Fluxtion visit the hello world examples.

A developer workflow document describes the approach integrating Fluxtion into your work cycle

A series of tutorials are provided that a developer should follow to become familiar with the practical coding of Fluxtion, start with tutorial 1.

Architects

For a deeper understanding of the architecture, design and paradigms that underpin Fluxtion head over to the Fluxtion explored section.

Reference documentation

Example


This example tracks and calculates times for runners in a race. Start and finish times are received as a stream of events, when a runner finishes they receive their individual time. A call to publishAllResults will publish all current results.

The developer writes the core business logic annotating any methods that should receive event callbacks or services that are exported. Fluxtion takes care of generating all the event dispatch code that is time consuming to write, error prone and adds little value. The generated event processor is used like any normal java class in the application.

Processing graph

flowchart TB
    classDef eventHandler color:#022e1f,fill:#aaa3ff,stroke:#000;
    classDef graphNode color:#022e1f,fill:#00cfff,stroke:#000;
    classDef exportedService color:#022e1f,fill:#aaa3ff,stroke:#000;
    style EventProcessor fill:#e9ebe4,stroke:#333,stroke-width:1px

    RunnerStarted><b>InputEvent</b>::RunnerStarted]:::eventHandler 
    RunnerFinished><b>InputEvent</b>::RunnerFinished]:::eventHandler 
    ResultsPublisher([<b>ServiceLookup</b>::ResultsPublisher]):::exportedService 
    
    RaceTimeTracker[RaceTimeTracker\n<b>EventHandler</b>::RunnerStarted \n<b>EventHandler</b>::RunnerFinished]:::graphNode 
    ResultsPublisherImpl[ResultsPublisherImpl\n <b>ExportService</b>::ResultsPublisher]:::graphNode

    RunnerStarted --> RaceTimeTracker
    RunnerFinished --> RaceTimeTracker
    ResultsPublisher --> ResultsPublisherImpl
    
    subgraph EventProcessor
        RaceTimeTracker --> ResultsPublisherImpl
    end
    

Processing logic

The Fluxtion event processor manages all the event call backs, the user code handles the business logic.

  • The RaceTimeTracker is notified when a RunnerStarted event is received and records the start time for that runner
  • The runnerStarted handler method does not propagate so the event is swallowed at that point
  • The RaceTimeTracker is notified when a RunnerFinished event is received, calculates the runner race time and triggers the ResultsPublisherImpl
  • The ResultsPublisherImpl annotated trigger method gets the finisher data from RaceTimeTracker and logs the individual runner’s race time
  • When the race is over the service method ResultsPublisher.publishAllResults is called, the event processor routes this call to ResultsPublisherImpl which publishes the final results.

Three steps to using Fluxtion

1 - Mark event handling methods with annotations or via functional programming
2 - Build the event processor using fluxtion compiler utility
3 - Integrate the event processor in the app and feed it events

Custom business logic written by developer, annotated methods receive event callbacks. Here we use @OnEventHandler and @OnTrigger for events and @ExportService to export a service interface.

public class RaceCalculator {
    //streamed events
    public record RunnerStarted(long runnerId, Instant startTime) {}
    public record RunnerFinished(long runnerId, Instant finishTime) {}

    //service api
    public interface ResultsPublisher {
        void publishAllResults();
    }

    //event driven logic
    @Getter
    public static class RaceTimeTracker {
        private final transient Map<Long, RunningRecord> raceTimeMap = new HashMap<>();
        private RunningRecord latestFinisher;

        //FLUXTION ANNOTATION - lfecycle init callback
        @Initialise
        public void init(){
            raceTimeMap.clear();
        }

        //FLUXTION ANNOTATION - subscribe to RunnerStarted events, no propagation to child nodes
        @OnEventHandler(propagate = false)
        public boolean runnerStarted(RunnerStarted runnerStarted) {
            long runnerId = runnerStarted.runnerId();
            raceTimeMap.put(runnerId, new RunningRecord(runnerId, runnerStarted.startTime()));
            return false;
        }

        //FLUXTION ANNOTATION - subscribe to RunnerFinished events, propagates trigger to child nodes 
        @OnEventHandler
        public boolean runnerFinished(RunnerFinished runner) {
            latestFinisher = raceTimeMap.computeIfPresent(
                    runner.runnerId(),
                    (id, startRecord) -> new RunningRecord(id, startRecord.startTime(), runner.finishTime()));
            return true;
        }
    }

    @RequiredArgsConstructor
    public static class ResultsPublisherImpl implements 
        //FLUXTION ANNOTATION - exports the service api for this instance
        @ExportService ResultsPublisher {
        private final RaceTimeTracker raceTimeTracker;

        //FLUXTION ANNOTATION - triggered on a parent notification, from RunnerFinished event handler
        @OnTrigger
        public boolean sendIndividualRunnerResult(){
            var raceRecord = raceTimeTracker.getLatestFinisher();
            System.out.format("Crossed the line runner:%d time [%s]%n", raceRecord.runnerId(), raceRecord.runDuration());
            return false;
        }

        @Override
        public void publishAllResults() {
            System.out.println("\nFINAL RESULTS");
            raceTimeTracker.getRaceTimeMap().forEach((l, r) ->
                    System.out.println("id:" + l + " time [" + r.runDuration() + "]"));
        }
    }

    //internal record
    private record RunningRecord(Instant startTime, Instant finishTime) {
        public String runDuration() {
            Duration duration = Duration.between(startTime, finishTime);
            return duration.toHoursPart() + ":" + duration.toMinutesPart() + ":" + duration.toSecondsPart();
        }
    }
}

Bind user functions to the event processor and build. This is an interpreted example there is also an AOT project version.

var raceCalculator = Fluxtion.interpret( new ResultsPublisherImpl(new RaceTimeTracker()));

Pom.xml includes the fluxtion dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <groupId>com.fluxtion.example</groupId>
    <artifactId>reference-examples</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>reference-example :: racing</name>
    <artifactId>racing</artifactId>

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.fluxtion</groupId>
            <artifactId>compiler</artifactId>
            <version>9.3.1</version>
        </dependency>
    </dependencies>
</project>

Application feeds events to an instance of the generated event processor. The event processor dispatches events to business functions

public class RaceCalculatorApp {
    public static void main(String[] args) {
        //build and initialise the raceCalculator event processor
        var raceCalculator = Fluxtion.interpret( new ResultsPublisherImpl(new RaceTimeTracker()));
        raceCalculator.init();

        //connect to event stream and process runner timing events
        raceCalculator.onEvent(new RunnerStarted(1, "2019-02-14T09:00:00Z"));
        raceCalculator.onEvent(new RunnerStarted(2, "2019-02-14T09:02:10Z"));
        raceCalculator.onEvent(new RunnerStarted(3, "2019-02-14T09:06:22Z"));

        raceCalculator.onEvent(new RunnerFinished(2, "2019-02-14T10:32:15Z"));
        raceCalculator.onEvent(new RunnerFinished(3, "2019-02-14T10:59:10Z"));
        raceCalculator.onEvent(new RunnerFinished(1, "2019-02-14T11:14:32Z"));

        //use the exported service interface ResultsPublisher to publish full results
        ResultsPublisher resultsPublisher = raceCalculator.getExportedService();
        resultsPublisher.publishAllResults();
    }
}

Output from executing the application

Crossed the line runner:2 time:1:30:5
Crossed the line runner:3 time:1:52:48
Crossed the line runner:1 time:2:14:32

FINAL RESULTS
id:1 final time:2:14:32
id:2 final time:1:30:5
id:3 final time:1:52:48

Image is generated as part of the code generator

Latest release


Open source on GitHub, artifacts published to maven central.

component maven central
Runtime Fluxtion runtime
Compiler Fluxtion compiler

Build dependencies

    <dependencies>
        <dependency>
            <groupId>com.fluxtion</groupId>
            <artifactId>runtime</artifactId>
            <version>9.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.fluxtion</groupId>
            <artifactId>compiler</artifactId>
            <version>9.3.1</version>
        </dependency>
    </dependencies>
implementation 'com.fluxtion:runtime:9.3.1'
implementation 'com.fluxtion:compiler:9.3.1'