1
votes

In my Apache Beam pipeline I have a PCollection of Row objects (org.apache.beam.sdk.values.Row). I want to write it to Avro files. Here is a simplified version of my code:

    Pipeline p = Pipeline.create();

    Schema inputSchema = Schema.of(
            Schema.Field.of("MyField1", Schema.FieldType.INT32)
    );

    Row row = Row.withSchema(inputSchema).addValues(1).build();
    PCollection<Row> myRow = p.apply(Create.of(row)).setCoder(RowCoder.of(inputSchema));

    myRow.apply(
            "WriteToAvro",
            AvroIO.write(Row.class)
                    .to("src/tmp/my_files")
                    .withWindowedWrites()
                    .withNumShards(10));
    p.run();

The file gets created, but it looks like this (in JSON form):

"schema" : {
    "fieldIndices" : {
        "MyField1" : 0
    },
    "encodingPositions" : {
        "MyField1" : 0
    },
    "fields" : [
        {
        }
    ],
    "hashCode" : 545134948,
    "uuid" : {
    },
    "options" : {
        "options" : {
        }
    }
}

So only the schema is there, with a bunch of useless metadata. What's the right way of writing to Avro from Row objects so that I have the data and not just the schema? And can I get rid of the metadata?

1
What are you using to inspect the file contents? - OneCricketeer
@OneCricketeer I'm using an online viewer: dataformat.net/avro/viewer-and-converter - artofdoe
I suggest using avro-tools jar file since it's unclear what that site actually is doing - OneCricketeer
Just tried it. Got the same exact JSON :/ - artofdoe

1 Answer

0
votes

AvroUtils has some utilities to convert Rows and Beam schemas to Avro types. You can do something like this:

    Pipeline p = Pipeline.create();

    Schema inputSchema = Schema.of(
            Schema.Field.of("MyField1", Schema.FieldType.INT32)
    );
    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(inputSchema);

    // Avro's Schema class is not reliably serializable, so the DoFn keeps
    // the schema as a JSON string and re-parses it when processing.
    class ConvertToGenericRecords extends DoFn<Row, GenericRecord> {
        private final String schemaJson = avroSchema.toString();

        @ProcessElement
        public void process(ProcessContext c) {
            org.apache.avro.Schema schema =
                    new org.apache.avro.Schema.Parser().parse(schemaJson);
            c.output(AvroUtils.toGenericRecord(c.element(), schema));
        }
    }

    Row row = Row.withSchema(inputSchema).addValues(1).build();
    PCollection<Row> myRow = p.apply(Create.of(row)).setCoder(RowCoder.of(inputSchema));

    myRow.apply(ParDo.of(new ConvertToGenericRecords()))
         .setCoder(AvroCoder.of(avroSchema))
         .apply(
            "WriteToAvro",
            AvroIO.writeGenericRecords(avroSchema)
                    .to("src/tmp/my_files")
                    .withWindowedWrites()
                    .withNumShards(10));
    p.run();
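If you want to sanity-check the conversion outside a pipeline, you can call AvroUtils directly. A minimal sketch, assuming beam-sdks-java-core is on the classpath (in recent Beam releases AvroUtils moved to the beam-sdks-java-extensions-avro module, package org.apache.beam.sdk.extensions.avro.schemas.utils); the class name RowToAvroCheck is just for illustration:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.Row;

public class RowToAvroCheck {
    public static void main(String[] args) {
        Schema inputSchema = Schema.of(
                Schema.Field.of("MyField1", Schema.FieldType.INT32));

        // The converted schema is a plain Avro record schema, without the
        // Beam-internal metadata (fieldIndices, encodingPositions, uuid, ...)
        // that showed up in the file from the question.
        org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(inputSchema);
        System.out.println(avroSchema.toString(true));

        // Convert a single Row to a GenericRecord, the same call the DoFn makes.
        Row row = Row.withSchema(inputSchema).addValues(1).build();
        GenericRecord record = AvroUtils.toGenericRecord(row, avroSchema);
        System.out.println(record.get("MyField1")); // prints 1
    }
}
```

This also shows why writing GenericRecords fixes the original problem: the data ends up in the record fields, and the Avro schema carries only the field definitions.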