I am trying to read from multiple Kafka brokers using KafkaIO on Apache Beam. By default, offsets are stored in Kafka itself (ZooKeeper is no longer used for this since Kafka 0.9). With this setup, when I restart the job/pipeline, I get both duplicate and missing records.
From what I have read, the best way to handle this is to manage offsets in an external data store. Is it possible to do this with the current version of Apache Beam and KafkaIO? I am using version 2.2.0 right now.
Also, after reading from Kafka, I write the records to BigQuery. Is there a setting in KafkaIO that commits a message's offset only after the message has been inserted into BigQuery? So far I can only find the auto-commit setting.
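
For context, the auto-commit setup I am referring to boils down to roughly the following Kafka consumer properties. This is only a minimal sketch: the class name, broker addresses, group id, and interval value are placeholders I made up for illustration, and a map like this would be handed to KafkaIO as consumer property overrides.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only: builds Kafka consumer
// properties for an auto-commit setup. All values are placeholders.
class AutoCommitConsumerConfig {

    static Map<String, Object> build() {
        Map<String, Object> props = new HashMap<>();
        // Placeholder broker list (assumption, not my real hosts).
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        // Placeholder consumer group; committed offsets are tracked per group.
        props.put("group.id", "beam-kafka-to-bq");
        // Auto commit: the consumer commits offsets periodically on its own,
        // regardless of whether the downstream BigQuery insert has succeeded.
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", 5000);
        // Where to start when the group has no committed offset yet.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Print the properties so the configuration can be inspected.
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Because the commit here is driven by a timer rather than by the BigQuery write, an offset can be committed before or after the corresponding record is actually inserted, which is the behaviour I would like to avoid.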