0
votes

I have a apache beam pipeline in java that looks like this:

        pipeline
        .apply("Read pubsub",PubsubIO.readStrings()
                .fromTopic(inputTopic)
        )
        .apply(window)
        .apply(ParDo.of(new OrderCodeKey()))
        .apply(GroupByKey.<String, RawBsonDocument>create())
        .apply(ParDo.of(new GetLastOrder()))
        .apply(ParDo.of(new ShopIDKey()))
        .apply(GroupByKey.create())
        .apply(ParDo.of(new CountShopOrder()))
        ;

Problem description: I need to stream events of orders from a pubsub topic, each event is of type "update" or "insert" and corresponding to an update/insert operation in mongodb for an order. In each event there are some notable fields:

  • clusterTime -> the time at which the operation happens
  • order_code -> order's code
  • shop_id -> the shop which the order belongs to.

My goal is to compute the total orders per shop.

Detail of pipeline:

  • Read from pubsub topic
  • Create key-value pair (for subsequent GroupByKey) key is order code and value is the document
  public static class OrderCodeKey extends DoFn<String, KV<String, RawBsonDocument>>{
       @ProcessElement
       public void processElement (@Element String order, OutputReceiver<KV<String, RawBsonDocument>> out) throws ParseException {
           // Parse JSON
           RawBsonDocument document = RawBsonDocument.parse(order);
           BsonDocument fullDocument = document.getDocument("fullDocument");
           String orderCode = fullDocument.getString("order_code").getValue();
           //Integer orderCode = 0;
           out.output(KV.of(orderCode, document));
       }
   }
  • GroupByKey (group by order_code)
  • Combine events with same order_code to get the latest order (the one with the biggest clusterTime)
public static class GetLastOrder extends DoFn<KV<String, Iterable<RawBsonDocument>>, KV<String, BsonDocument>>{
        @ProcessElement
        public void processElement (@Element KV<String, Iterable<RawBsonDocument>> orders, OutputReceiver<KV<String, BsonDocument>> out) throws ParseException, java.text.ParseException {
            List<RawBsonDocument> docs = Lists.newArrayList(orders.getValue());
            Collections.sort(docs, new Comparator<RawBsonDocument>(){
                @Override
                public int compare(RawBsonDocument o1, RawBsonDocument o2) {
                    BsonTimestamp t1 = o1.get("clusterTime").asTimestamp();
                    BsonTimestamp t2 = o2.get("clusterTime").asTimestamp();
                    return t2.compareTo(t1); // sort desc
                }
            });
            RawBsonDocument document = docs.iterator().next();
            out.output(KV.of(orders.getKey(), document.getDocument("fullDocument")));
        }
  • Create new key (key= shop_id) for subsequent GroupByKey
public static class ShopIDKey extends DoFn<KV<String, BsonDocument>, KV<Long, BsonDocument>>{
        @ProcessElement
        public void processElement (@Element KV<String, BsonDocument> order, OutputReceiver<KV<Long, BsonDocument>> out) throws ParseException {
        Long shopId =  Long.valueOf(order.getValue().getInt32("shop_id").getValue());
        out.output(KV.of(shopId, order.getValue()));
        }
    }
  • GroupByKey (group by shop_id)
  • Count number of orders per shop
public static class CountShopOrder extends DoFn<KV<Long, Iterable<BsonDocument>>, Shop>{
        @ProcessElement
        public void processElement (@Element KV<Long, Iterable<BsonDocument>> orders, OutputReceiver<Shop> out) throws ParseException, java.text.ParseException {
            List<BsonDocument> docs = Lists.newArrayList(orders.getValue());
            Long countOrder = Long.valueOf(docs.size());
        }
    }

In the last step, I assume that the input Iterable orders will contain only unique orders (because after the GroupByKey-GetLastOrder, only the latest event is kept). However, When I debug I received events with the same order_code - which I have already reduced by GetLastOrder ParDo. Here is the window and trigger I use:

Window<String> window = Window.<String>into(
                FixedWindows.of(Duration.standardMinutes(60))
                )
                .triggering(
                        AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
                )
                .withAllowedLateness(Duration.standardSeconds(0))
                .accumulatingFiredPanes();

I would appreciate any comment/help on this. Thank you in advance!

1

1 Answers

2
votes

If you don't need intermediate results but just want hourly summary of unique orders per shop, early triggering is unnecessary.

When you use .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()), each pane will process only a subset of data after the first data is arrived in the pane. With .accumulatingFiredPanes(), you will see the accumulated result from the previous pane, so the same data appear multiple times across the different panes for the same window.

Please see https://beam.apache.org/documentation/programming-guide/#setting-a-trigger for the details.