I have an Apache Beam pipeline in Java that looks like this:
pipeline
.apply("Read pubsub", PubsubIO.readStrings()
.fromTopic(inputTopic)
)
.apply(window)
.apply(ParDo.of(new OrderCodeKey()))
.apply(GroupByKey.<String, RawBsonDocument>create())
.apply(ParDo.of(new GetLastOrder()))
.apply(ParDo.of(new ShopIDKey()))
.apply(GroupByKey.create())
.apply(ParDo.of(new CountShopOrder()))
;
Problem description: I need to stream order events from a Pub/Sub topic. Each event is of type "update" or "insert" and corresponds to an update/insert operation in MongoDB for an order. Each event has some notable fields:
- clusterTime -> the time at which the operation happens
- order_code -> order's code
- shop_id -> the shop which the order belongs to.
My goal is to compute the total orders per shop.
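For reference, a message body looks roughly like a MongoDB change event; the exact shape below is an assumption for illustration, only the three fields above matter here:

```json
{
  "operationType": "insert",
  "clusterTime": {"$timestamp": {"t": 1650000000, "i": 1}},
  "fullDocument": {
    "order_code": "ORD-001",
    "shop_id": 42
  }
}
```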
Detail of pipeline:
- Read from pubsub topic
- Create a key-value pair (for the subsequent GroupByKey); the key is the order code and the value is the document:
public static class OrderCodeKey extends DoFn<String, KV<String, RawBsonDocument>> {
    @ProcessElement
    public void processElement(@Element String order, OutputReceiver<KV<String, RawBsonDocument>> out) throws ParseException {
        // Parse the JSON message into a BSON document
        RawBsonDocument document = RawBsonDocument.parse(order);
        BsonDocument fullDocument = document.getDocument("fullDocument");
        String orderCode = fullDocument.getString("order_code").getValue();
        out.output(KV.of(orderCode, document));
    }
}
- GroupByKey (group by order_code)
- Combine events with the same order_code, keeping the latest order (the one with the largest clusterTime):
public static class GetLastOrder extends DoFn<KV<String, Iterable<RawBsonDocument>>, KV<String, BsonDocument>> {
    @ProcessElement
    public void processElement(@Element KV<String, Iterable<RawBsonDocument>> orders, OutputReceiver<KV<String, BsonDocument>> out) throws ParseException, java.text.ParseException {
        List<RawBsonDocument> docs = Lists.newArrayList(orders.getValue());
        // Sort descending by clusterTime so the latest event comes first
        Collections.sort(docs, new Comparator<RawBsonDocument>() {
            @Override
            public int compare(RawBsonDocument o1, RawBsonDocument o2) {
                BsonTimestamp t1 = o1.get("clusterTime").asTimestamp();
                BsonTimestamp t2 = o2.get("clusterTime").asTimestamp();
                return t2.compareTo(t1); // sort desc
            }
        });
        RawBsonDocument document = docs.iterator().next();
        out.output(KV.of(orders.getKey(), document.getDocument("fullDocument")));
    }
}
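Aside: the sort-then-take-first above only needs a single max pass. A minimal plain-Java sketch of that selection logic, with a hypothetical OrderEvent record standing in for the BSON documents:

```java
import java.util.Comparator;
import java.util.List;

public class LatestByClusterTime {
    // Hypothetical stand-in for the parsed event; only the
    // clusterTime value matters for the comparison.
    record OrderEvent(String orderCode, long clusterTime) {}

    // Equivalent to sorting descending and taking the first element,
    // but in one pass and without materializing a sorted copy.
    static OrderEvent latest(List<OrderEvent> events) {
        return events.stream()
                .max(Comparator.comparingLong(OrderEvent::clusterTime))
                .orElseThrow();
    }
}
```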
- Create a new key (key = shop_id) for the subsequent GroupByKey:
public static class ShopIDKey extends DoFn<KV<String, BsonDocument>, KV<Long, BsonDocument>> {
    @ProcessElement
    public void processElement(@Element KV<String, BsonDocument> order, OutputReceiver<KV<Long, BsonDocument>> out) throws ParseException {
        Long shopId = Long.valueOf(order.getValue().getInt32("shop_id").getValue());
        out.output(KV.of(shopId, order.getValue()));
    }
}
- GroupByKey (group by shop_id)
- Count number of orders per shop
public static class CountShopOrder extends DoFn<KV<Long, Iterable<BsonDocument>>, Shop> {
    @ProcessElement
    public void processElement(@Element KV<Long, Iterable<BsonDocument>> orders, OutputReceiver<Shop> out) throws ParseException, java.text.ParseException {
        List<BsonDocument> docs = Lists.newArrayList(orders.getValue());
        Long countOrder = Long.valueOf(docs.size());
        // Build and emit a Shop from orders.getKey() and countOrder (omitted here)
    }
}
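The counting step itself reduces to a group-and-count. A stdlib sketch of that aggregation, with bare shop ids standing in for the grouped documents:

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ShopOrderCount {
    // Count orders per shop id; each long stands in for one
    // (already de-duplicated) order document.
    static Map<Long, Long> countPerShop(Stream<Long> shopIds) {
        return shopIds.collect(
                Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }
}
```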
In the last step, I assume that the input Iterable orders contains only unique orders (because after the GroupByKey + GetLastOrder steps, only the latest event per order_code is kept). However, when I debug I receive events with the same order_code, which I had already reduced in the GetLastOrder ParDo. Here is the window and trigger I use:
Window<String> window = Window.<String>into(
FixedWindows.of(Duration.standardMinutes(60))
)
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
)
.withAllowedLateness(Duration.standardSeconds(0))
.accumulatingFiredPanes();
I would appreciate any comment/help on this. Thank you in advance!