From a server, I was able to connect to a remote Kafka server and read data from a topic that has SSL configured.
From GCP, how can I connect to a remote Kafka server from a Google Dataflow pipeline, passing the SSL truststore and keystore certificate locations and the Google service account JSON?
I am using the Eclipse plugin with the Dataflow runner option.
If I point the truststore and keystore locations to a Google Cloud Storage bucket, it throws the following error:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: gs:/bucket/folder/truststore-client.jks (No such file or directory)
I followed the related question "Truststore and Google Cloud Dataflow".
I then updated the code so that the SSL truststore and keystore locations point to certificates in the local machine's /tmp directory, in case KafkaIO needs to read them from a file path. With that change it no longer threw the FileNotFoundException.
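The FileNotFoundException above suggests that the Kafka client opens ssl.truststore.location as a plain local file path, so a gs:// URI is not resolved. On Dataflow, a file in the launching machine's /tmp is also not present on the workers, so the stores would likely have to be copied to each worker's local disk before the consumer is built, for example inside the function passed to KafkaIO's withConsumerFactoryFn. The following is a minimal sketch of only the local staging and config step, using the standard library. The class name, method names and the "changeit" passwords are hypothetical, and how the InputStream is obtained from the bucket is left as an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Beam or Kafka.
class LocalTruststoreStaging {

    /** Copies the store bytes to a temp file on local disk and returns its path. */
    static Path stageToLocalFile(InputStream source, String prefix) throws IOException {
        Path local = Files.createTempFile(prefix, ".jks");
        // createTempFile already created the file, so it has to be overwritten.
        Files.copy(source, local, StandardCopyOption.REPLACE_EXISTING);
        local.toFile().deleteOnExit();
        return local;
    }

    /** Returns a copy of the consumer config with SSL settings that point at local files. */
    static Map<String, Object> withSslConfig(Map<String, Object> base,
                                             Path truststore, String truststorePassword,
                                             Path keystore, String keystorePassword) {
        Map<String, Object> config = new HashMap<>(base);
        config.put("security.protocol", "SSL");
        config.put("ssl.truststore.type", "JKS");
        config.put("ssl.truststore.location", truststore.toAbsolutePath().toString());
        config.put("ssl.truststore.password", truststorePassword);
        config.put("ssl.keystore.location", keystore.toAbsolutePath().toString());
        config.put("ssl.keystore.password", keystorePassword);
        return config;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in bytes; in a pipeline these streams would be read from the bucket objects.
        Path truststore = stageToLocalFile(new ByteArrayInputStream(new byte[] {1, 2, 3}), "truststore-client");
        Path keystore = stageToLocalFile(new ByteArrayInputStream(new byte[] {4, 5, 6}), "keystore-client");
        Map<String, Object> config =
            withSslConfig(new HashMap<>(), truststore, "changeit", keystore, "changeit");
        System.out.println(config.get("ssl.truststore.location"));
    }
}
```

In a pipeline, the consumer factory would call something like stageToLocalFile on the worker and then construct the KafkaConsumer from the returned map, so that the path exists on the machine that actually opens the connection.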
I tried running the server's Java client code from the GCP account, and also a Beam Java pipeline on Dataflow. In both cases I get the following error:
ssl.truststore.location = <LOCAL MACHINE CERTIFICATE FILE PATH>
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 1.0.0
org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : aaa7af6d4a11b29d
org.apache.kafka.common.network.SslTransportLayer close
WARNING: Failed to send SSL Close message
java.io.IOException: Broken pipe
org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:81)
at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.start(ExecutorServiceParallelExecutor.java:153)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:205)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at
org.apache.kafka.common.utils.LogContext$KafkaLogger warn
WARNING: [Consumer clientId=consumer-1, groupId=test-group] Connection to node -2 terminated during authentication. This may indicate that authentication failed due to invalid credentials.
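Since the warning above mentions possibly invalid credentials, one thing that may be worth ruling out is whether the truststore and keystore files can be opened with the configured passwords on the machine that runs the consumer. The following is a minimal sketch using only java.security.KeyStore. The class and method names are hypothetical, and it checks the store file and password only, not whether the broker accepts the certificates.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper, not part of Beam or Kafka.
class StoreCheck {

    /**
     * Loads the store with the given type and password and returns its aliases.
     * Throws if the file is missing or cannot be read with that password.
     */
    static List<String> listAliases(Path storePath, String type, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore store = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(storePath)) {
            store.load(in, password);
        }
        return Collections.list(store.aliases());
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("usage: StoreCheck <store-path> <password>");
            return;
        }
        // "JKS" matches ssl.truststore.type in the config dump above.
        List<String> aliases = listAliases(Paths.get(args[0]), "JKS", args[1].toCharArray());
        System.out.println("Loaded " + aliases.size() + " entries: " + aliases);
    }
}
```

If this fails on the GCP side but succeeds on the original server, the files or passwords differ between the two environments. If it succeeds in both, the cause is more likely on the connection side, such as the broker's client authentication settings.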
Any suggestions or examples would be appreciated.