Using Google Dataflow, I need to read data from Google Spanner and write it to Pub/Sub as a batch process. I have more than 100,000 records in Spanner, so I need to read those records and publish them to a Pub/Sub topic in batches, with a limit of 1,000 records per publish.

Please help me with this.

What's your error? What is your current breaking code? Can you share more detail? – guillaume blaquiere
Yes, we can write using PubsubIO, but I don't want to publish all the records at once, since I may have a huge amount of data; I need to split the data and send it as multiple publish requests. I didn't find docs for this. Is there any way to do it? – wild
I understand that you don't want to publish 100k+ rows in one Pub/Sub message. Am I right? If so, do you want to publish row by row into Pub/Sub, or chunk by chunk (about 1000 rows per chunk)? – guillaume blaquiere
Yes, you are right, I want to chunk by 1000 rows. – wild
Do you have a row number in the output of your query? Can you add one? – guillaume blaquiere

2 Answers

1 vote

One way to do this is using the Dataflow connector.

Reading data from Cloud Spanner

To read from Cloud Spanner, apply the SpannerIO.read() transform. Configure the read using the methods in the SpannerIO.Read class. Applying the transform returns a PCollection, where each element in the collection represents an individual row returned by the read operation. You can read from Cloud Spanner with and without a specific SQL query, depending on your desired output.

Applying the SpannerIO.read() transform returns a consistent view of data by performing a strong read. Unless you specify otherwise, the result of the read is snapshotted at the time that you started the read. See reads for more information about the different types of reads Cloud Spanner can perform.

see: https://cloud.google.com/spanner/docs/dataflow-connector
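
A minimal read sketch, with placeholder project, instance, database, and query values (none of these names come from the original post):

    import com.google.cloud.spanner.Struct;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    // Each element of the resulting PCollection is one row of the query result.
    PCollection<Struct> rows = pipeline.apply(
        SpannerIO.read()
            .withProjectId("my-project")     // placeholder
            .withInstanceId("my-instance")   // placeholder
            .withDatabaseId("my-database")   // placeholder
            .withQuery("SELECT SingerId, FirstName FROM Singers"));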

This thread seems to explain how to write from Dataflow to Pub/Sub: Publish messages to Pubsub topic in Dataflow
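
In short: once the rows have been converted to a PCollection<PubsubMessage> (call it messages, an assumed variable), the write itself is a single transform:

    // `messages` is assumed to be a PCollection<PubsubMessage>;
    // the topic path is a placeholder.
    messages.apply(PubsubIO.writeMessages()
        .to("projects/my-project/topics/my-topic"));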

0 votes

I would like to understand the exact use case and what you are trying to accomplish with this.

You could use the code below to read from Spanner and batch at the Pub/Sub publishing level. It batches Pub/Sub messages while publishing (here, one row is published as one Pub/Sub message).

Pipeline Process

CustomPipelineOptions options =
    PipelineOptionsFactory.fromArgs(pipelineArgs).as(CustomPipelineOptions.class);

Pipeline pipeline = Pipeline.create(options);

SpannerConfig spannerConfig = SpannerConfig.create()
    .withDatabaseId(options.getSpannerDatabaseId())
    .withProjectId(options.getSpannerProjectId())
    .withInstanceId(options.getSpannerInstanceId());

pipeline.apply(SpannerIO.read()
        .withTable("TestTable")
        .withSpannerConfig(spannerConfig)
        .withColumns(Arrays.asList("TestColumn")))
    // Convert each Spanner row (Struct) into a PubsubMessage.
    .apply(ParDo.of(new StructToPubSubConverter()))
    .apply(PubsubIO.writeMessages()
        .to(options.getPubsubWriteTopic())
        .withMaxBatchSize(1000)); // Up to 1000 messages per publish request

pipeline.run();

Spanner to PubSub Converter

public static class StructToPubSubConverter extends DoFn<Struct, PubsubMessage> {

  @ProcessElement
  public void processElement(ProcessContext context) {
    Struct struct = context.element();
    // Read the single selected column and use it as the message payload.
    String testColumn = struct.getString(0);
    context.output(new PubsubMessage(
        testColumn.getBytes(StandardCharsets.UTF_8), new HashMap<>()));
  }
}

Not sure if this addresses your problem, but it should give you a fair idea. Sharing more details would be helpful.
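
If instead you want each Pub/Sub message to carry a chunk of up to 1000 rows, as discussed in the comments, one option is Beam's GroupIntoBatches transform. Below is a minimal sketch reusing spannerConfig and options from above; the constant key and the newline-joined payload are illustrative assumptions, not part of the original answer:

    // Additional imports needed for this sketch:
    // import java.nio.charset.StandardCharsets;
    // import java.util.HashMap;
    // import org.apache.beam.sdk.transforms.GroupIntoBatches;
    // import org.apache.beam.sdk.transforms.WithKeys;
    // import org.apache.beam.sdk.values.KV;

    pipeline.apply(SpannerIO.read()
            .withTable("TestTable")
            .withSpannerConfig(spannerConfig)
            .withColumns(Arrays.asList("TestColumn")))
        // GroupIntoBatches requires a keyed PCollection. A single constant key
        // is simplest, but it funnels all rows through one key; spread rows
        // across several keys if you need more parallelism.
        .apply(WithKeys.<Integer, Struct>of(0))
        .apply(GroupIntoBatches.<Integer, Struct>ofSize(1000))
        .apply(ParDo.of(new DoFn<KV<Integer, Iterable<Struct>>, PubsubMessage>() {
          @ProcessElement
          public void processElement(ProcessContext context) {
            // Join up to 1000 rows into one newline-delimited payload
            // (an illustrative encoding; use whatever format your consumers expect).
            StringBuilder payload = new StringBuilder();
            for (Struct row : context.element().getValue()) {
              payload.append(row.getString(0)).append('\n');
            }
            context.output(new PubsubMessage(
                payload.toString().getBytes(StandardCharsets.UTF_8), new HashMap<>()));
          }
        }))
        .apply(PubsubIO.writeMessages().to(options.getPubsubWriteTopic()));

Note that a single Pub/Sub message is limited to 10 MB, so very wide rows may call for smaller chunks.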