1 vote

We are currently looking for the best way to convert raw data into a common structure for further analysis. Our data consists of JSON files; some files have more fields, some fewer, and some contain arrays, but in general the structure is roughly the same.

I'm trying to build an Apache Beam pipeline in Java for this purpose. All my pipelines are based on this template: TextIOToBigQuery.java

The first approach is to load the entire JSON as a string into one column and then use the JSON functions in Standard SQL to transform it into the common structure. This is well described here: How to manage/handle schema changes while loading JSON file into BigQuery table
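Roughly, this first approach could look like the following in the pipeline; this is just a sketch assuming a Pipeline p as in the template above, where the json_payload column name and the table reference are placeholders I picked:

// Sketch of the first approach: store each raw JSON line in a single STRING column.
// "json_payload" and the table reference are placeholder names.
TableSchema rawSchema = new TableSchema().setFields(
    ImmutableList.of(new TableFieldSchema().setName("json_payload").setType("STRING")));

p.apply("Read JSON files", TextIO.read().from("gs://my-bucket/input/*.json"))
 .apply("Wrap line as TableRow", MapElements.via(new SimpleFunction<String, TableRow>() {
     @Override
     public TableRow apply(String line) {
       return new TableRow().set("json_payload", line);
     }
 }))
 .apply(BigQueryIO.writeTableRows()
     .to("PROJECT_ID:DATASET_NAME.raw_json")
     .withSchema(rawSchema)
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

The raw column would then be parsed later with the Standard SQL JSON functions mentioned above.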

The second approach is to load the data into the appropriate columns, so the data can be queried via standard SQL. This requires knowing the schema. It is possible to detect it via the console, UI and other tools (Using schema auto-detection), however I didn't find anything about how this can be achieved via Java and an Apache Beam pipeline.
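For reference, outside of Beam the same auto-detection can be requested on a load job through the BigQuery Java client library; this is only a sketch with a placeholder dataset, table and source URI, not the Beam-based solution I'm looking for:

// Sketch: load job with schema auto-detection via the google-cloud-bigquery client
// (not Beam); dataset, table and source URI are placeholders.
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
LoadJobConfiguration loadConfig =
    LoadJobConfiguration.newBuilder(
            TableId.of("DATASET_NAME", "auto_detected_table"),
            "gs://my-bucket/input/file1.json")
        .setFormatOptions(FormatOptions.json())
        .setAutodetect(true)
        .build();
Job job = bigquery.create(JobInfo.of(loadConfig));
// then poll the job for completion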

I analyzed BigQueryIO and it looks like it cannot work without a schema (with one exception: if the table is already created).
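For that exception, the write would look roughly like this; a sketch where rows stands for a PCollection<TableRow> built from the parsed files and the table is assumed to already exist with a compatible schema:

// Sketch: writing without a schema, relying on an already existing table.
// CREATE_NEVER means BigQueryIO will not try to create the table itself.
rows.apply(BigQueryIO.writeTableRows()
    .to("PROJECT_ID:DATASET_NAME.existing_table")
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));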

As I mentioned before, new files might bring new fields, so the schema should be updated accordingly.

Let's say I have three JSON files:

1. { "field1": "value1" }
2. { "field2": "value2" }
3. { "field1": "value3", "field10": "value10" }

The first one creates a new table with one field, "field1", of type STRING. So my table should look like this:

|field1  |
----------
|"value1"|

The second one does the same, but adds a new field, "field2". Now my table should look like this:

|field1  |field2  |
-------------------
|"value1"|null    |
-------------------
|null    |"value2"|

The third JSON should add another field, "field10", to the schema, and so on. A real JSON file might have 200 fields or more. How hard would it be to handle such a scenario?

Which approach is better for this transformation?

Is this intended to remain as a batch process from TextIO, or do you also wish to do this in streaming mode in the future? – Reza Rokni
@RezaRokni I'm not sure; streaming inserts into BigQuery are not free, so currently it will be batch processing from FileIO. – costello
Note that you can also use load jobs in a streaming pipeline by choosing the FILE_LOADS method and setting .withTriggeringFrequency (docs). – Guillem Xercavins
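A rough sketch of the option mentioned in the last comment, where the table, schema, frequency and shard count are placeholder values (withNumFileShards is assumed here because a triggering frequency is set on unbounded input):

// Sketch: batch loads from a streaming pipeline via FILE_LOADS, as noted above.
// Table name, schema, frequency and shard count are placeholders.
rows.apply(BigQueryIO.writeTableRows()
    .to("PROJECT_ID:DATASET_NAME.my_table")
    .withSchema(schema)
    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
    .withTriggeringFrequency(Duration.standardMinutes(10))
    .withNumFileShards(1)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));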

2 Answers

4 votes

I did some tests simulating the typical auto-detect pattern: first I run through all the data to build a Map of all possible fields and their types (here I only considered STRING or INTEGER for simplicity). I use a stateful pipeline to keep track of the fields that have already been seen and save them as a PCollectionView. This way I can use .withSchemaFromView(), as the schema is unknown at pipeline construction time. Note that this approach is only valid for batch jobs.

First, I create some dummy data without a strict schema where each row may or may not contain any of the fields:

PCollection<KV<Integer, String>> input = p
  .apply("Create data", Create.of(
        KV.of(1, "{\"user\":\"Alice\",\"age\":\"22\",\"country\":\"Denmark\"}"),
        KV.of(1, "{\"income\":\"1500\",\"blood\":\"A+\"}"),
        KV.of(1, "{\"food\":\"pineapple pizza\",\"age\":\"44\"}"),
        KV.of(1, "{\"user\":\"Bob\",\"movie\":\"Inception\",\"income\":\"1350\"}"))
  );

We'll read the input data and build a Map of the different field names that we see in the data, doing some basic type checking to determine whether each field contains an INTEGER or a STRING. Of course, this could be extended if desired. Note that all the data created before was assigned to the same key so that it is grouped together and we can build a complete list of fields, but this can be a performance bottleneck. We materialize the output so that we can use it as a side input:

PCollectionView<Map<String, String>> schemaSideInput = input  
  .apply("Build schema", ParDo.of(new DoFn<KV<Integer, String>, KV<String, String>>() {

    // A map containing field-type pairs
    @StateId("schema")
    private final StateSpec<ValueState<Map<String, String>>> schemaSpec =
        StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    @ProcessElement
    public void processElement(ProcessContext c,
                               @StateId("schema") ValueState<Map<String, String>> schemaSpec) {
      JSONObject message = new JSONObject(c.element().getValue());
      Map<String, String> current = firstNonNull(schemaSpec.read(), new HashMap<String, String>());

      // iterate through fields
      message.keySet().forEach(key ->
      {
          Object value = message.get(key);

          if (!current.containsKey(key)) {
              String type = "STRING";

              try {
                  Integer.parseInt(value.toString());
                  type = "INTEGER";
              }
              catch(Exception e) {}

              // uncomment if debugging is needed
              // LOG.info("key: "+ key + " value: " + value + " type: " + type);

              c.output(KV.of(key, type));
              current.put(key, type); 
              schemaSpec.write(current);
          }
      });
    }
  })).apply("Save as Map", View.asMap());

Now we can use the previous Map to build the PCollectionView containing the BigQuery table schema:

PCollectionView<Map<String, String>> schemaView = p
  .apply("Start", Create.of("Start"))
  .apply("Create Schema", ParDo.of(new DoFn<String, Map<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Map<String, String> schemaFields = c.sideInput(schemaSideInput);  
        List<TableFieldSchema> fields = new ArrayList<>();  

        for (Map.Entry<String, String> field : schemaFields.entrySet()) 
        { 
            fields.add(new TableFieldSchema().setName(field.getKey()).setType(field.getValue()));
            // LOG.info("key: "+ field.getKey() + " type: " + field.getValue());
        }

        TableSchema schema = new TableSchema().setFields(fields);

        String jsonSchema;
        try {
            jsonSchema = Transport.getJsonFactory().toString(schema);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        c.output(ImmutableMap.of("PROJECT_ID:DATASET_NAME.dynamic_bq_schema", jsonSchema));

      }}).withSideInputs(schemaSideInput))
  .apply("Save as Singleton", View.asSingleton());

Change the fully-qualified table name PROJECT_ID:DATASET_NAME.dynamic_bq_schema accordingly.

Finally, in our pipeline we read the data, convert it to TableRow and write it to BigQuery using .withSchemaFromView(schemaView):

input
  .apply("Convert to TableRow", ParDo.of(new DoFn<KV<Integer, String>, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
          JSONObject message = new JSONObject(c.element().getValue());
          TableRow row = new TableRow();

          message.keySet().forEach(key ->
          {
              Object value = message.get(key);
              row.set(key, value);
          });

        c.output(row);
      }}))
  .apply( BigQueryIO.writeTableRows()
      .to("PROJECT_ID:DATASET_NAME.dynamic_bq_schema")
      .withSchemaFromView(schemaView)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

Full code here.

BigQuery table schema as created by the pipeline:

[screenshot: BigQuery table schema]

and resulting sparse data:

[screenshot: resulting sparse rows]

0 votes

You can create/update your table schema in the streaming job if your data is serialized according to a schema (Avro, Protobuf, etc.). In that sense the schema is predefined, but you are still updating the table schema as part of the processing.
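As a sketch of that idea, a BigQuery TableSchema could be derived from a flat Avro record schema roughly like this; only a few primitive types are mapped and the helper name is mine:

// Sketch: deriving a BigQuery TableSchema from a flat Avro record schema.
// Only a handful of primitive Avro types are mapped; nested/union types would need more work.
static TableSchema tableSchemaFromAvro(org.apache.avro.Schema avroSchema) {
  List<TableFieldSchema> fields = new ArrayList<>();
  for (org.apache.avro.Schema.Field field : avroSchema.getFields()) {
    String bqType;
    switch (field.schema().getType()) {
      case INT:
      case LONG:
        bqType = "INTEGER";
        break;
      case FLOAT:
      case DOUBLE:
        bqType = "FLOAT";
        break;
      case BOOLEAN:
        bqType = "BOOLEAN";
        break;
      default:
        bqType = "STRING";
    }
    fields.add(new TableFieldSchema().setName(field.name()).setType(bqType));
  }
  return new TableSchema().setFields(fields);
}

The resulting schema can then be passed to the BigQuery write (or used to patch the table) whenever the serialized schema evolves.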