We are building a data pipeline on GCP and are facing a few issues during testing. Our current architecture runs on AWS; to test, we push a copy of the data from Lambda to Pub/Sub in real time.
- We are seeing high latency from Pub/Sub to BigQuery and to Cloud Storage via Dataflow. Is there a way to bulk-load per table instead of inserting one event at a time? We use a 5-minute window; when it fires we group the data by event key and write all events from that window to a single file in Cloud Storage. Can we do something similar for BigQuery, and define the schema once per event type instead of per event? (See the sketch after this list.)
- Worker autoscaling is not happening, even though we set a minimum of 2 and a maximum of 10 workers (the options we pass are sketched below the list).
- All services used are in asia-northeast1
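
To make the bulk-load question concrete, this is the kind of change we have in mind for the BigQuery branch of the pipeline. It is only a sketch: Method.FILE_LOADS, withTriggeringFrequency and withNumFileShards are standard BigQueryIO.Write options, while eventDestinations is a placeholder name for the DynamicDestinations object that is currently defined inline in the pipeline code further down.

    // Sketch only: switch the BigQuery branch from streaming inserts to periodic batch load jobs.
    // "eventDestinations" stands in for the DynamicDestinations<TableRow, String> defined inline in main().
    windowed_items.apply("Read and make event table row", new ReadEventJson_bigquery())
        .apply("Write_events_to_BQ",
            BigQueryIO.writeTableRows()
                .to(eventDestinations)
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)       // load jobs instead of streaming inserts
                .withTriggeringFrequency(Duration.standardMinutes(5)) // one load per table roughly every 5 minutes
                .withNumFileShards(10)                                // required with FILE_LOADS on an unbounded source
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));

As far as we understand, withFailedInsertRetryPolicy applies only to streaming inserts, so it is dropped in this variant.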
We generally receive around 3 million records per day; what would be the best worker configuration for Dataflow?
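
For reference on the autoscaling and sizing points, this is roughly how we expected the worker pool to be configured. It is a sketch using the standard DataflowPipelineOptions setters from the Dataflow runner; the machine type and worker counts are just our current test values, not a recommendation.

    // Imports needed in addition to the ones already in StarterPipeline.
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;

    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(DataflowPipelineOptions.class);
    options.setStreaming(true);
    options.setRegion("asia-northeast1");
    options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED); // autoscaling needs an algorithm set
    options.setNumWorkers(2);                      // initial worker count (our intended minimum)
    options.setMaxNumWorkers(10);                  // ceiling the service may scale up to
    options.setWorkerMachineType("n1-standard-2"); // placeholder machine type
    Pipeline p = Pipeline.create(options);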
package purplle.datapipeline;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.net.SocketTimeoutException;
import java.time.LocalDateTime;
import java.time.ZoneId;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import purplle.datapipeline.buisness.EventSchemaBuilder;
import purplle.datapipeline.buisness.Ordering;
import purplle.datapipeline.common.Constants;
import purplle.datapipeline.helpers.Event_ordering;
import purplle.datapipeline.helpers.Event_schema;
import purplle.datapipeline.helpers.JSON_helper;

public class StarterPipeline {

  public interface StarterPipelineOption extends PipelineOptions {
    /**
     * Set this required option to specify where to read the input.
     */
    @Description("Path of the file to read from")
    @Default.String(Constants.pubsub_event_pipeline_url)
    String getInputFile();

    void setInputFile(String value);
  }

  @SuppressWarnings("serial")
  static class ParseJsonData_storage extends DoFn<String, KV<String, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) throws JSONException {
      Logger log = LoggerFactory.getLogger(StarterPipeline.class);
      if (c.element().length() > 0 && JSON_helper.isJSONValid(c.element())) {
        JSONObject event_obj = new JSONObject(c.element());
        if (event_obj.length() > 0 && event_obj.has("event")) {
          JSONObject ob2 = JSON_helper.flatJsonConvertKeyToLower(event_obj);
          if (ob2.length() > 0 && ob2.has("event")) {
            // Reorder the json object, then pass it on to create the pipe separated string.
            KV<String, String> event_kv_pair = Event_ordering.order_event_columns(ob2, "storage");
            if (!event_kv_pair.getKey().isEmpty() && event_kv_pair.getKey().length() > 0) {
              c.output(event_kv_pair);
            } else {
              log = LoggerFactory.getLogger(StarterPipeline.class);
              log.error("Storage string empty = " + c.element());
            }
          } else {
            log = LoggerFactory.getLogger(StarterPipeline.class);
            log.error("Storage object error = " + c.element());
          }
        } else {
          log = LoggerFactory.getLogger(StarterPipeline.class);
          log.error("Storage object error = " + c.element());
        }
      } else {
        log = LoggerFactory.getLogger(StarterPipeline.class);
        log.error("Storage empty element = " + c.element());
      }
    }
  }

  @SuppressWarnings("serial")
  static class ParseJsonData_bigquery extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) throws JSONException {
      Logger log = LoggerFactory.getLogger(StarterPipeline.class);
      log.info("Event json = " + c.element());
      if (!c.element().isEmpty() && JSON_helper.isJSONValid(c.element())) {
        JSONObject event_obj = new JSONObject(c.element());
        if (event_obj.length() > 0 && event_obj.has("event")) {
          JSONObject ob2 = JSON_helper.flatJsonConvertKeyToLower(event_obj);
          if (ob2.length() > 0 && ob2.has("event")) {
            TableRow event_row = EventSchemaBuilder.get_event_row(ob2, "bigquery");
            if (!event_row.isEmpty()) {
              c.output(event_row);
            } else {
              log = LoggerFactory.getLogger(StarterPipeline.class);
              log.error("Bigquery set event ordering schema error = " + c.element());
            }
          } else {
            log = LoggerFactory.getLogger(StarterPipeline.class);
            log.error("Bigquery set event ordering object error = " + c.element());
          }
        } else {
          log = LoggerFactory.getLogger(StarterPipeline.class);
          log.error("Bigquery event item object error = " + c.element());
        }
      } else {
        log = LoggerFactory.getLogger(StarterPipeline.class);
        log.error("Bigquery event item error = " + c.element());
      }
    }
  }

  @SuppressWarnings("serial")
  static class Write_to_GCS extends DoFn<KV<String, String>, TextIO.Write> {
    @ProcessElement
    public void processElement(ProcessContext c) throws JSONException {
      String event_string = c.element().getValue();
      String event_name = c.element().getKey();

      LocalDateTime now = LocalDateTime.now(ZoneId.of("Asia/Kolkata"));
      int year = now.getYear();
      int month = now.getMonthValue();
      int day = now.getDayOfMonth();
      int hour = now.getHour();
      int minute = now.getMinute();
      int second = now.getSecond();

      String storage_file_path = event_name + "/" + year + "/" + month + "/" + day + "/" + hour + "/"
          + event_name + "-" + year + "-" + month + "-" + day + "-" + hour + "-" + minute + "-" + second + ".txt";

      Logger log = LoggerFactory.getLogger(StarterPipeline.class);
      log.info("Writing file to location = " + storage_file_path);

      // Create the storage service object.
      Storage storage = StorageOptions.getDefaultInstance().getService();

      // Upload the combined event string as a blob to the events bucket.
      BlobId blobId = BlobId.of(Constants.gcp_events_bucket_name, storage_file_path);
      BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
      @SuppressWarnings("unused")
      Blob blob = storage.create(blobInfo, event_string.getBytes(UTF_8));
    }
  }

  @SuppressWarnings("serial")
  public static class ReadEventJson_storage
      extends PTransform<PCollection<String>, PCollection<KV<String, String>>> {
    @Override
    public PCollection<KV<String, String>> expand(PCollection<String> lines) {
      Logger log = LoggerFactory.getLogger(StarterPipeline.class);
      log.info("Storage workflow started");
      @SuppressWarnings("unused")
      Boolean tempbool = Event_ordering.setEventsOrdering();

      // Parse each JSON line into an (event name, pipe separated string) pair.
      PCollection<KV<String, String>> words = lines.apply(ParDo.of(new ParseJsonData_storage()));
      return words;
    }
  }

  @SuppressWarnings("serial")
  public static class ReadEventJson_bigquery
      extends PTransform<PCollection<String>, PCollection<TableRow>> {
    @Override
    public PCollection<TableRow> expand(PCollection<String> lines) {
      Logger log = LoggerFactory.getLogger(StarterPipeline.class);
      log.info("Bigquery workflow started");
      @SuppressWarnings("unused")
      Boolean tempbool = Event_ordering.setEventsOrdering();
      log.info("Bigquery get event ordering");
      Ordering events_ordering = Event_ordering.getEventsOrdering();
      Event_schema es = new Event_schema();
      es.setEventSchema(events_ordering);

      // Parse each JSON line into a BigQuery TableRow.
      PCollection<TableRow> table_row = lines.apply(ParDo.of(new ParseJsonData_bigquery()));
      log.info("Bigquery workflow rows prepared");
      return table_row;
    }
  }

  /** A SimpleFunction that joins the grouped event strings into one newline-separated string per key. */
  @SuppressWarnings("serial")
  public static class CombineEventStrings
      extends SimpleFunction<KV<String, Iterable<String>>, KV<String, String>> {
    @Override
    public KV<String, String> apply(KV<String, Iterable<String>> input) {
      String combined_event = "";
      for (String combined_str : input.getValue()) {
        combined_event += combined_str + "\n";
      }
      Logger log = LoggerFactory.getLogger(StarterPipeline.class);
      log.info("combined_event = " + combined_event);
      KV<String, String> return_kv = KV.of(input.getKey(), combined_event);
      return return_kv;
    }
  }

  @SuppressWarnings("serial")
  public static void main(String[] args) throws SocketTimeoutException {
    Logger log = LoggerFactory.getLogger(StarterPipeline.class);
    log.info("Events pipeline job started");

    StarterPipelineOption options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(StarterPipelineOption.class);
    Pipeline p = Pipeline.create(options);
    log.info("Pipeline created");
    log.info("Pipeline Started");

    PCollection<String> datastream = p.apply("Read Events From Pubsub",
        PubsubIO.readStrings().fromSubscription(Constants.pubsub_event_pipeline_url));

    // PCollection<String> windowed_items =
    //     datastream.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
    // PCollection<String> windowed_items = datastream.apply(
    //     Window.<String>into(SlidingWindows.of(Duration.standardMinutes(1)).every(Duration.standardSeconds(10))));

    PCollection<String> windowed_items = datastream.apply(Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(300))))
        .withAllowedLateness(Duration.standardDays(10)).discardingFiredPanes());

    // Write to storage
    windowed_items.apply("Read and make pipe separated event string", new ReadEventJson_storage())
        .apply("Combine events by keys", GroupByKey.<String, String>create())
        .apply("Combine events strings by event name", MapElements.via(new CombineEventStrings()))
        .apply("Manually write events to GCS", ParDo.of(new Write_to_GCS()));

    // Write into Big Query
    windowed_items.apply("Read and make event table row", new ReadEventJson_bigquery())
        .apply("Write_events_to_BQ",
            BigQueryIO.writeTableRows().to(new DynamicDestinations<TableRow, String>() {
              @Override
              public String getDestination(ValueInSingleWindow<TableRow> element) {
                String destination = EventSchemaBuilder
                    .fetch_destination_based_on_event(element.getValue().get("event").toString());
                return destination;
              }

              @Override
              public TableDestination getTable(String table) {
                String destination = EventSchemaBuilder.fetch_table_name_based_on_event(table);
                return new TableDestination(destination, destination);
              }

              @Override
              public TableSchema getSchema(String table) {
                TableSchema table_schema = EventSchemaBuilder.fetch_table_schema_based_on_event(table);
                return table_schema;
              }
            }).withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

    p.run().waitUntilFinish();
    log.info("Events Pipeline Job Stopped");
  }
}
Images: Dataflow Progress 1 | Dataflow Progress 2 | Dataflow Job Description