0
votes

I'm using Java to convert JSON to Avro and store these to GCS using Google DataFlow. The Avro schema is created on runtime using SchemaBuilder.

One of the fields I define in the schema is an optional LONG field, it is defined like this:

SchemaBuilder.FieldAssembler<Schema> fields = SchemaBuilder.record(mainName).fields();
Schema concreteType = SchemaBuilder.nullable().longType();
fields.name("key1").type(concreteType).noDefault();

Now when I create a GenericRecord using the schema above, and "key1" is not set, when putting the resulted GenericRecord to the context of my DoFn: context.output(res); I get the following error:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.avro.UnresolvedUnionException: Not in union ["long","null"]: 256

I also tried doing the same thing with withDefault(0L) and got the same result.

What do I miss? Thanks

2
Could you please update your post with full pipeline code?aga

2 Answers

1
votes

It works fine for me when trying as below and you can try to print the schema that will help to compare also you can remove the nullable() for long type to try.

fields.name("key1").type().nullable().longType().longDefault(0);

Provided the complete code that I used to test:

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.avro.SchemaBuilder.RecordBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

import java.io.File;
import java.io.IOException;

public class GenericRecordExample {

  public static void main(String[] args) {

    FieldAssembler<Schema> fields;
    RecordBuilder<Schema> record = SchemaBuilder.record("Customer");
    fields = record.namespace("com.example").fields();
    fields = fields.name("first_name").type().nullable().stringType().noDefault();
    fields = fields.name("last_name").type().nullable().stringType().noDefault();
    fields = fields.name("account_number").type().nullable().longType().longDefault(0);

    Schema schema = fields.endRecord();
    System.out.println(schema.toString());

    // we build our first customer
    GenericRecordBuilder customerBuilder = new GenericRecordBuilder(schema);
    customerBuilder.set("first_name", "John");
    customerBuilder.set("last_name", "Doe");
    customerBuilder.set("account_number", 999333444111L);
    Record myCustomer = customerBuilder.build();
    System.out.println(myCustomer);

    // writing to a file
    final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
      dataFileWriter.create(myCustomer.getSchema(), new File("customer-generic.avro"));
      dataFileWriter.append(myCustomer);
      System.out.println("Written customer-generic.avro");
    } catch (IOException e) {
      System.out.println("Couldn't write file");
      e.printStackTrace();
    }

    // reading from a file
    final File file = new File("customer-generic.avro");
    final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
    GenericRecord customerRead;
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader)){
      customerRead = dataFileReader.next();
      System.out.println("Successfully read avro file");
      System.out.println(customerRead.toString());

      // get the data from the generic record
      System.out.println("First name: " + customerRead.get("first_name"));

      // read a non existent field
      System.out.println("Non existent field: " + customerRead.get("not_here"));
    }
    catch(IOException e) {
      e.printStackTrace();
    }
  }
}
0
votes

If I understand your question correctly, you're trying to accept JSON strings and save them in a Cloud Storage bucket, using Avro as your coder for the data as it moves through Dataflow. There's nothing immediately obvious from your code that looks wrong to me. I have done this, including saving the data to Cloud Storage and to BigQuery.

You might consider using a simpler, and probably less error prone approach: Define a Java class for your data and use Avro annotations on it to enable the coder to work properly. Here's an example:

import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class Data {
    public long nonNullableValue;
    @Nullable public long nullableValue;
}

Then, use this type in your DnFn implementations like you likely already are. Beam should be able to move the data between workers properly using Avro, even when the fields marked @Nullable are null.