I am trying to deploy a Google Cloud Dataflow pipeline which reads from a Kafka cluster, processes its records, and then writes the results to BigQuery. However, I keep encountering the following exception when attempting to deploy:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata for Kafka Cluster
The Kafka cluster requires a JAAS configuration for authentication, and I use the code below to set the properties required for Apache Beam's KafkaIO.read method:
// Kafka properties
Map<String, Object> kafkaProperties = new HashMap<String, Object>() {{
    put("request.timeout.ms", 900000);
    put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
    put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"USERNAME\" password=\"PASSWORD\";");
    put(CommonClientConfigs.GROUP_ID_CONFIG, GROUP_ID);
}};
// Build & execute pipeline
pipeline
.apply(
"ReadFromKafka",
KafkaIO.<Long, String>read()
.withBootstrapServers(properties.getProperty("kafka.servers"))
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withTopic(properties.getProperty("kafka.topic")).withConsumerConfigUpdates(kafkaProperties))
The Dataflow pipeline is to be deployed with public IPs disabled, but there is an established VPN tunnel from our Google Cloud VPC network to the Kafka cluster; the required routing for the private IPs on both sides is configured, and the IPs are whitelisted. I am able to ping the Kafka server and connect to its socket from a Compute Engine VM in the same subnetwork as the Dataflow job to be deployed.
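For completeness, the networking options for the job are set roughly like this (the network and subnetwork names below are placeholders, not the real values):

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Worker networking: private IPs only, on the VPC that has the VPN tunnel to Kafka
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setUsePublicIps(false);                                        // no public IPs on workers
options.setNetwork("my-vpc-network");                                  // placeholder
options.setSubnetwork("regions/europe-west1/subnetworks/my-subnet");   // placeholder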
I suspect there is an issue with the configuration, but I cannot figure out whether I am missing an additional field or whether one of the existing ones is misconfigured. Does anyone know how I can diagnose the problem further, since the exception thrown does not really pinpoint the issue? Any help would be greatly appreciated.
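One avenue I am considering for diagnosing this is raising the Kafka client log level on the Dataflow workers, roughly as sketched below (this assumes the standard Dataflow worker logging options; I have not yet confirmed it surfaces the missing detail):

import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;

// Sketch: log the Kafka clients at DEBUG on the workers to see where the metadata fetch stalls
// ("options" is the DataflowPipelineOptions instance used to launch the job)
DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
loggingOptions.setWorkerLogLevelOverrides(
    new WorkerLogLevelOverrides()
        .addOverrideForName("org.apache.kafka.clients", DataflowWorkerLoggingOptions.Level.DEBUG));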
Edit: I am now able to deploy the Dataflow job successfully, but the read does not appear to be functioning correctly. Looking through the Dataflow job's logs for errors, I can see that after the group coordinator for the Kafka topic is discovered, there are no further log statements until a warning that closing the idle reader timed out:
Close timed out with 1 pending requests to coordinator, terminating client connections
followed by an uncaught exception with the root cause being:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition test-partition could be determined
There is then an error stating:
Execution of work for P0 for key 0000000000000001 failed. Will retry locally.
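The 60000 ms in that message matches the Kafka consumer's default for default.api.timeout.ms, so one experiment I plan to try is raising it in the same properties map (this may only mask an underlying connectivity problem, but it should show whether the position lookup eventually succeeds):

// Experiment: give the consumer more time to resolve partition positions
kafkaProperties.put("default.api.timeout.ms", 900000);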
Could this be an issue with the key definition, since the messages in the Kafka topics do not actually have keys? When I view the topic in Kafka Tool, the only columns in the data are Offset, Message, and Timestamp.
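In case the absent keys are part of the problem, the variant I plan to test next reads the records as values only, with String keys as a neutral placeholder (StringDeserializer simply returns null for a missing key); a sketch using the same properties map:

import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch: treat keys as (nullable) Strings and keep only the message values
PCollection<String> messages =
    pipeline
        .apply(
            "ReadFromKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers(properties.getProperty("kafka.servers"))
                .withTopic(properties.getProperty("kafka.topic"))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(kafkaProperties)
                .withoutMetadata())                       // PCollection<KV<String, String>>
        .apply("ExtractValues", Values.<String>create()); // drop the (null) keys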
Comments:
"… SASL_PLAINTEXT parameter on the consumer side?" – Nick_Kh
"… DataflowRunner … I would suggest for test purposes switching to DirectRunner to validate whether the issue exists in your local environment." – Nick_Kh