I have a use case where we need to create a PCollection whose elements contain fields of an abstract type (an interface). How should the schema and coder be defined in such cases?

The data is read from JSON files in some data source (local disk, S3, etc.).

For example, consider a PCollection<Customer>, where Customer is defined as:

    class Customer {
        Gender gender;
    }

    interface Gender {
    }

    class Female implements Gender {
    }

In the pipeline, the schema for Customer is taken from the schema registry:

pipeline.getSchemaRegistry().getSchema(Customer.class)

For testing, I created the PCollection using

pipeline.apply(Create.of(getCustomers())), where getCustomers() returns a List<Customer>.

The pipeline fails with the following exception:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: java.lang.NoSuchMethodException: entities.Gender.<init>()
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:371)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:339)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:219)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at section5.ComplexCombine1.main (ComplexCombine1.java:147)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: entities.Gender.<init>()
    at org.apache.avro.specific.SpecificData.newInstance (SpecificData.java:353)
    at org.apache.avro.specific.SpecificData.newRecord (SpecificData.java:369)
    at org.apache.avro.reflect.ReflectData.newRecord (ReflectData.java:901)
    at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:212)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
    at org.apache.avro.reflect.ReflectDatumReader.readField (ReflectDatumReader.java:302)
    at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
    at org.apache.avro.reflect.ReflectDatumReader.readField (ReflectDatumReader.java:302)
    at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:145)
    at org.apache.beam.sdk.coders.AvroCoder.decode (AvroCoder.java:330)
    at org.apache.beam.sdk.coders.Coder.decode (Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:84)
    at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:37)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream (CoderUtils.java:118)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:101)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:95)
    at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:144)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> (MutationDetectors.java:118)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:49)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:115)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:305)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:272)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900 (SimpleDoFnRunner.java:84)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output (SimpleDoFnRunner.java:329)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output (SimpleDoFnRunner.java:324)
    at org.apache.beam.runners.direct.MultiStepCombine$CombineInputs.outputAccumulators (MultiStepCombine.java:322)
Caused by: java.lang.NoSuchMethodException: entities.Gender.<init>()
    at java.lang.Class.getConstructor0 (Class.java:3110)
    at java.lang.Class.getDeclaredConstructor (Class.java:2206)
    at org.apache.avro.specific.SpecificData.newInstance (SpecificData.java:347)
    at org.apache.avro.specific.SpecificData.newRecord (SpecificData.java:369)
    at org.apache.avro.reflect.ReflectData.newRecord (ReflectData.java:901)
    at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:212)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
    at org.apache.avro.reflect.ReflectDatumReader.readField (ReflectDatumReader.java:302)
    at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
    at org.apache.avro.reflect.ReflectDatumReader.readField (ReflectDatumReader.java:302)
    at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:145)
    at org.apache.beam.sdk.coders.AvroCoder.decode (AvroCoder.java:330)
    at org.apache.beam.sdk.coders.Coder.decode (Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:84)
    at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:37)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream (CoderUtils.java:118)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:101)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray (CoderUtils.java:95)
    at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:144)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> (MutationDetectors.java:118)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:49)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:115)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:305)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:272)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900 (SimpleDoFnRunner.java:84)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output (SimpleDoFnRunner.java:329)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output (SimpleDoFnRunner.java:324)
    at org.apache.beam.runners.direct.MultiStepCombine$CombineInputs.outputAccumulators (MultiStepCombine.java:322)
    at org.apache.beam.runners.direct.MultiStepCombine$CombineInputs$DoFnInvoker.invokeFinishBundle (Unknown Source)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.finishBundle (SimpleDoFnRunner.java:242)
    at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle (SimplePushbackSideInputDoFnRunner.java:125)
    at org.apache.beam.runners.direct.ParDoEvaluator.finishBundle (ParDoEvaluator.java:269)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.finishBundle (DoFnLifecycleManagerRemovingTransformEvaluator.java:73)
    at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle (DirectTransformExecutor.java:193)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:131)
    at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
    at java.util.concurrent.FutureTask.run (FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
    at java.lang.Thread.run (Thread.java:748)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  4.025 s
[INFO] Finished at: 2021-05-12T00:30:14+05:30
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project ApacheBeam1: An exception occured while executing the Java class. java.lang.RuntimeException: java.lang.NoSuchMethodException: entities.Y.<init>() -> [Help 1]
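
The innermost frames of the trace (Class.getDeclaredConstructor called from SpecificData.newInstance) suggest that, while decoding, Avro tries to instantiate the declared field type Gender through a no-argument constructor, and an interface has none. The following is a minimal sketch of that reflection behaviour only; it does not use Beam or Avro, and ConstructorCheck and hasNoArgConstructor are hypothetical names introduced for illustration.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-alone demo; the nested types mirror the question's model.
class ConstructorCheck {
    interface Gender {}

    static class Female implements Gender {}

    // Returns true if the type declares a no-argument constructor,
    // which is what reflection-based instantiation relies on.
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            return ctor != null;
        } catch (NoSuchMethodException e) {
            // Same exception type as in the stack trace above.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Gender: " + hasNoArgConstructor(Gender.class)); // prints "Gender: false"
        System.out.println("Female: " + hasNoArgConstructor(Female.class)); // prints "Female: true"
    }
}
```

The concrete class Female can be created reflectively, but the interface Gender cannot, which matches the entities.Gender.<init>() message.
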
Can you provide an example pipeline for reproducing this? - chamikara
@chamikara I added the details in the following gist: gist.github.com/kh-satyam/e643b25d6b8ac95271b0fb2f79415eea - Satyam Khare
The code fails only while executing Combine.CombineFn<Customer, Acum, Acum>. - Satyam Khare
Do we need to specify a Schema/Coder for the interfaces used in the bean? - Satyam Khare

1 Answer

I think the failure might be due to the pipeline not being able to properly determine the Coder for the Customer type. When using the Create transform with a custom type, you may have to specify a Coder explicitly using the withCoder method.

See here for more details. See here for an example.
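
One way an explicit coder could handle the interface-typed field is to write a small tag identifying the concrete class, so that decoding never has to instantiate Gender itself. The sketch below shows only that encode/decode logic in plain Java, without Beam on the classpath. The Male class, the tag values, and the names GenderCodecSketch and roundTrip are assumptions made for illustration; they are not taken from the question or the linked gist.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch of tag-based encoding for an interface-typed field.
class GenderCodecSketch {
    interface Gender {}

    static class Female implements Gender {}

    static class Male implements Gender {} // assumed second implementation

    static class Customer {
        final Gender gender;

        Customer(Gender gender) {
            this.gender = gender;
        }
    }

    // Assumed tag values, one per concrete Gender implementation.
    private static final int TAG_FEMALE = 0;
    private static final int TAG_MALE = 1;

    // Writes one byte that records which concrete class the field holds.
    static void encode(Customer value, OutputStream out) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        if (value.gender instanceof Female) {
            data.writeByte(TAG_FEMALE);
        } else if (value.gender instanceof Male) {
            data.writeByte(TAG_MALE);
        } else {
            throw new IOException("Unknown Gender implementation: " + value.gender);
        }
        data.flush();
    }

    // Reads the tag and constructs the matching concrete class directly.
    static Customer decode(InputStream in) throws IOException {
        int tag = new DataInputStream(in).readByte();
        switch (tag) {
            case TAG_FEMALE:
                return new Customer(new Female());
            case TAG_MALE:
                return new Customer(new Male());
            default:
                throw new IOException("Unknown tag: " + tag);
        }
    }

    // Encodes and then decodes a value, as a coder round trip would.
    static Customer roundTrip(Customer value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            encode(value, bytes);
            return decode(new ByteArrayInputStream(bytes.toByteArray()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In Beam, logic of this shape would typically live in a class extending CustomCoder<Customer>, whose encode(T, OutputStream) and decode(InputStream) methods have the same parameters as the two static methods here. An instance of that coder could then be supplied via Create.of(getCustomers()).withCoder(...). Whether this also resolves the failure inside the CombineFn depends on how the accumulator type is encoded, which this sketch does not cover.
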