0
votes

I have a program that streams cryptocurrency prices into a flink pipeline, and prints the highest bid for a time window.

Main.java

public class Main {
    private final static Logger log = LoggerFactory.getLogger(Main.class);
    private final static DateFormat dateFormat = new SimpleDateFormat("y-M-d H:m:s");
    private final static NumberFormat numberFormat = new DecimalFormat("#0.00");
    public static void main(String[] args) throws Exception {
        MultipleParameterTool multipleParameterTool = MultipleParameterTool.fromArgs(args);

        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.getConfig().setGlobalJobParameters(multipleParameterTool);
        streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        streamExecutionEnvironment.addSource(new GdaxSourceFunction())
        .name("Gdax Exchange Price Source")
        .assignTimestampsAndWatermarks(new WatermarkStrategy<TickerPrice>() {
            @Override
            public WatermarkGenerator<TickerPrice> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new BoundedOutOfOrdernessGenerator();
            }
        })
        .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100)))
        .trigger(EventTimeTrigger.create())
        .reduce((ReduceFunction<TickerPrice>) (value1, value2) ->
                value1.getHighestBid() > value2.getHighestBid() ? value1 : value2)
        .addSink(new SinkFunction<TickerPrice>() {
             @Override
             public void invoke(TickerPrice value, Context context) throws Exception {
                 String dateString = dateFormat.format(context.timestamp());
                 String valueString = "$" + numberFormat.format(value.getHighestBid());
                 log.info(dateString + " : " + valueString);
             }
        }).name("Highest Bid Logger");

        streamExecutionEnvironment.execute("Gdax Highest bid window calculator");
    }

    /**
     * This generator generates watermarks assuming that elements arrive out of order,
     * but only to a certain degree. The latest elements for a certain timestamp t will arrive
     * at most n milliseconds after the earliest elements for timestamp t.
     */
    public static class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<TickerPrice> {

        private final long maxOutOfOrderness = 3500; // 3.5 seconds

        private long currentMaxTimestamp;

        @Override
        public void onEvent(TickerPrice event, long eventTimestamp, WatermarkOutput output) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // emit the watermark as current highest timestamp minus the out-of-orderness bound
            output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
        }
    }
}

GdaxSourceFunction.java

public class GdaxSourceFunction extends WebSocketClient implements SourceFunction<TickerPrice> {
    private static String URL = "wss://ws-feed.gdax.com";
    private static Logger log = LoggerFactory.getLogger(GdaxSourceFunction.class);
    private static String subscribeMsg = "{\n" +
            "    \"type\": \"subscribe\",\n" +
            "    \"product_ids\": [<productIds>],\n" +
            "    \"channels\": [\n" +
            //TODO: uncomment to re-enable order book tracking
            //"        \"level2\",\n" +
            "        {\n" +
            "            \"name\": \"ticker\",\n" +
            "            \"product_ids\": [<productIds>]\n" +
            "        }\n"+
            "    ]\n" +
            "}";
    SourceContext<TickerPrice> ctx;

    @Override
    public void run(SourceContext<TickerPrice> ctx) throws Exception {
        this.ctx = ctx;
        openConnection().get();
        while(isOpen()) {
            Thread.sleep(10000);
        }
    }

    @Override
    public void cancel() {

    }


