Work in progress
Introduction
This example demonstrates real-time data ingestion using the Fluxtion event processing framework. The requirements for the processing are:
- Use Kaggle's Ames housing CSV data set, processing each row as a single event
- Subscribe to a stream of string events from the data set
- Process each string event and test that it is a valid CSV record
- Log invalid input records
- For valid records:
  - Transform each record with a user-supplied function
  - Validate the transformed record
  - Log invalid records
  - Write valid records to CSV
  - Write valid records to a binary format
- Record processing statistics in real time that can be queried:
  - count of all records
  - count of invalid CSV records
  - count of failed validation records
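The queryable statistics in the last requirement can be sketched as a plain Java class with one counter per requirement. This is a hypothetical stand-in, not the ProcessingStats class from the example project:

```java
// Hypothetical stand-in for the queryable processing statistics:
// one counter per requirement, incremented as events flow through.
class IngestionStats {
    private int totalRecords;
    private int badCsvRecords;
    private int failedValidation;

    void onRecord()           { totalRecords++; }
    void onBadCsv()           { badCsvRecords++; }
    void onFailedValidation() { failedValidation++; }

    int totalRecords()     { return totalRecords; }
    int badCsvRecords()    { return badCsvRecords; }
    int failedValidation() { return failedValidation; }
}
```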
Example project
A Maven project that processes Kaggle's Ames housing CSV data set is available here.
Process flow diagram
flowchart TB
subgraph Fluxtion [Fluxtion event processor]
direction TB
csv_validator(csv field validator) -- invalid --> invalid(invalid log)
csv_validator -- valid --> x-former
x-former --> record_validator(record validator)
record_validator -- invalid --> invalid
record_validator -- valid --> csv_writer(csv writer) & binary_writer(binary writer)
record_validator & csv_validator --> stats
end
input(Csv record stream) --> csv_validator
config(x-former config) --> x-former
Solution design
This example uses the Fluxtion DataFlow API to manage event subscription and notification to user-supplied functions.
The actual processing logic is encapsulated in user classes and functions. The goal is to have no Fluxtion API calls in the business logic, only pure vanilla Java. The advantages of this approach:
- Business logic components are re-usable and testable
- Business code is not tied to a library API
- There is a clear separation between event notification and business logic
- The AOT-generated source file DataIngestionPipeline simplifies debugging
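To illustrate the framework-free style, a business component can be an ordinary class implementing a standard JDK interface. The class and rule below are hypothetical, not taken from the example project:

```java
import java.util.function.Predicate;

// Hypothetical business rule written as plain Java: no Fluxtion imports,
// so it can be unit tested and reused outside the event processor.
class PositiveSalePriceValidator implements Predicate<Integer> {
    @Override
    public boolean test(Integer salePrice) {
        return salePrice != null && salePrice > 0;
    }
}
```

Because the class depends only on `java.util.function.Predicate`, it can be exercised in a unit test with no event processor present.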
API
User functions
Pipeline building
The PipelineBuilder builds the processing pipeline with the following behaviour:
- Valid records
  - Subscribes to String events
  - Tries to marshal the String from CSV into a HouseRecord
  - Validates the transformed HouseRecord with a user-supplied java.util.function.Predicate
  - Transforms the HouseRecord by applying a user transform function
  - Writes each valid HouseRecord to a user-supplied java.io.Writer as CSV
  - Writes each valid HouseRecord to a user-supplied java.io.OutputStream in a binary format
  - Processing stats are updated with each valid transformed HouseRecord
- Errors/invalid records
  - Adds an entry to the invalid log, which writes to a user-supplied java.io.Writer
  - Processing stats are updated with each CSV error
  - Processing stats are updated with each HouseRecord validation failure
The PipelineBuilder is invoked by the Fluxtion maven plugin to generate the pipeline AOT as part of the build.
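The marshalling step above can be sketched in plain Java as a parse result that flags bad input rather than throwing, so downstream nodes can branch on validity. This is a simplified, hypothetical version of what CsvToHouseRecordSerializer does; the real class lives in the example project:

```java
// Hypothetical sketch of the CSV marshalling step: the result carries the
// parsed fields plus a bad-message flag that downstream nodes filter on.
class CsvParseResult {
    final String[] fields;
    private final boolean bad;

    private CsvParseResult(String[] fields, boolean bad) {
        this.fields = fields;
        this.bad = bad;
    }

    // a record is "bad" here simply when the column count is wrong
    static CsvParseResult parse(String line, int expectedColumns) {
        String[] parts = line.split(",", -1);
        return new CsvParseResult(parts, parts.length != expectedColumns);
    }

    boolean isBadCsvMessage() { return bad; }
}
```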
PipelineBuilder
public class PipelineBuilder implements FluxtionGraphBuilder {

    @Override
    public void buildGraph(EventProcessorConfig eventProcessorConfig) {
        //flow: Csv String -> HouseInputRecord
        var csv2HouseRecordFlow = DataFlow
                .subscribe(String.class)
                .map(new CsvToHouseRecordSerializer()::marshall);

        //flow: HouseInputRecord -> x_formed(HouseInputRecord) -> validated(HouseInputRecord)
        var validTransformedFlow = csv2HouseRecordFlow
                .map(CsvToHouseRecordSerializer::getHouseRecord)
                .map(new HouseRecordTransformer()::transform)
                .map(new HouseRecordValidator()::validate);

        //outputs
        var csvWriter = new PostProcessCsvWriter();
        var binaryWriter = new PostProcessBinaryWriter();
        var stats = new ProcessingStats();
        var invalidLog = new InvalidLogWriter();

        //write validated output push to [stats, csv, binary]
        validTransformedFlow
                .map(HouseRecordValidator::getValidHouseRecord)
                .push(stats::validHouseRecord, csvWriter::validHouseRecord, binaryWriter::validHouseRecord);

        //invalid csv parsing output push to [invalid log, stats]
        csv2HouseRecordFlow
                .filter(CsvToHouseRecordSerializer::isBadCsvMessage)
                .push(invalidLog::badCsvRecord, stats::badCsvRecord);

        //invalid transform output push to [invalid log, stats]
        validTransformedFlow
                .filter(HouseRecordValidator::isInValidRecord)
                .push(invalidLog::invalidHouseRecord, stats::invalidHouseRecord);
    }

    @Override
    public void configureGeneration(FluxtionCompilerConfig compilerConfig) {
        compilerConfig.setClassName("DataIngestionPipeline");
        compilerConfig.setPackageName("com.fluxtion.example.cookbook.dataingestion.pipeline");
    }
}
Testing
Running
The main method executes a DataIngestionPipeline with data from Kaggle's AmesHousing.csv data file.
Dynamic configuration is supplied in an instance of DataIngestConfig for:
- HouseRecord validation - java.util.function.Predicate
- HouseRecord transformer - java.util.function.UnaryOperator
- Post process CSV output - java.io.Writer
- Post process binary output - java.io.OutputStream
- Statistics output - java.io.Writer
- Invalid log output - java.io.Writer
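The first two configuration points are plain JDK functional interfaces, so they can be supplied as lambdas. The record and field names below are hypothetical stand-ins, not the example's HouseRecord:

```java
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical two-field stand-in for HouseRecord.
record House(int lotArea, int salePrice) {}

class IngestFunctions {
    // validation predicate: accept only positive sale prices
    static final Predicate<House> VALIDATOR = h -> h.salePrice() > 0;

    // transformer: clamp negative lot areas to zero, leave price untouched
    static final UnaryOperator<House> TRANSFORMER =
            h -> new House(Math.max(h.lotArea(), 0), h.salePrice());
}
```

Using standard interfaces keeps the configuration surface free of framework types, matching the design goal stated earlier.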
Executing the pipeline
Example output
A set of files is produced in the data output directory after the main method has finished, and the ProcessingStats are printed to the console.
Output file contents
- postProcessHouse.csv - validated and transformed HouseRecords as a CSV file
- postProcessHouse.binary - validated and transformed HouseRecords as a binary encoded file
- processStats.rpt - processing stats record
- processingErrors.log - CSV and validation error log