3
votes

My objective is to read Avro file data from Cloud Storage and write it to a BigQuery table using Java. It would be good if someone could provide a code snippet or ideas for reading Avro-format data and writing it to a BigQuery table using Cloud Dataflow.


2 Answers

4
votes

I see two possible approaches:

  1. Using Dataflow:
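    // Imports for this snippet (package names as of Beam 2.11, the version linked below).
    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableReference;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.common.collect.ImmutableList;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.util.Utf8;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;

    // The rest goes inside main(String[] args).
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();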
    Pipeline p = Pipeline.create(options);

    // Read an AVRO file.
    // Alternatively, read the schema from a file.
    // https://beam.apache.org/releases/javadoc/2.11.0/index.html?org/apache/beam/sdk/io/AvroIO.html
    Schema avroSchema = new Schema.Parser().parse(
        "{\"type\": \"record\", "
            + "\"name\": \"quote\", "
            + "\"fields\": ["
            + "{\"name\": \"source\", \"type\": \"string\"},"
            + "{\"name\": \"quote\", \"type\": \"string\"}"
            + "]}");
    PCollection<GenericRecord> avroRecords = p.apply(
        AvroIO.readGenericRecords(avroSchema).from("gs://bucket/quotes.avro"));

    // Convert Avro GenericRecords to BigQuery TableRows.
    // It's probably better to use Avro-generated classes instead of manually casting types.
    // https://beam.apache.org/documentation/io/built-in/google-bigquery/#writing-to-bigquery
    PCollection<TableRow> bigQueryRows = avroRecords.apply(
        MapElements.into(TypeDescriptor.of(TableRow.class))
            .via(
                (GenericRecord elem) ->
                    new TableRow()
                        .set("source", ((Utf8) elem.get("source")).toString())
                        .set("quote", ((Utf8) elem.get("quote")).toString())));

    // https://cloud.google.com/bigquery/docs/schemas
    TableSchema bigQuerySchema =
        new TableSchema()
            .setFields(
                ImmutableList.of(
                    new TableFieldSchema()
                        .setName("source")
                        .setType("STRING"),
                    new TableFieldSchema()
                        .setName("quote")
                        .setType("STRING")));

    bigQueryRows.apply(BigQueryIO.writeTableRows()
        .to(new TableReference()
            .setProjectId("project_id")
            .setDatasetId("dataset_id")
            .setTableId("avro_source"))
        .withSchema(bigQuerySchema)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

    p.run().waitUntilFinish();
  2. Import the data into BigQuery directly, without Dataflow. See this documentation: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro
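
For the second approach, here is a minimal sketch of what a direct load could look like with the `bq` command-line tool, wrapped in a small Python helper. The helper name `bq_load_avro_command` is hypothetical, and the dataset, table, and bucket names are just the placeholders from the Dataflow example above. Avro files carry their own schema, so no schema argument is passed; `--replace` plays the role of `WRITE_TRUNCATE`.

```python
import shlex


def bq_load_avro_command(dataset, table, gcs_uri, replace=True):
    """Build the argument list for a `bq load` of one Avro file.

    Hypothetical helper: it only assembles the command, it does not run it.
    """
    cmd = ["bq", "load", "--source_format=AVRO"]
    if replace:
        # Overwrite existing table contents, like WRITE_TRUNCATE in Dataflow.
        cmd.append("--replace")
    cmd.append("{0}.{1}".format(dataset, table))
    cmd.append(gcs_uri)
    return cmd


# Placeholder names taken from the Dataflow example; replace with your own.
cmd = bq_load_avro_command("dataset_id", "avro_source", "gs://bucket/quotes.avro")
print(" ".join(shlex.quote(part) for part in cmd))
```

To actually run the load, pass the list to `subprocess.run(cmd, check=True)` on a machine where the Cloud SDK is installed and authenticated.
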
2
votes

For this, you can try using the following Python script:

import apache_beam as beam

PROJECT='YOUR_PROJECT'
BUCKET='YOUR_BUCKET'

def run():
   argv = [
      '--project={0}'.format(PROJECT),
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--runner=DataflowRunner'
   ]

   p = beam.Pipeline(argv=argv)

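   # ReadFromAvro yields one dict per Avro record, which WriteToBigQuery accepts as a row.
   # No schema is passed here, so this assumes the destination table already exists.
   (p
      | 'ReadAvroFromGCS' >> beam.io.avroio.ReadFromAvro('gs://{0}/file.avro'.format(BUCKET))
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:dataset.avrotable'.format(PROJECT))
   )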

   p.run()

if __name__ == '__main__':
   run()
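
If the table does not exist yet, `WriteToBigQuery` needs a schema in order to create it, and one form it accepts is a `'name:TYPE,name:TYPE'` string. Below is a rough sketch of deriving that string from an Avro schema. The function name `bigquery_schema_string` is hypothetical, the sample schema is the two-field quote record from the Java answer, and only primitive types and nullable unions are handled; anything else raises an error rather than guessing.

```python
import json

# Avro primitive types and the BigQuery types they correspond to.
AVRO_TO_BIGQUERY = {
    "string": "STRING",
    "bytes": "BYTES",
    "int": "INTEGER",
    "long": "INTEGER",
    "float": "FLOAT",
    "double": "FLOAT",
    "boolean": "BOOLEAN",
}


def bigquery_schema_string(avro_schema_json):
    """Turn a flat Avro record schema into a 'name:TYPE,...' string."""
    schema = json.loads(avro_schema_json)
    if schema.get("type") != "record":
        raise ValueError("expected a top-level Avro record")
    parts = []
    for field in schema["fields"]:
        avro_type = field["type"]
        # A nullable field is declared as a union such as ["null", "string"].
        if isinstance(avro_type, list):
            non_null = [t for t in avro_type if t != "null"]
            if len(non_null) != 1:
                raise ValueError("unsupported union in field " + field["name"])
            avro_type = non_null[0]
        # Nested records, arrays, maps, etc. are out of scope for this sketch.
        if not isinstance(avro_type, str) or avro_type not in AVRO_TO_BIGQUERY:
            raise ValueError("unsupported type in field " + field["name"])
        parts.append("{0}:{1}".format(field["name"], AVRO_TO_BIGQUERY[avro_type]))
    return ",".join(parts)


QUOTE_SCHEMA = """{"type": "record", "name": "quote", "fields": [
  {"name": "source", "type": "string"},
  {"name": "quote", "type": "string"}]}"""

print(bigquery_schema_string(QUOTE_SCHEMA))
```

The result can then be passed along as `beam.io.WriteToBigQuery(table, schema=bigquery_schema_string(QUOTE_SCHEMA))`.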

Hope it helps.