I would like to understand the exact use case and what you are trying to accomplish with this.
You could use the pipeline below to read from Spanner and batch at the Pub/Sub publishing level: PubsubIO groups messages into batches while publishing. (Here, each row is published as one Pub/Sub message.)
Pipeline Process
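```java
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;

// pipelineArgs is the String[] passed to main; CustomPipelineOptions is your own options interface.
CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(pipelineArgs).as(CustomPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
SpannerConfig spannerConfig = SpannerConfig.create()
    .withDatabaseId(options.getSpannerDatabaseId())
    .withProjectId(options.getSpannerProjectId())
    .withInstanceId(options.getSpannerInstanceId());
pipeline.apply(SpannerIO.read()
        .withTable("TestTable")
        .withSpannerConfig(spannerConfig)
        .withColumns(Arrays.asList("TestColumn")))
    .apply(ParDo.of(new StructToPubSubConverter())) // one Pub/Sub message per row
    .apply(PubsubIO.writeMessages()
        .to(options.getPubsubWriteTopic())
        .withMaxBatchSize(1000)); // max number of messages per publish request
pipeline.run();
```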
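To make the parenthetical above concrete: withMaxBatchSize(1000) caps how many messages go into a single publish request; it does not pack several rows into one message. The plain-Java sketch below is only a conceptual model of that size-capped buffering. The class PublishBatchingSketch and the method toPublishRequests are hypothetical, not Beam APIs, and the real PubsubIO sink also has a byte-size cap (withMaxBatchBytesSize) that this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual model only (hypothetical names): NOT Beam's PubsubIO implementation.
public class PublishBatchingSketch {

  // Groups messages (one per Spanner row) into publish requests of at most
  // maxBatchSize messages each, the way a size-capped buffer would flush them.
  static <T> List<List<T>> toPublishRequests(List<T> messages, int maxBatchSize) {
    if (maxBatchSize <= 0) {
      throw new IllegalArgumentException("maxBatchSize must be positive");
    }
    List<List<T>> requests = new ArrayList<>();
    List<T> buffer = new ArrayList<>();
    for (T message : messages) {
      buffer.add(message);
      if (buffer.size() == maxBatchSize) { // buffer is full: send one request
        requests.add(buffer);
        buffer = new ArrayList<>();
      }
    }
    if (!buffer.isEmpty()) { // flush whatever is left at the end
      requests.add(buffer);
    }
    return requests;
  }

  public static void main(String[] args) {
    List<String> rows = new ArrayList<>();
    for (int i = 0; i < 2500; i++) {
      rows.add("row-" + i);
    }
    List<List<String>> requests = toPublishRequests(rows, 1000);
    System.out.println(requests.size()); // prints 3
  }
}
```

With 2500 rows and a batch size of 1000, this yields three requests of 1000, 1000 and 500 messages, and every message still carries exactly one row.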
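Spanner to Pub/Sub Converter
```java
import com.google.cloud.spanner.Struct;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;

// Converts each Spanner row into one Pub/Sub message whose payload is the TestColumn value.
public static class StructToPubSubConverter extends DoFn<Struct, PubsubMessage> {
  @ProcessElement
  public void processElement(@Element Struct struct, OutputReceiver<PubsubMessage> out) {
    if (struct.isNull(0)) return; // Struct.getString throws on NULL, so skip rows without a value
    String testColumn = struct.getString(0);
    // Encode explicitly as UTF-8 instead of relying on the platform default charset.
    out.output(new PubsubMessage(testColumn.getBytes(StandardCharsets.UTF_8), new HashMap<>()));
  }
}
```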
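The pipeline also depends on CustomPipelineOptions, which isn't defined here. A minimal sketch, assuming every option is a plain String and the property names are inferred from the getters used above; the descriptions are illustrative only.

```java
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical options interface: Beam derives the flags (--spannerProjectId, ...)
// from the getter/setter pairs below.
public interface CustomPipelineOptions extends PipelineOptions {
  @Description("Project ID that owns the Spanner instance")
  String getSpannerProjectId();
  void setSpannerProjectId(String value);

  @Description("Spanner instance ID")
  String getSpannerInstanceId();
  void setSpannerInstanceId(String value);

  @Description("Spanner database ID")
  String getSpannerDatabaseId();
  void setSpannerDatabaseId(String value);

  @Description("Pub/Sub topic to publish to, in the form projects/<project>/topics/<topic>")
  String getPubsubWriteTopic();
  void setPubsubWriteTopic(String value);
}
```

You would then pass flags such as --spannerProjectId=my-project --spannerInstanceId=my-instance --spannerDatabaseId=my-db --pubsubWriteTopic=projects/my-project/topics/my-topic (placeholder values).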
I'm not sure this addresses your problem, but it should give you a fair idea. Sharing more details would help.