
My Google Dataflow job uses Apache Beam's KafkaIO library with AvroIO and windowed writes, writing output to ".avro" files in a Google Cloud Storage bucket. However, it defaults to Streaming as the processing job type on production data.

Is it possible to consume data from a Kafka topic using KafkaIO in Dataflow with batch processing? This Dataflow job does not require near-real-time (streaming) processing. Is there also a way to insert the incoming records into a BigQuery table without streaming insert costs, by enabling batch-type processing?

Batch processing with less frequent runs would work for this use case and should result in lower memory, vCPU, and compute costs.
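For the BigQuery part, this is roughly what I have in mind, as a sketch only: the table name and TABLE_SCHEMA are placeholders, rows would be a PCollection<TableRow> converted from the Kafka records, and I am assuming that batch load jobs (FILE_LOADS) rather than streaming inserts would avoid the streaming insert pricing.

// Sketch: write via batch load jobs instead of streaming inserts.
// "PROJECT:DATASET.TABLE" and TABLE_SCHEMA are placeholders.
rows.apply("WriteToBigQuery", BigQueryIO.writeTableRows()
        .to("PROJECT:DATASET.TABLE")
        .withSchema(TABLE_SCHEMA)
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));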

As per: https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/kafka/KafkaIO.html

The KafkaIO source returns an unbounded collection of Kafka records as PCollection<KafkaRecord<K, V>>.

Does this mean that Kafka, being an unbounded source, cannot be run in batch mode?

Testing with the .withMaxNumRecords(1000000) condition runs the job in batch mode. However, to run the job on live incoming data, I need to remove this condition.

I have tried using windowing and setting the streaming-mode options flag to false, without success, as in the code below.


// not streaming mode
options.setStreaming(false);

...
PCollection<String> collection = p.apply(KafkaIO.<String, String>read()
               .withBootstrapServers("IPADDRESS:9092")
               .withTopic(topic)                                
               .withKeyDeserializer(StringDeserializer.class)
               .withValueDeserializer(StringDeserializer.class)                
               .updateConsumerProperties(props)
               .withConsumerFactoryFn(new ConsumerFactory())                            
//             .withMaxNumRecords(1000000)
               .withoutMetadata() 
       ).apply(Values.<String>create())
       .apply(Window.into(FixedWindows.of(Duration.standardDays(1))));


...
//convert to Avro GenericRecord

.apply("AvroToGCS", AvroIO.writeGenericRecords(AVRO_SCHEMA)
.withWindowedWrites()
.withNumShards(1)
.to("gs://BUCKET/FOLDER/")
.withSuffix(".avro"));

This code still resulted in a Streaming job type with 4 vCPUs and 1 worker running for 9 minutes and processing 1.8 million records, after which I had to stop (drain) the job to prevent further costs.

When enforcing batch processing in Dataflow on incoming data, is it possible to collect a batch of records, write it out as Avro files, and keep doing so until the offset has caught up to the latest?

Any examples or sample code would be greatly appreciated.


1 Answer


Unbounded sources cannot be run in batch mode. This is by design, as batch pipelines expect a finite amount of data to be read and to terminate when done processing it.

However, you can convert the unbounded source into a bounded source by constraining how many records it reads, which is what you have done. Note that there is no guarantee as to which records will be read.
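For example, here is a minimal sketch of a bounded Kafka read reusing your configuration; the record and time limits below are illustrative values, you can set either one or both, and reading stops once a limit is reached:

// Sketch: bound the Kafka read so the pipeline can run as a batch job.
PCollection<String> values = p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("IPADDRESS:9092")
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withMaxNumRecords(1000000)                    // stop after at most N records
        .withMaxReadTime(Duration.standardMinutes(30)) // or stop after a fixed read time
        .withoutMetadata())
    .apply(Values.<String>create());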

Streaming pipelines are meant to be always up, so that they are available to read live data. Batch pipelines are meant to read backlogs of stored data.

A batch pipeline will not respond well to reading live data; it will read whatever data is there when you launch the pipeline and then terminate.