Real-time data ingestion example
This example demonstrates real-time data ingestion using the Fluxtion event processing framework. The processing requirements are:
- Use Kaggle's Ames housing CSV data set, processing each row as a single event
- Subscribe to a stream of String events from the data set
- Process each String event and test that it is a valid CSV record
- Log invalid input records
- For valid records:
  - Transform each record with a user-supplied function
  - Validate the transformed record
  - Log records that fail validation
  - Write valid records to CSV
  - Write valid records to a binary format
- Record processing statistics in real time that can be queried:
  - count of all records
  - count of invalid CSV records
  - count of failed validation records
Example project
A Maven project that processes Kaggle's Ames housing CSV data set is available here.
Process flow diagram
```mermaid
flowchart TB
    subgraph Fluxtion [Fluxtion event processor]
        direction TB
        csv_validator(csv field validator) -- invalid --> invalid(invalid log)
        csv_validator -- valid --> x-former
        x-former --> record_validator(record validator)
        record_validator -- invalid --> invalid
        record_validator -- valid --> csv_writer(csv writer) & binary_writer(binary writer)
        record_validator & csv_validator --> stats
    end
    input(Csv record stream) --> csv_validator
    config(x-former config) --> x-former
    csv_writer --> postProcessHouse.csv[(postProcessHouse.csv)]
    stats --> processStats.rpt[(processStats.rpt)]
    binary_writer --> postProcessHouse.binary[(postProcessHouse.binary)]
    invalid --> processingErrors.log[(processingErrors.log)]
```
Solution design
This example uses the Fluxtion DataFlow API to manage event subscription and notification to user-supplied functions.
The actual processing logic is encapsulated in user classes and functions. The goal is to have no Fluxtion API calls in the business logic, only pure vanilla Java. The advantages of this approach are:
- Business logic components are re-usable and testable
- Business code is not tied to a library API
- There is a clear separation between event notification and business logic
- The AOT-generated source file DataIngestionPipeline simplifies debugging
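To illustrate that separation, a business component in this style is plain Java with no Fluxtion imports, so it can be unit tested in isolation. Below is a minimal sketch along the lines of the HouseRecordTransformer described later; the UnaryOperator-based configuration shown here is an assumption for illustration, not the project source:

```java
import java.util.function.UnaryOperator;

// Plain Java component: no Fluxtion imports. The UnaryOperator-based
// config is an assumed detail; see the example project for the real class.
public class HouseRecordTransformer {

    // identity transform until configuration supplies one at runtime
    private UnaryOperator<HouseRecord> transform = UnaryOperator.identity();

    public void setTransform(UnaryOperator<HouseRecord> transform) {
        this.transform = transform;
    }

    public HouseRecord transform(HouseRecord houseRecord) {
        return transform.apply(houseRecord);
    }
}
```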
Pipeline building
The PipelineBuilder builds the processing pipeline with the following behaviour:
- Valid records:
  - Subscribes to String events
  - Tries to marshal the String from CSV into a HouseRecord
  - Transforms the HouseRecord by applying a user transform function
  - Validates the transformed HouseRecord with a user-supplied java.util.function.Predicate
  - Writes the valid HouseRecord to a user-supplied java.io.Writer as CSV
  - Writes the valid HouseRecord to a user-supplied java.io.OutputStream in a binary format
  - Processing stats are updated with each valid transformed HouseRecord
- Errors/invalid records:
  - Adds an entry to the invalid log, which writes to a user-supplied java.io.Writer
  - Processing stats are updated with each CSV error
  - Processing stats are updated with each HouseRecord validation failure
The PipelineBuilder is invoked by the Fluxtion Maven plugin to generate the pipeline AOT as part of the build.
PipelineBuilder
```java
public class PipelineBuilder implements FluxtionGraphBuilder {

    @Override
    public void buildGraph(EventProcessorConfig eventProcessorConfig) {
        //flow: Csv String -> HouseInputRecord
        var csv2HouseRecordFlow = DataFlow
                .subscribe(String.class)
                .map(new CsvToHouseRecordSerializer()::marshall);

        //flow: HouseInputRecord -> x_formed(HouseInputRecord) -> validated(HouseInputRecord)
        var validTransformedFlow = csv2HouseRecordFlow
                .map(CsvToHouseRecordSerializer::getHouseRecord)
                .map(new HouseRecordTransformer()::transform)
                .map(new HouseRecordValidator()::validate);

        //outputs
        var csvWriter = new PostProcessCsvWriter();
        var binaryWriter = new PostProcessBinaryWriter();
        var stats = new ProcessingStats();
        var invalidLog = new InvalidLogWriter();

        //write validated output push to [stats, csv, binary]
        validTransformedFlow
                .map(HouseRecordValidator::getValidHouseRecord)
                .push(stats::validHouseRecord, csvWriter::validHouseRecord, binaryWriter::validHouseRecord);

        //invalid csv parsing output push to [invalid log, stats]
        csv2HouseRecordFlow
                .filter(CsvToHouseRecordSerializer::isBadCsvMessage)
                .push(invalidLog::badCsvRecord, stats::badCsvRecord);

        //invalid transform output push to [invalid log, stats]
        validTransformedFlow
                .filter(HouseRecordValidator::isInValidRecord)
                .push(invalidLog::invalidHouseRecord, stats::invalidHouseRecord);
    }

    @Override
    public void configureGeneration(FluxtionCompilerConfig compilerConfig) {
        compilerConfig.setClassName("DataIngestionPipeline");
        compilerConfig.setPackageName("com.fluxtion.example.cookbook.dataingestion.pipeline");
    }
}
```
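The generated DataIngestionPipeline is an ordinary Java source file checked in with the project, so it can be stepped through in a debugger. Driving it directly looks roughly like this; a minimal sketch, assuming Fluxtion's standard init/onEvent/tearDown lifecycle:

```java
// a minimal sketch, assuming the standard Fluxtion lifecycle methods
var pipeline = new DataIngestionPipeline();
pipeline.init();                           // runs init callbacks, e.g. InvalidLogWriter.init()
pipeline.onEvent("csv,row,as,a,string");   // each CSV line is a String event
pipeline.tearDown();                       // runs tearDown callbacks, flushing writers
```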
User functions
User functions are bound into the event processor and perform the actual business processing. The DSL binds a function into the graph; all state management and event dispatch is handled by the event processor. Developers can concentrate on writing business logic, confident that all dispatch code will be generated automatically.
There are several user functions for handling and processing data in the pipeline. All the functions are simple Java POJOs; an export service annotation allows a function to receive a config callback from the outside world.
- CsvToHouseRecordSerializer - converts a CSV string into a HouseRecord POJO
- HouseRecordTransformer - applies a custom transform to a HouseRecord. The transform is supplied at runtime and is dynamic
- HouseRecordValidator - applies a custom validator to a HouseRecord. The validator is supplied at runtime and is dynamic
- InvalidLogWriter - writes the error log to a custom writer. The writer is supplied at runtime and is dynamic
- PostProcessBinaryWriter - writes binary output of valid records to a custom output stream. The stream is supplied at runtime and is dynamic
- PostProcessCsvWriter - writes CSV output of valid records to a custom writer. The writer is supplied at runtime and is dynamic
- ProcessingStats - writes the processing stats report to a custom writer. The writer is supplied at runtime and is dynamic
A sample user function is shown below; all the user functions are in this package.
```java
public class InvalidLogWriter
        implements
        DataIngestLifecycle,
        @ExportService(propagate = false) DataIngestComponent {

    private Writer logWriter;

    @Override
    public void init() {
        logWriter = NullWriter.NULL_WRITER;
    }

    @Override
    public boolean configUpdate(DataIngestConfig config) {
        logWriter = config.getInvalidLogWriter() == null ? NullWriter.NULL_WRITER : config.getInvalidLogWriter();
        return false;
    }

    @SneakyThrows
    public void badCsvRecord(CsvToHouseRecordSerializer message) {
        String inputString = message.getInputString().substring(0, message.getInputString().length() - 1);
        logWriter.append("csv error " + message.getProcessingException().getMessage() + " msg[" + inputString + "]\n");
    }

    @SneakyThrows
    public void invalidHouseRecord(HouseRecordValidator message) {
        logWriter.append("validation error record[" + message.getInValidHouseRecord() + "]\n");
    }

    @Override
    @SneakyThrows
    public void tearDown() {
        logWriter.flush();
    }
}
```
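The @ExportService annotation on the implemented DataIngestComponent interface is what exposes configUpdate to the outside world; with propagate = false the callback does not trigger an event wave through the graph. For reference, the exported interface is presumably along these lines (a sketch, not the project source):

```java
// sketch of the exported service interface; the actual definition
// lives in the example project
public interface DataIngestComponent {
    // returning true propagates a notification to dependent nodes,
    // false suppresses it (Fluxtion's boolean-return convention)
    boolean configUpdate(DataIngestConfig config);
}
```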
API
TO BE COMPLETED
Testing
TO BE COMPLETED
Running
The main method executes a DataIngestionPipeline with data from Kaggle's AmesHousing.csv data file.
Dynamic configuration is supplied in an instance of DataIngestConfig for:
- HouseRecord validation - java.util.function.Predicate
- HouseRecord transform - java.util.function.UnaryOperator
- Post-process CSV output - java.io.Writer
- Post-process binary output - java.io.OutputStream
- Statistics output - java.io.Writer
- Invalid log output - java.io.Writer
Executing the pipeline
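A rough sketch of the main method is shown below. The DataIngestConfig builder methods, the getLotArea accessor, and the cast to the exported DataIngestComponent service are illustrative assumptions; see the example project for the runnable version:

```java
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Path;

public class Main {
    public static void main(String[] args) throws Exception {
        var pipeline = new DataIngestionPipeline();
        pipeline.init();

        // dynamic configuration; builder method names are assumed for illustration
        var config = DataIngestConfig.builder()
                .houseRecordValidator(record -> record.getLotArea() > 0)  // Predicate
                .houseRecordTransform(record -> record)                   // UnaryOperator
                .csvWriter(new FileWriter("data/postProcessHouse.csv"))
                .binaryWriter(new FileOutputStream("data/postProcessHouse.binary"))
                .statsWriter(new FileWriter("data/processStats.rpt"))
                .invalidLogWriter(new FileWriter("data/processingErrors.log"))
                .build();

        // the generated processor implements the exported service, so the
        // config callback can be invoked directly on it
        ((DataIngestComponent) pipeline).configUpdate(config);

        // feed each line of the Kaggle data set as a String event
        try (var lines = Files.lines(Path.of("data/AmesHousing.csv"))) {
            lines.forEach(pipeline::onEvent);
        }

        // flush and close outputs
        pipeline.tearDown();
    }
}
```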
Example output
A set of files is produced in the data output directory after the main method has finished, and the ProcessingStats are printed to the console.
Output file contents
- postProcessHouse.csv - Validated and transformed HouseRecords as a CSV file
- postProcessHouse.binary - Validated and transformed HouseRecords as a binary encoded file
- processStats.rpt - Processing stats record
- processingErrors.log - CSV and validation error log