    @Override
    public void onMessage(String message) {
        try {
            ObjectNode objectNode = objectMapper.readValue(message, ObjectNode.class);
            String type = objectNode.get("type").asText();
            if("ticker".equals(type)) {
                TickerPrice tickerPrice = new TickerPrice();
                String productId = objectNode.get("product_id").asText();
                String[] currencies = productId.split("-");
                tickerPrice.setFromCurrency(currencies[1]);
                tickerPrice.setToCurrency(currencies[0]);
                tickerPrice.setHighestBid(objectNode.get("best_bid").asDouble());
                tickerPrice.setLowestOffer(objectNode.get("best_ask").asDouble());
                tickerPrice.setExchange("gdax");
                String time = objectNode.get("time").asText();
                Instant instant = Instant.parse(time);
                ctx.collectWithTimestamp(tickerPrice, instant.getEpochSecond());
            }
            //log.info(objectNode.toString());
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }


    @Override
    public void onOpen(Session session) {
        super.onOpen(session);

        //Authenticate and ensure we can properly connect to Gdax Websocket
        //construct auth message with list of product ids

        StringBuilder productIds = new StringBuilder("");
        productIds.append("" +
                "\"ETH-USD\",\n" +
                "\"ETH-USD\",\n" +
                "\"BTC-USD\"");

        String subMsg = subscribeMsg.replace("<productIds>", productIds.toString());

        try {
            userSession.getAsyncRemote().sendText(subMsg).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String getUrl() {
        return URL;
    }
}

but the sink function is never called. I have verified that the reducer is executing (very fast, every 100 milliseconds). If I remove the windowing part and just print the bid for every record coming in, the program works. But I've followed all the tutorials on windowing, and I see no difference between what I'm doing here and what's shown in the tutorials. I don't know why the flink sink would not execute in windowed mode.

I copied the BoundedOutOfOrdernessGenerator class directly from this tutorial. It should work for my use case. Within 3600 miliseconds, I should see my first record in the logs but I don't. I debugged the program and the sink function never executes. If I remove these lines:

.assignTimestampsAndWatermarks(new WatermarkStrategy<TickerPrice>() {
            @Override
            public WatermarkGenerator<TickerPrice> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new BoundedOutOfOrdernessGenerator();
            }
        })
        .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100)))
        .trigger(EventTimeTrigger.create())
        .reduce((ReduceFunction<TickerPrice>) (value1, value2) ->
                value1.getHighestBid() > value2.getHighestBid() ? value1 : value2)      

so that the stream creation code looks like:

streamExecutionEnvironment.addSource(new GdaxSourceFunction())
        .name("Gdax Exchange Price Source")
        .addSink(new SinkFunction<TickerPrice>() {
             @Override
             public void invoke(TickerPrice value, Context context) throws Exception {
                 String dateString = dateFormat.format(context.timestamp());
                 String valueString = "$" + numberFormat.format(value.getHighestBid());
                 log.info(dateString + " : " + valueString);
             }
        }).name("Highest Bid Logger");

The sink executes, but of course the results aren't windowed so they're incorrect for my use case. But that shows that something is wrong with my windowing logic but I don't know what it is.

Versions:

JDK 1.8 Flink 1.11.2

1
what is the flink version?Litchy
@Litchy Flink 1.11.2, JDK 1.8Calicoder

1 Answers

2
votes

I believe the cause of this issue is that the timestamps produced by your custom source are in units of seconds, while window durations are always measured in milliseconds. Try changing

ctx.collectWithTimestamp(tickerPrice, instant.getEpochSecond());

to

ctx.collectWithTimestamp(tickerPrice, instant.getEpochMilli());

I would also suggest some other (largely unrelated) changes.

streamExecutionEnvironment.addSource(new GdaxSourceFunction())
    .name("Gdax Exchange Price Source")
    .uid("source")
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<TickerPrice>forBoundedOutOfOrderness(Duration.ofMillis(3500))
    )
    .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100)))
    .reduce((ReduceFunction<TickerPrice>) (value1, value2) ->
            value1.getHighestBid() > value2.getHighestBid() ? value1 : value2)
    .uid("window")
    .addSink(new SinkFunction<TickerPrice>() { ... }
    .uid("sink")

Note the following recommendations:

  • Remove the BoundedOutOfOrdernessGenerator. There's no need to reimplement the built-in bounded-out-of-orderness watermark generator.
  • Remove the window trigger. There appears to be no need to override the default trigger, and if you get it wrong, it will cause problems.
  • Add UIDs to each stateful operator. These will be needed if you ever want to do stateful upgrades of your application after changing the job topology. (Your current sink isn't stateful, but adding a UID to it won't hurt.)