0
votes

I'm trying to execute a query against a BigQuery table, extract one column, and write it to a file. The code below throws an exception. I could be wrong, but it seems the process writes temporary results to a temp location in Avro format, reads the data back from there, and then throws a cast exception.

pipeLine.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> {
                  GenericRecord record = elem.getRecord();
                  return (String) record.get("column");
                })
                .fromQuery("SELECT column FROM `project.dataset.table`")
                .usingStandardSql()
                .withCoder(AvroCoder.of(String.class)))
        .apply(TextIO.write().to("gs://bucket/test/result/data")
                .withSuffix(TXT_EXT)
                .withCompression(Compression.GZIP));

Caused by: java.lang.ClassCastException: org.apache.avro.util.Utf8 cannot be cast to java.lang.String
    at xxxxx.xxx.xxx.sampling.dataflow.samplingextractor.service.BigQueryExportService.lambda$export$43268ee4$1(BigQueryExportService.java:137)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase$1.apply(BigQuerySourceBase.java:242)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase$1.apply(BigQuerySourceBase.java:235)
    at org.apache.beam.sdk.io.AvroSource$AvroBlock.readNextRecord(AvroSource.java:597)
    at org.apache.beam.sdk.io.BlockBasedSource$BlockBasedReader.readNextRecord(BlockBasedSource.java:209)
    at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:484)
    at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:479)
    at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
    at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:601)
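The failure mode can be reproduced without Beam or Avro at all: Avro stores strings as `org.apache.avro.util.Utf8`, which implements `CharSequence` but is not a `java.lang.String`, so a direct cast fails while `toString()` succeeds. A minimal self-contained sketch (the `FakeUtf8` class below is a hypothetical stand-in for Avro's `Utf8`, not the real class):

```java
// Minimal illustration of the failure mode: Avro returns its own
// CharSequence implementation (org.apache.avro.util.Utf8), which is
// not a java.lang.String, so a direct cast fails while toString() works.
public class Utf8CastDemo {
    // Hypothetical stand-in for org.apache.avro.util.Utf8:
    // a CharSequence that is not a String.
    static final class FakeUtf8 implements CharSequence {
        private final String value;
        FakeUtf8(String value) { this.value = value; }
        public int length() { return value.length(); }
        public char charAt(int i) { return value.charAt(i); }
        public CharSequence subSequence(int a, int b) { return value.subSequence(a, b); }
        public String toString() { return value; }
    }

    public static void main(String[] args) {
        Object field = new FakeUtf8("hello");   // what record.get("column") returns
        try {
            String s = (String) field;          // mirrors the failing cast in the lambda
            System.out.println("cast ok: " + s);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the stack trace");
        }
        System.out.println("toString ok: " + field.toString());  // the safe conversion
    }
}
```

This is why the exception points at the lambda (`BigQueryExportService.java:137`): the cast `(String) record.get("column")` is what throws, not the coder.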

2
Have you tested the @Haris Nadeem approach? It seems right - rmesteves
It doesn't work. I tried it before posting this question. - Jay Yoo
Did you get the same error? - rmesteves
Yes, it is throwing the same error. - Jay Yoo
Are you running it on DataFlow or DirectRunner? - rmesteves

2 Answers

0
votes

I think the error suggests you use .withCoder(AvroCoder.of(org.apache.avro.util.Utf8.class)), since Avro's Utf8 class cannot be cast to String directly.

0
votes

From looking at the documentation here, it seems you want to use the StringUtf8Coder class, and to convert the Avro Utf8 value with toString() rather than casting it to String (the cast is what throws the ClassCastException).

pipeLine.apply(
    BigQueryIO.read(
            (SchemaAndRecord elem) -> {
              GenericRecord record = elem.getRecord();
              // record.get("column") returns org.apache.avro.util.Utf8,
              // so convert with toString() instead of casting to String
              return record.get("column").toString();
            })
            .fromQuery("SELECT column FROM `project.dataset.table`")
            .usingStandardSql()
            .withCoder(StringUtf8Coder.of()))
        .apply(TextIO.write().to("gs://bucket/test/result/data")
            .withSuffix(TXT_EXT)
            .withCompression(Compression.GZIP));