
To join two PCollections that have nested structures, I need to unnest the PCollections before doing the join, because I am running into problems otherwise (see my other Stack Overflow question: a link). So I want to know how to unnest a PCollection. It would be good if someone could give me an idea of how to either join two nested tables or unnest PCollections.

I noticed that there is a PTransform, "Unnest" (link), for unnesting a nested collection, but I could not find any samples online. I tried to implement it with the steps below to convert the nested collection, but I still can't get the unnested collection at the end.

1) PCollection<TableRow> empCollection = ReadCollection();
2) Using a ParDo function, convert the values from PCollection<TableRow> (com.google.api.services.bigquery.model.TableRow) to PCollection<Row> (org.apache.beam.sdk.values.Row).
3) Define the schemas as below:
   Schema projects = Schema.builder().addInt32Field("Id").addStringField("Name").build();
   Schema Employees = Schema.builder().addStringField("empNo").addStringField("empName").addArrayField("Projects", FieldType.row(projects)).build();
4) Use the Unnest transform to unnest the nested collection:

PCollection<Row> pcColl = targetRowCollection.apply(
    Unnest.<Row>create().withFieldNameFunction(
        new SerializableFunction<java.util.List<java.lang.String>, java.lang.String>() {
            @Override
            public java.lang.String apply(java.util.List<java.lang.String> input) {
                // Build the flattened field name by joining the nested names with "+"
                return String.join("+", input);
            }
        }));

5) Using a ParDo function, convert the values from PCollection<Row> (org.apache.beam.sdk.values.Row) back to PCollection<TableRow> (com.google.api.services.bigquery.model.TableRow).
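To make concrete what "unnesting" an employee record with a repeated Projects field could mean, here is a minimal pure-Python sketch, assuming each record is a plain dict (as you get when reading BigQuery or Avro data in the Beam Python SDK). The function `unnest` is hypothetical and is not Beam's `Unnest` transform: it flattens nested records into "+"-joined field names (mirroring the field-name function in step 4) and, going beyond simple flattening, emits one flat row per element of a repeated record field.

```python
from typing import Any, Dict, List


def unnest(record: Dict[str, Any], sep: str = "+", prefix: str = "") -> List[Dict[str, Any]]:
    """Hypothetical helper: flatten a nested record into flat rows.

    - Nested dicts (non-repeated records) become prefixed top-level fields.
    - Non-empty lists of dicts (repeated records) are exploded: one output
      row per list element.
    - Scalars, lists of scalars and empty lists are copied as-is.
    """
    rows: List[Dict[str, Any]] = [{}]
    for name, value in record.items():
        full_name = f"{prefix}{sep}{name}" if prefix else name
        if isinstance(value, dict):
            # Non-repeated nested record: flatten it under the joined name.
            variants = unnest(value, sep, full_name)
        elif isinstance(value, list) and value and all(isinstance(v, dict) for v in value):
            # Repeated nested record: every element yields its own flat row(s).
            variants = [row for item in value for row in unnest(item, sep, full_name)]
        else:
            variants = [{full_name: value}]
        # Cross product: combine each partial row with each variant.
        rows = [{**row, **variant} for row in rows for variant in variants]
    return rows


emp = {
    "empNo": "E1",
    "empName": "Ann",
    "Projects": [{"Id": 1, "Name": "P1"}, {"Id": 2, "Name": "P2"}],
}
for flat_row in unnest(emp):
    print(flat_row)
```

Inside a DoFn, the same logic could be used as `yield from unnest(element)` in `process`, so each nested input element produces one or more flat output elements.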

Could someone help me use this Unnest transform to convert the nested collection into an unnested one?

Could you clarify what you mean by unnest? A PCollection is just a collection of elements. Does each element map to a row of your table (before joining)? - chamikara
If a PCollection does not contain a nested type (in BigQuery, a RECORD type), I call it unnested. What I want is: in a ParDo fn, take the PCollection of TableRow, which has a nested structure (RECORD type), and convert the data into an output PCollection without the RECORD type. - lourdu rajan
I see. Note that nested/unnested is not a Beam concept but a description of the specific element type you are using (TableRow in this case). In general, to join two PCollections (using CoGroupByKey) you have to produce a PCollection of KVs, so you have to extract the key from your nested structure. - chamikara
Also, you can do this unnesting in a ParDo step. - chamikara
Yes, I am working with a specific PCollection type, TableRow. I know that we can unnest the collection inside the ParDo fn, but I don't have a code snippet for it. It would be good if you could share one. - lourdu rajan
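To illustrate the point about KVs, here is a local, in-memory imitation (no Beam required) of what keying two collections and then co-grouping them produces. `key_by` and `co_group_by_key` are hypothetical helpers for illustration only; in a real pipeline the keying would be a `beam.Map` or `beam.ParDo` step, and `beam.CoGroupByKey()` emits, for each key, a pair of `(key, {tag: iterable_of_values})`.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Tuple


def key_by(records: Iterable[Dict[str, Any]],
           key_fn: Callable[[Dict[str, Any]], Any]) -> List[Tuple[Any, Dict[str, Any]]]:
    """Turn each record into a (key, record) pair; key_fn digs the key out
    of the (possibly nested) record."""
    return [(key_fn(r), r) for r in records]


def co_group_by_key(tagged: Dict[str, Iterable[Tuple[Any, Any]]]) -> Dict[Any, Dict[str, List[Any]]]:
    """In-memory imitation of the CoGroupByKey result shape:
    {key: {tag: [values from that tagged input]}}."""
    # Every key gets an entry for every tag, even if that input has no values.
    grouped: Dict[Any, Dict[str, List[Any]]] = defaultdict(
        lambda: {tag: [] for tag in tagged})
    for tag, pairs in tagged.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)


employees = [
    {"empNo": "E1", "empName": "Ann"},
    {"empNo": "E2", "empName": "Bob"},
]
assignments = [
    {"emp": {"empNo": "E1"}, "projectId": 1},
    {"emp": {"empNo": "E1"}, "projectId": 2},
]

joined = co_group_by_key({
    "emp": key_by(employees, lambda r: r["empNo"]),
    # The key is nested one level down in this input.
    "proj": key_by(assignments, lambda r: r["emp"]["empNo"]),
})
print(joined)
```

Keys present in only one input still appear, with an empty list for the other tag, so inner or outer join semantics are decided by how the grouped result is processed afterwards.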

1 Answer


Code for joining two PCollections with nested structures in Python with Beam:

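import apache_beam as beam


# DoFn that emits (key, element), where the key is read from a possibly
# nested field given as a dotted path, e.g. "personalInfo.Address.City".
# Every intermediate field is assumed to be a repeated record (a list),
# and only its first element is followed.
class AddKeysNested(beam.DoFn):
    def process(self, element, field_path):
        tmp_record = element
        field_names = field_path.split(".")
        for i, name in enumerate(field_names):
            if i != len(field_names) - 1:
                # Intermediate level: descend into the first nested record.
                tmp_record = tmp_record[name.strip()][0]
            else:
                # Last level: this value is the join key.
                tmp_record = tmp_record[name.strip()]
        yield (tmp_record, element)


# option, input_file1/2 and join_field_names1/2 are defined elsewhere.
with beam.Pipeline(options=option) as p:

    source_record1 = p | "get data1" >> beam.io.avroio.ReadFromAvro(input_file1)
    source_record2 = p | "get data2" >> beam.io.avroio.ReadFromAvro(input_file2)

    # Convert into <k,v> form (each ParDo needs a unique label)
    keyed_record1 = source_record1 | "key data1" >> beam.ParDo(AddKeysNested(), join_field_names1)
    keyed_record2 = source_record2 | "key data2" >> beam.ParDo(AddKeysNested(), join_field_names2)

    # Apply join operation
    rjoin = ({'File1Info': keyed_record1, 'File2Info': keyed_record2}
             | beam.CoGroupByKey())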

Note: with the code above, the key can be taken from any level of nested fields, e.g. personalInfo.Address.City. After that, apply CoGroupByKey() to join the two PCollections.
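The key lookup in the answer always takes element `[0]` at intermediate levels, which assumes every nested field is a repeated record. A minimal variant, assuming records are plain dicts, that also accepts non-repeated nested records (plain dicts) might look like the following; `get_by_path` is a hypothetical helper, not part of Beam, and it still follows only the first element of a list (an empty list raises `IndexError`).

```python
from typing import Any, Dict


def get_by_path(record: Dict[str, Any], dotted_path: str) -> Any:
    """Follow a dotted path such as 'personalInfo.Address.City'.

    Nested records may be dicts (non-repeated) or lists of dicts (repeated);
    for lists, only the first element is followed, as in the answer's DoFn.
    """
    value: Any = record
    for part in dotted_path.split("."):
        if isinstance(value, list):
            # Repeated record: descend into its first element.
            value = value[0]
        value = value[part.strip()]
    return value


rec = {"personalInfo": {"Address": [{"City": "Pune"}]}}
print(get_by_path(rec, "personalInfo.Address.City"))
```

In a pipeline it could be used for keying with something like `beam.Map(lambda r: (get_by_path(r, path), r))` before `beam.CoGroupByKey()`.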