0
votes

I am currently using Google Cloud Dataflow with Apache Beam to consume messages from a Kafka topic that exists in two different Kafka clusters. Both clusters contain the same topic names but different data, since each cluster holds data from a separate region.

Is it possible to consume data from both clusters by listing the bootstrap servers of both clusters in a single KafkaIO.read Dataflow pipeline step?

.withBootstrapServers("CLUSTER1_SERVER:PORT,CLUSTER2_SERVER:PORT");

I was reading the Kafka documentation on bootstrap servers, and it wasn't clear to me whether, after connecting to a bootstrap server, messages would be consumed only from the cluster of the first successful connection, or whether all listed bootstrap servers would be tried and messages consumed from every cluster found. If the former is the case, I will need to create a second Dataflow pipeline to process messages from the second cluster, but it would be much easier if I could process messages from both clusters in a single pipeline.

Any information would be greatly appreciated.

2
Could you please share the documentation you have followed and the Dataflow version? Thanks! – aga
@aga I followed the documentation on this page: kafka.apache.org/documentation and the Dataflow/Apache Beam version I am currently using is 2.18 – eagerbeaver

2 Answers

0
votes

Beam's KafkaIO simply passes this value through to Kafka's ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG setting. That parameter is meant for listing multiple brokers of the same Kafka cluster, so the consumer can still make its initial connection if some brokers are down; it is not for passing in servers from different Kafka clusters. See the Kafka architecture documentation for details. I suspect that when you specify servers from multiple clusters, the consumer just uses the first live one it reaches.
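To make the pass-through concrete, here is a hedged sketch of how the same setting can be expressed either via withBootstrapServers or directly via consumer config updates (the topic name and server addresses are placeholders; method names follow the Beam KafkaIO API):

```java
import java.util.Map;

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BootstrapConfigSketch {
  // withBootstrapServers(...) is shorthand for setting
  // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ("bootstrap.servers") on the
  // KafkaConsumer that KafkaIO creates under the hood.
  static KafkaIO.Read<String, String> buildRead() {
    return KafkaIO.<String, String>read()
        .withBootstrapServers("CLUSTER1_SERVER:PORT")
        // Equivalent, going through the raw consumer config:
        // .withConsumerConfigUpdates(
        //     Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        //            "CLUSTER1_SERVER:PORT"))
        .withTopic("my-topic") // placeholder topic name
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class);
  }
}
```

Either way, the list ends up in the single KafkaConsumer's `bootstrap.servers` property, which is why it cannot fan out across clusters.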

0
votes

I don't think it's a good idea to read from different clusters with the same KafkaIO instance. Under the hood it uses a KafkaConsumer, which by design reads from only one cluster; the bootstrap server list is not intended as a cross-cluster failover mechanism. Also, KafkaIO actually uses two Kafka consumers (one for messages, another for offsets), so the behavior could be even worse and the result unpredictable.

In the meantime, you can have two KafkaIO sources, one per cluster, and then join the messages by key or any other property downstream.
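A minimal sketch of the two-source approach, merging both regions with a Flatten (topic name, server addresses, and transform labels are placeholders):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TwoClusterPipeline {
  // Helper to build one KafkaIO source per cluster; each source gets its
  // own KafkaConsumer, so each cluster is read independently.
  static KafkaIO.Read<String, String> readFrom(String bootstrapServers) {
    return KafkaIO.<String, String>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic("my-topic") // same topic name exists in both clusters
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class);
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<KafkaRecord<String, String>> region1 =
        p.apply("ReadCluster1", readFrom("CLUSTER1_SERVER:PORT"));
    PCollection<KafkaRecord<String, String>> region2 =
        p.apply("ReadCluster2", readFrom("CLUSTER2_SERVER:PORT"));

    // Merge both streams into one PCollection; downstream transforms
    // then see messages from both regions.
    PCollection<KafkaRecord<String, String>> allRegions =
        PCollectionList.of(region1).and(region2)
            .apply("MergeClusters", Flatten.pCollections());

    p.run();
  }
}
```

If you need to correlate records across regions rather than just merge them, you could key both collections (e.g. with WithKeys) and use CoGroupByKey instead of Flatten.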