
I am trying to deploy a Google Cloud Dataflow pipeline which reads from a Kafka cluster, processes its records, and then writes the results to BigQuery. However, I keep encountering the following exception when attempting to deploy:

org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata for Kafka Cluster

The Kafka cluster requires the use of a JAAS configuration for authentication, and I use the code below to set the properties required for the KafkaIO.read Apache Beam method:

    // Kafka properties
    Map<String, Object> kafkaProperties = new HashMap<String, Object>(){{
        put("request.timeout.ms", 900000);
        put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"USERNAME\" password=\"PASSWORD\";");
        put(CommonClientConfigs.GROUP_ID_CONFIG, GROUP_ID);
    }};

    // Build & execute pipeline
    pipeline
            .apply(
                    "ReadFromKafka",
                    KafkaIO.<Long, String>read()
                            .withBootstrapServers(properties.getProperty("kafka.servers"))
                            .withKeyDeserializer(LongDeserializer.class)
                            .withValueDeserializer(StringDeserializer.class)
                            .withTopic(properties.getProperty("kafka.topic"))
                            .withConsumerConfigUpdates(kafkaProperties))

The Dataflow pipeline is to be deployed with public IPs disabled, but there is an established VPN tunnel from our Google Cloud VPC network to the Kafka cluster, the required routing for the private IPs is configured on both sides, and the addresses are whitelisted. I am able to ping the Kafka server and connect to its socket from a Compute Engine VM in the same subnetwork as the Dataflow job to be deployed.
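
For reference, the job's networking is set through the standard Dataflow pipeline options, roughly as follows (the subnetwork value here is a placeholder, not our actual one):

    import org.apache.beam.runners.dataflow.DataflowRunner;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Worker networking (sketch; REGION/SUBNETWORK are placeholders)
    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);
    options.setUsePublicIps(false); // workers get private IPs only
    options.setSubnetwork("regions/REGION/subnetworks/SUBNETWORK"); // same subnetwork as the test VM
    Pipeline pipeline = Pipeline.create(options);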

I suspect there is an issue with the configuration, but I cannot figure out whether I am missing an additional field or whether one of the existing ones is misconfigured. Does anyone know how I can diagnose the problem further, since the exception thrown does not really pinpoint the issue? Any help would be greatly appreciated.

Edit: I am now able to deploy the Dataflow job successfully; however, the read does not appear to be functioning correctly. Looking through the Dataflow job logs, I can see that after the group coordinator for the Kafka topic is discovered, there are no further log statements before a warning that closing the idle reader timed out:

Close timed out with 1 pending requests to coordinator, terminating client connections

followed by an uncaught exception with the root cause being:

org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition test-partition could be determined

There is then an error stating:

Execution of work for P0 for key 0000000000000001 failed. Will retry locally.

Could this be an issue with the key definition, since the messages in the Kafka topic do not actually have keys? When I view the topic in Kafka Tool, the only columns in the data are Offset, Message, and Timestamp.
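
For reference, a keyless read could look roughly like this (just a sketch; it treats the unused key as raw bytes and keeps only the values, and is not necessarily related to the timeout itself):

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Sketch: read a keyless topic, ignoring the (null) keys
    pipeline
            .apply(
                    "ReadFromKafka",
                    KafkaIO.<byte[], String>read()
                            .withBootstrapServers(properties.getProperty("kafka.servers"))
                            .withTopic(properties.getProperty("kafka.topic"))
                            .withKeyDeserializer(ByteArrayDeserializer.class)
                            .withValueDeserializer(StringDeserializer.class)
                            .withConsumerConfigUpdates(kafkaProperties)
                            .withoutMetadata())        // PCollection<KV<byte[], String>>
            .apply("ExtractValues", Values.create());  // keep only the message payloads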

Can you confirm that SSL encryption is disabled on the Kafka brokers, since you've set the SASL_PLAINTEXT security protocol on the consumer side? – Nick_Kh
@mk_sta Yes, I can confirm that SSL encryption is disabled on the Kafka brokers. – eagerbeaver
@mk_sta I've also updated the question, as I can now deploy the job successfully, but I am experiencing issues when attempting to read from the Kafka topic and when closing the idle readers. – eagerbeaver
The Kafka-reading part of your code does not contain any further processing; can you extend the pipeline so the records can be analyzed further? If you run your pipeline via DataflowRunner, I would suggest switching to DirectRunner for testing purposes to validate whether the issue also exists in your local environment. – Nick_Kh
After debugging further, it looks like the issue is that the Dataflow job still attempts to connect to the public IP of the Kafka group coordinator after initially connecting to the Kafka cluster through the private bootstrap IPs. The Dataflow job is configured to use only private IPs in an effort to save costs, so the public IPs of the cluster cannot be reached. I am now looking into a way to avoid connecting to the group coordinator's public IP. – eagerbeaver

1 Answer


Based on the last comment, I assume the issue lies more with the network stack than with any missing configuration in the Dataflow pipeline itself, i.e. with how the Dataflow workers connect to the Kafka brokers.

Basically, using public IP addresses for the Dataflow workers is the simplest way to reach an external Kafka cluster, with no extra configuration required on either side, since you don't need to set up VPC networking between the parties or do the routine network work to get all the routes working.

Cloud VPN, however, brings more complications: you have to implement the VPC network on both sides and then configure the VPN gateway, forwarding rules, and address pool for that VPC. In return, from the Dataflow runtime perspective, you don't need to assign public IP addresses to the Dataflow workers, which undoubtedly reduces the cost.

The problem you've mentioned lies primarily on the Kafka cluster side. Because Apache Kafka is a distributed system, it follows a core principle: when a producer or consumer starts, it requests metadata about which broker is the leader for a partition, and the metadata it receives contains the endpoints available for that partition; the client then uses those endpoints to connect to the particular broker. As far as I understand, in your case the connection to the leader goes through a listener bound to the external network interface, as configured in the broker's server.properties.
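
To check which endpoints the brokers actually advertise to clients, you could run something like this from a VM inside the VPC (a rough sketch using Kafka's AdminClient, reusing the same SASL settings as your pipeline; the bootstrap address is a placeholder):

    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.Node;

    // Print the host:port pairs the brokers advertise; these are the addresses
    // the Dataflow workers will try to reach after the initial bootstrap.
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "PRIVATE_BOOTSTRAP_IP:9092");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                    + "username=\"USERNAME\" password=\"PASSWORD\";");

    try (AdminClient admin = AdminClient.create(props)) {
        for (Node node : admin.describeCluster().nodes().get()) {
            // Public addresses here would explain why workers without public IPs cannot connect.
            System.out.println("Broker " + node.id() + " -> " + node.host() + ":" + node.port());
        }
    }

If those are public addresses, that matches what you observed with the group coordinator.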

Therefore, you might consider creating a separate listener (if one doesn't exist yet) in listeners that is bound to the VPC-reachable network interface and, if necessary, setting advertised.listeners so that the metadata returned to the client contains the address the client should use to connect to the particular broker.
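
For illustration only (listener names, addresses, and ports below are placeholders), the relevant part of a broker's server.properties could look roughly like this, with an internal listener advertising the VPC-reachable private address:

    # Sketch of broker settings; names, addresses, and ports are placeholders
    listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
    advertised.listeners=INTERNAL://<broker-private-ip>:9092,EXTERNAL://<broker-public-host>:9093
    listener.security.protocol.map=INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
    inter.broker.listener.name=INTERNAL
    sasl.enabled.mechanisms=SCRAM-SHA-512

Clients that bootstrap against the INTERNAL listener (the private IPs reachable over the VPN) would then also receive private addresses in the metadata instead of the public ones.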