0
votes

I'm trying to read data from BigQuery (using TableRow) and write the output to Cassandra. How to do that?

Here's what I've tried. This works:

/* Read BQ */
PCollection<CxCpmMapProfile> data =  p.apply(BigQueryIO.read(new SerializableFunction<SchemaAndRecord, CxCpmMapProfile>() {
    public CxCpmMapProfile apply(SchemaAndRecord record) {
        GenericRecord r = record.getRecord();
        return new CxCpmMapProfile((String) r.get("channel_no").toString(), (String) r.get("channel_name").toString());
    }
}).fromQuery("SELECT channel_no, channel_name FROM `dataset_name.table_name`").usingStandardSql().withoutValidation());

/* Write to Cassandra */
data.apply(CassandraIO.<CxCpmMapProfile>write()
    .withHosts(Arrays.asList("<IP addr1>", "<IP addr2>"))
    .withPort(9042)
    .withUsername("cassandra_user").withPassword("cassandra_password").withKeyspace("cassandra_keyspace")
    .withEntity(CxCpmMapProfile.class));

But when I changed Read BQ part using TableRow like this:

/* Read from BQ using readTableRow */
PCollection<TableRow> data = p.apply(BigQueryIO.readTableRows()
    .fromQuery("SELECT channel_no, channel_name FROM `dataset_name.table_name`")
    .usingStandardSql().withoutValidation());

In Write to Cassandra I got the following error

The method apply(PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (CassandraIO.Write<CxCpmMacProfile>)

1

1 Answers

1
votes

The error is due to the input PCollection containing TableRow elements, while the CassandraIO read is expecting CxCpmMacProfile elements. You need to read the elements from BigQuery as CxCpmMacProfile elements. The BigQueryIO documentation has an example of reading rows from a table and parsing them into a custom type, done through the read(SerializableFunction) method.