1
votes

We are creating a data pipeline in GCP and facing some issue during testing. Our current architecture is on AWS, to test we are pushing one copy of data to pubsub from Lambda realtime.

  • Facing latency issue from pubsub to BigQuery and storage via dataflow (Is there a way to do bulk load as per table instead of inserting one event at a time) We have a windowing of 5 min and after 5min we group data by event key for storage purpose and write all event in that duration in single file can we do something similar in BigQuery and define schema only once for one event type instead of all event.
  • Auto Scale of worker is not happening min 2 and max 10 is given
  • All services used are in asia-northeast1
  • We receive generally 3million records per day what would be the best server config for dataflow.

    package purplle.datapipeline;
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    import java.net.SocketTimeoutException;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
    import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.ValueInSingleWindow;
    import org.joda.time.Duration;
    import org.json.JSONException;
    import org.json.JSONObject;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.google.api.services.bigquery.Bigquery;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.BlobId;
    import com.google.cloud.storage.BlobInfo;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    
    import purplle.datapipeline.buisness.EventSchemaBuilder;
    import purplle.datapipeline.buisness.Ordering;
    import purplle.datapipeline.common.Constants;
    import purplle.datapipeline.helpers.Event_ordering;
    import purplle.datapipeline.helpers.Event_schema;
    import purplle.datapipeline.helpers.JSON_helper;
    
    public class StarterPipeline {
    
    
    public interface StarterPipelineOption extends PipelineOptions {
    
        /**
         * Set this required option to specify where to read the input.
         */
        @Description("Path of the file to read from")
        @Default.String(Constants.pubsub_event_pipeline_url)
        String getInputFile();
    
        void setInputFile(String value);
    
    }
    
    @SuppressWarnings("serial")
    static class ParseJsonData_storage extends DoFn<String, KV<String, String>> {
    
        @ProcessElement
        public void processElement(ProcessContext c) throws JSONException {
            Logger log = LoggerFactory.getLogger(StarterPipeline.class);
    
            if (c.element().length() > 0 && JSON_helper.isJSONValid(c.element())) {
                JSONObject event_obj = new JSONObject(c.element());
                if (event_obj.length() > 0 && event_obj.has("event")) {
                    JSONObject ob2 = JSON_helper.flatJsonConvertKeyToLower(event_obj);
                    if (ob2.length() > 0 && ob2.has("event")) {
                        // Reorder the json object then pass to create pipe saperated string.
                        KV<String, String> event_kv_pair = Event_ordering.order_event_columns(ob2, "storage");
                        if (!event_kv_pair.getKey().isEmpty() && event_kv_pair.getKey().length() > 0) {
                            c.output(event_kv_pair);
                        } else {
                            log = LoggerFactory.getLogger(StarterPipeline.class);
                            log.error("Storage string empty = " + c.element());
                        }
                    } else {
                        log = LoggerFactory.getLogger(StarterPipeline.class);
                        log.error("Storage object error = " + c.element());
                    }
                } else {
                    log = LoggerFactory.getLogger(StarterPipeline.class);
                    log.error("Storage object error = " + c.element());
                }
            } else {
                log = LoggerFactory.getLogger(StarterPipeline.class);
                log.error("Storage empty element = " + c.element());
            }
        }
    }
    
    @SuppressWarnings("serial")
    static class ParseJsonData_bigquery extends DoFn<String, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) throws JSONException {
            Logger log = LoggerFactory.getLogger(StarterPipeline.class);
            log.info("Event json = " + c.element());
            if (!c.element().isEmpty() && JSON_helper.isJSONValid(c.element())) {
                JSONObject event_obj = new JSONObject(c.element());
                if (event_obj.length() > 0 && event_obj.has("event")) {
                    JSONObject ob2 = JSON_helper.flatJsonConvertKeyToLower(event_obj);
                    if (ob2.length() > 0 && ob2.has("event")) {
                        TableRow event_row = EventSchemaBuilder.get_event_row(ob2, "bigquery");
                        if (!event_row.isEmpty()) {
                            c.output(event_row);
                        } else {
                            log = LoggerFactory.getLogger(StarterPipeline.class);
                            log.error("Bigquery set event ordering schema error = " + c.element());
                        }
                    } else {
                        log = LoggerFactory.getLogger(StarterPipeline.class);
                        log.error("Bigquery set event ordering object error = " + c.element());
                    }
                } else {
                    log = LoggerFactory.getLogger(StarterPipeline.class);
                    log.error("Bigquery event item object error = " + c.element());
                }
            } else {
                log = LoggerFactory.getLogger(StarterPipeline.class);
                log.error("Bigquery event item error = " + c.element());
            }
        }
    }
    
    @SuppressWarnings("serial")
    static class Write_to_GCS extends DoFn<KV<String, String>, TextIO.Write> {
        @ProcessElement
        public void processElement(ProcessContext c) throws JSONException {
    
            String event_string = c.element().getValue();
            String event_name = c.element().getKey();
    
            LocalDateTime now = LocalDateTime.now(ZoneId.of("Asia/Kolkata"));
            int year = now.getYear();
            int month = now.getMonthValue();
            int day = now.getDayOfMonth();
            int hour = now.getHour();
            int minute = now.getMinute();
            int second = now.getSecond();
    
            String storage_file_path = event_name + "/" + year + "/" + month + "/" + day + "/" + hour + "/" + event_name
            + "-" + year + "-" + month + "-" + day + "-" + hour + "-" + minute + "-" + second + ".txt";
    
            Logger log = LoggerFactory.getLogger(StarterPipeline.class);
            log.info("Writing file to location = " + storage_file_path);
    
            // Create your service object
            Storage storage = StorageOptions.getDefaultInstance().getService();
    
            // Upload a blob to the newly created bucket
            BlobId blobId = BlobId.of(Constants.gcp_events_bucket_name, storage_file_path);
            BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
            @SuppressWarnings("unused")
            Blob blob = storage.create(blobInfo, event_string.getBytes(UTF_8));
    
        }
    }
    
    @SuppressWarnings("serial")
    public static class ReadEventJson_storage extends PTransform<PCollection<String>, PCollection<KV<String, String>>> {
        @Override
        public PCollection<KV<String, String>> expand(PCollection<String> lines) {
    
            Logger log = LoggerFactory.getLogger(StarterPipeline.class);
            log.info("Storage workflow started");
    
            @SuppressWarnings("unused")
            Boolean tempbool = Event_ordering.setEventsOrdering();
            // Convert lines of text into individual words.
            PCollection<KV<String, String>> words = lines.apply(ParDo.of(new ParseJsonData_storage()));
    
            return words;
        }
    }
    
    @SuppressWarnings("serial")
    public static class ReadEventJson_bigquery extends PTransform<PCollection<String>, PCollection<TableRow>> {
        @Override
        public PCollection<TableRow> expand(PCollection<String> lines) {
    
            Logger log = LoggerFactory.getLogger(StarterPipeline.class);
            log.info("Bigquery workflow started");
    
            @SuppressWarnings("unused")
            Boolean tempbool = Event_ordering.setEventsOrdering();
    
            log.info("Bigquery get event ordering");
            Ordering events_ordering = Event_ordering.getEventsOrdering();
    
            Event_schema es = new Event_schema();
            es.setEventSchema(events_ordering);
    
            // Convert lines of text into individual words.
            PCollection<TableRow> table_row = lines.apply(ParDo.of(new ParseJsonData_bigquery()));
    
            log.info("Bigquery workflow rows prepared");
    
            return table_row;
        }
    }
    
    /** A SimpleFunction that converts a Word and Count into a printable string. */
    @SuppressWarnings("serial")
    public static class CombineEventStrings extends SimpleFunction<KV<String, Iterable<String>>, KV<String, String>> {
    
        @Override
        public KV<String, String> apply(KV<String, Iterable<String>> input) {
    
            String combined_event = "";
    
            for (String combined_str : input.getValue()) {
                combined_event += combined_str + "\n";
            }
    
            Logger log = LoggerFactory.getLogger(StarterPipeline.class);
            log.info("combined_event = " + combined_event);
    
            KV<String, String> return_kv = KV.of(input.getKey(), combined_event);
    
            return return_kv;
        }
    }
    
    @SuppressWarnings("serial")
    public static void main(String[] args) throws SocketTimeoutException {
    
        Logger log = LoggerFactory.getLogger(StarterPipeline.class);
    
        log.info("Events pipeline job started");
    
        StarterPipelineOption options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(StarterPipelineOption.class);
    
        Pipeline p = Pipeline.create(options);
    
        log.info("Pipeline created");
    
        log.info("Pipeline Started");
    
        PCollection<String> datastream = p.apply("Read Events From Pubsub",
            PubsubIO.readStrings().fromSubscription(Constants.pubsub_event_pipeline_url));
    
        // PCollection<String> windowed_items =
        // datastream.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
    
        // PCollection<String> windowed_items = datastream.apply(
        // Window.<String>into(SlidingWindows.of(Duration.standardMinutes(1)).every(Duration.standardSeconds(10))));
    
        PCollection<String> windowed_items = datastream.apply(Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(300))))
            .withAllowedLateness(Duration.standardDays(10)).discardingFiredPanes());
    
        // Write to storage
        windowed_items.apply("Read and make pipe separated event string", new ReadEventJson_storage())
        .apply("Combine events by keys", GroupByKey.<String, String>create())
        .apply("Combine events strings by event name", MapElements.via(new CombineEventStrings()))
        .apply("Manually write events to GCS", ParDo.of(new Write_to_GCS()));
    
        // Write into Big Query
        windowed_items.apply("Read and make event table row", new ReadEventJson_bigquery())
    
        .apply("Write_events_to_BQ",
            BigQueryIO.writeTableRows().to(new DynamicDestinations<TableRow, String>() {
                public String getDestination(ValueInSingleWindow<TableRow> element) {
                    String destination = EventSchemaBuilder
                    .fetch_destination_based_on_event(element.getValue().get("event").toString());
                    return destination;
                }
    
                @Override
                public TableDestination getTable(String table) {
                    String destination = EventSchemaBuilder.fetch_table_name_based_on_event(table);
                    return new TableDestination(destination, destination);
                }
    
                @Override
                public TableSchema getSchema(String table) {
                    TableSchema table_schema = EventSchemaBuilder.fetch_table_schema_based_on_event(table);
                    return table_schema;
                }
            }).withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
            );
    
        p.run().waitUntilFinish();
    
        log.info("Events Pipeline Job Stopped");
    
    }
    
    }
    

Image : Dataflow Progress 1 | Dataflow Progress 2 | Dataflow Job Description

1

1 Answers

3
votes

Check this post out:

  • https://medium.com/teads-engineering/give-meaning-to-100-billion-analytics-events-a-day-d6ba09aa8f44

  • They are handling 100 billion events a day with Dataflow.

  • Instead of streaming, they opted for batch. Note that they chose a hard way to batch, currently Dataflow has a way easier and faster path.
  • Their described latency "oscillates between 3 min (minimum duration of the Write BQ phase) and 30 min".
  • This latency could be way shorter if they moved to the new Dataflow "easy" batch to BigQuery mode.

(the connector deserves a deeper appreciation post, but in the meantime check this slide out https://twitter.com/felipehoffa/status/1000024539944902656)