2 votes

I can stream inserts directly into BigQuery at about 10,000 inserts per second, but when I try to insert using Dataflow, the 'ToBQRow' step (given below) is EXTREMELY slow: barely 50 rows per 10 minutes, and that is with 4 workers. Any idea why? Here's the relevant code:

PCollection<Status> statuses = p
        .apply("GetTweets", PubsubIO.readStrings().fromTopic(topic))
        .apply("ExtractData", ParDo.of(new DoFn<String, Status>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                String rowJson = c.element();
                try {
                    TweetsWriter.LOGGER.debug("ROWJSON = " + rowJson);
                    Status status = TwitterObjectFactory.createStatus(rowJson);
                    if (status == null) {
                        TweetsWriter.LOGGER.error("Status is null");
                    } else {
                        TweetsWriter.LOGGER.debug("Status value: " + status.getText());
                    }
                    c.output(status);
                    TweetsWriter.LOGGER.debug("Status: " + status.getId());
                } catch (Exception e) {
                    TweetsWriter.LOGGER.error("Status creation from JSON failed: " + e.getMessage());
                }
            }
}));

statuses
        .apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = new TableRow();
                Status status = c.element();
                row.set("Id", status.getId());
                row.set("Text", status.getText());
                row.set("RetweetCount", status.getRetweetCount());
                row.set("FavoriteCount", status.getFavoriteCount());
                row.set("Language", status.getLang());
                row.set("ReceivedAt", (Object)null);
                row.set("UserId", status.getUser().getId());
                row.set("CountryCode", status.getPlace().getCountryCode());
                row.set("Country", status.getPlace().getCountry());
                c.output(row);
            }
        }))
        .apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)
                .withSchema(schema)
                .withMethod(Method.STREAMING_INSERTS)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));

p.run();
Do you perform any computationally heavy operations on your statuses? Maybe you've fallen into Beam's graph fusion optimization (cloud.google.com/dataflow/service/…) and your multiple transforms are fused into a single transform, which might cause a bottleneck. Try doing a reshuffle before ToBQRow. - Marcin Zablocki
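
Breaking fusion with a reshuffle, as suggested, would look roughly like this (a minimal sketch; Reshuffle.viaRandomKey() is the stock Beam transform, and the "BreakFusion" step name is illustrative):

import org.apache.beam.sdk.transforms.Reshuffle;

// Insert between the existing "ExtractData" and "ToBQRow" steps to break fusion.
PCollection<Status> reshuffled = statuses
        .apply("BreakFusion", Reshuffle.viaRandomKey());
// ...then apply the existing "ToBQRow" ParDo to 'reshuffled' instead of 'statuses'.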
I have updated the code above. As you can see, I don't do any computationally heavy operations: I just read a message from a Pub/Sub topic, extract the relevant info, create a TableRow object, and write it. The 'ToBQRow' step seems to be the real culprit: Input Collections -> Elements Added -> 13,829; Output Collections -> Elements Added -> 249. - DilTeam
I don't see any kind of windowing; that might be an issue. - Marcin Zablocki
I don't understand why I have to use a Window! I am not aggregating data! In any case, I tried this, but it didn't help (not sure if this is correct usage!): .apply("GetTweets", PubsubIO.readStrings().fromTopic(topic)) .apply("TimeWindow", Window.into(SlidingWindows.of(averagingInterval).every(averagingInterval))) - DilTeam
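
For reference, a typical windowing setup in Beam looks like this (a sketch only; the one-minute fixed window is an arbitrary choice, and as the answer below shows, windowing turned out not to be the problem here):

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Window the Pub/Sub stream into one-minute fixed windows before further processing.
PCollection<String> windowed = p
        .apply("GetTweets", PubsubIO.readStrings().fromTopic(topic))
        .apply("TimeWindow", Window.into(FixedWindows.of(Duration.standardMinutes(1))));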
I've read the docs for Status without much success in understanding what happens underneath, BUT I think the issue is with the TableRow object. Can you validate that the TableRow is not being populated with null data? Second, can you validate that the schema matches the TableRow? If they don't, that would explain why only some rows are being mapped, since the schema would only match certain rows (i.e. those where the extras are null). If you can validate that those are not the issues, I'll try to keep digging. - Haris Nadeem
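
For that schema check, the definition would need one entry per field set in ToBQRow, along these lines (a sketch; the column types are assumptions, since the actual schema isn't shown in the question):

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;

// One TableFieldSchema per key set in the "ToBQRow" DoFn; types are guesses.
TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("Id").setType("INTEGER"),
        new TableFieldSchema().setName("Text").setType("STRING"),
        new TableFieldSchema().setName("RetweetCount").setType("INTEGER"),
        new TableFieldSchema().setName("FavoriteCount").setType("INTEGER"),
        new TableFieldSchema().setName("Language").setType("STRING"),
        new TableFieldSchema().setName("ReceivedAt").setType("TIMESTAMP"),
        new TableFieldSchema().setName("UserId").setType("INTEGER"),
        new TableFieldSchema().setName("CountryCode").setType("STRING"),
        new TableFieldSchema().setName("Country").setType("STRING")));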

1 Answer

2 votes

Turns out BigQuery under Dataflow is NOT slow. The problem was that 'status.getPlace().getCountryCode()' was returning NULL, so the DoFn was throwing a NullPointerException that I couldn't see anywhere in the log! Clearly, Dataflow logging needs to improve. It's running really well now: as soon as a message comes in on the topic, it gets written to BigQuery almost instantaneously!
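
In practice the fix amounts to guarding the Place lookups inside the "ToBQRow" DoFn, since twitter4j's Status.getPlace() returns null for tweets that carry no place data. A minimal sketch:

// Inside the "ToBQRow" DoFn, replace the two Place lookups with a null check.
Place place = status.getPlace();
row.set("CountryCode", place == null ? null : place.getCountryCode());
row.set("Country", place == null ? null : place.getCountry());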