Example project
A maven project is here
DataFlow example to calculate realtime pnl
This example demonstrates using tbe DataFlow api to calculate realtime positions and pnl from streaming trade and market data events. A DataFlow acts like a database materialized view defined with a java streams like api.
- A DataFlow is set of stateful or stateless calculations that are defined using DataFlow api
- An event processor embeds the DataFlow and routes events to the calculations
- Pass a DataFlow definition to a Fluxtion builder to embed the DataFlow in an event processor
Project requirements
Calculate the pnl of a set of assets relative to a base currency or instrument. Assets are bought and sold with trades a running total of positions is aggregated, each asset aggregate total is tracked separately. A trade comprises two assets and an asset can be on either side of trade, the order of assets is arbitrary. The aggregate sum of an asset is the dealt aggregate and dealt aggregate. The asset mark to market position(mtmPosition) is then calculated in terms of some other instrument using a conversion rate. The mtmPositions are added together to give the total pnl for all trading.
Events
incoming events that must be processed:
- Trade a pair of assets that will affect the current position
- MidPrice the conversion rate between asssets can afftect mtmPosition value
- MtmInstrument the insturment that mtmPositions are relative to
Project solution
the complete solution looks like this:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) {
var pnlCalculator = Fluxtion.interpret(c -> {
var tradeStream = DataFlow.subscribe(Trade.class);
var dealtPosition = tradeStream.groupBy(Trade::dealtInstrument, TradeToPosition::aggregateDealt);
var contraPosition = tradeStream.groupBy(Trade::contraInstrument, TradeToPosition::aggregateContra);
DerivedRateNode derivedRate = new DerivedRateNode();
JoinFlowBuilder.outerJoin(dealtPosition, contraPosition, InstrumentPosMtm::merge)
.publishTrigger(derivedRate)
.mapValues(derivedRate::calculateInstrumentPosMtm)
.map(new PnlSummaryCalc()::updateSummary)
.console();
}
);
pnlCalculator.init();
}
Getting data into a DataFlow
To get external data into a DataFlow we use the DataFlow.subscribe(Class<T> eventClass)
which can be pushed to from the outside
world using EventProcess.onEvent(<T> eventInstaance)
, demonstrated in the snippet below:
1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
var processor = Fluxtion.interpret(c ->{
DataFlow.subscribe(Trade.class)
.console("trade in -> {}");
});
processor.init();
processor.onEvent(new Trade(symbolEURJPY, -400, 80000));
processor.onEvent(new Trade(symbolEURUSD, 500, -1100));
processor.onEvent(new Trade(symbolUSDCHF, 500, -1100));
}
produces the following output to console
Calculating aggregate position of an asset
Once we have a DataFlow stream defined with a DataFlow.subscribe we can now perform operations on it to filter and aggregate. We
will group the trade volume by its dealt instrument and apply an aggregate function to each new item to be added to a group.
Think of the group like a table in memory whose primary key is supplied with the method reference Trade::dealtInstrument
The aggregate function is stateful, an instance is allocated to a group. A supplier of function is passed into the DataFlow
groupBy declaration with the method reference TradeToPosition::aggregateDealt
Aggregate function
The aggregate function that converts and reduces trades into a single aggregate InstrumentPosMtm. A user implements the AggregateFlowFunction interface to build an aggregate function.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class TradeToPosition implements AggregateFlowFunction<Trade, InstrumentPosMtm, TradeToPosition> {
private InstrumentPosMtm instrumentPosMtm = new InstrumentPosMtm();
private final boolean dealtSide;
public TradeToPosition(boolean dealtSide) {
this.dealtSide = dealtSide;
}
public static TradeToPosition aggregateDealt() {
return new TradeToPosition(true);
}
public static TradeToPosition aggregateContra() {
return new TradeToPosition(false);
}
@Override
public InstrumentPosMtm aggregate(Trade input) {
final double previousPosition = instrumentPosMtm.getPosition();
if (dealtSide) {
instrumentPosMtm.setInstrument(input.dealtInstrument());
final double dealtPosition = input.dealtVolume();
instrumentPosMtm.setPosition(Double.isNaN(previousPosition) ? dealtPosition : dealtPosition + previousPosition);
} else {
instrumentPosMtm.setInstrument(input.contraInstrument());
final double contraPosition = input.contraVolume();
instrumentPosMtm.setPosition(Double.isNaN(previousPosition) ? contraPosition : contraPosition + previousPosition);
}
return instrumentPosMtm;
}
@Override
public InstrumentPosMtm get() {
return instrumentPosMtm;
}
@Override
public InstrumentPosMtm reset() {
instrumentPosMtm = new InstrumentPosMtm();
return instrumentPosMtm;
}
}
Aggregating DataFlow
Using the aggregate function with a groupBy is defined as follows
1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
var processor = Fluxtion.interpret(c ->{
DataFlow.subscribe(Trade.class)
.groupBy(Trade::dealtInstrument, TradeToPosition::aggregateDealt)
.console("trade dealt position by instrument -> {}");
});
processor.init();
processor.onEvent(new Trade(symbolEURJPY, -400, 80000));
processor.onEvent(new Trade(symbolEURUSD, 500, -1100));
processor.onEvent(new Trade(symbolUSDCHF, 500, -1100));
}
produces the following output to console
The running total of contra positions are calculated with
1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
var processor = Fluxtion.interpret(c ->{
DataFlow.subscribe(Trade.class)
.groupBy(Trade::contraInstrument, TradeToPosition::aggregateContra)
.console("trade contra position by instrument -> {}");
});
processor.init();
processor.onEvent(new Trade(symbolEURJPY, -400, 80000));
processor.onEvent(new Trade(symbolEURUSD, 500, -1100));
processor.onEvent(new Trade(symbolUSDCHF, 500, -1100));
}
Merging dealt and contra positions
We now have two separate group by tables that hold position data rows. We need to merge these together so we have single position for an asset. We use the outerJoin as we want to include all rows as an asset may appear on either side or both sides of the join
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
var processor = Fluxtion.interpret(c -> {
var tradeStream = DataFlow.subscribe(Trade.class);
var dealtPosition = tradeStream.groupBy(Trade::dealtInstrument, TradeToPosition::aggregateDealt);
var contraPosition = tradeStream.groupBy(Trade::contraInstrument, TradeToPosition::aggregateContra);
JoinFlowBuilder.outerJoin(dealtPosition, contraPosition, InstrumentPosMtm::merge)
.console("merged trade position by instrument -> {}");
}
);
processor.init();
processor.onEvent(new Trade(symbolEURJPY, -400, 80000));
processor.onEvent(new Trade(symbolEURUSD, 500, -1100));
processor.onEvent(new Trade(symbolUSDCHF, 500, -1100));
}
produces the following output to console
Calculating mark to market
Now we have the aggregated position of an asset we want to calculate its position in terms of a mark to market instrument. We use a function on a java class DerivedNode that listens to mid rates and calculates a conversion rate between two assets. The function is applied with the mapValue call on the groupBy DataFlow
.mapValues(derivedRate::calculateInstrumentPosMtm)
Once the mtmPosition is calculated for each row in the groupBy table we can reduce the rows to a single PnlSummary using the map call:
.map(new PnlSummaryCalc()::updateSummary)
The DerivedRateNode has event handlers for MidRate and MtmInstrument and ensures the correct conversion rate is uesd in the mtm calculation whenever either of these changes.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public static void main(String[] args) {
var pnlCalculator = Fluxtion.interpret(c -> {
var tradeStream = DataFlow.subscribe(Trade.class);
var dealtPosition = tradeStream.groupBy(Trade::dealtInstrument, TradeToPosition::aggregateDealt);
var contraPosition = tradeStream.groupBy(Trade::contraInstrument, TradeToPosition::aggregateContra);
DerivedRateNode derivedRate = new DerivedRateNode();
JoinFlowBuilder.outerJoin(dealtPosition, contraPosition, InstrumentPosMtm::merge)
.publishTrigger(derivedRate)
.mapValues(derivedRate::calculateInstrumentPosMtm)
.map(new PnlSummaryCalc()::updateSummary)
.console();
}
);
pnlCalculator.init();
pnlCalculator.onEvent(new Trade(symbolEURJPY, -400, 80000));
pnlCalculator.onEvent(new Trade(symbolEURUSD, 500, -1100));
pnlCalculator.onEvent(new Trade(symbolUSDCHF, 500, -1100));
pnlCalculator.onEvent(new Trade(symbolEURGBP, 1200, -1000));
pnlCalculator.onEvent(new Trade(symbolGBPUSD, 1500, -700));
pnlCalculator.onEvent(new MidPrice(symbolEURGBP, 0.9));
pnlCalculator.onEvent(new MidPrice(symbolEURUSD, 1.1));
pnlCalculator.onEvent(new MidPrice(symbolEURCHF, 1.2));
System.out.println("---------- final rate -----------");
pnlCalculator.onEvent(new MidPrice(symbolEURJPY, 200));
System.out.println("---------- final trade -----------");
pnlCalculator.onEvent(new Trade(symbolGBPUSD, 20, -25));
System.out.println("---------- change mtm EUR -----------");
pnlCalculator.onEvent(new MtmInstrument(EUR));
}
running the example produces the following output