2
votes

I need to use a trust store to make an SSL Kafka connection in Google Cloud Dataflow. Can I supply this from a bucket or is there a way to store this on the "local file system"?


2 Answers

4
votes

You can use KafkaIO.Read.withConsumerFactoryFn to supply a factory function that is invoked to create the Kafka consumer. Inside that function you are free to do whatever you need: for example, you can download the trust store file from a GCS bucket (I would recommend GcsUtil for that) and save it to a temporary file on local disk. AFAIK Kafka itself only supports reading this file from local disk. Then create a KafkaConsumer manually and point it at the file.
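To illustrate just the local-disk step described above, here is a minimal sketch in plain Java with no GCS or Kafka dependency. The class and method names (TrustStoreStager, stageToTempFile, withTrustStore) are hypothetical, and the InputStream is only a stand-in for whatever stream the bucket download gives you. The one real Kafka detail it relies on is the ssl.truststore.location consumer property.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: stages trust store bytes on local disk and
// points a Kafka consumer config at the resulting file.
public class TrustStoreStager {

    // Copies the trust store bytes into a fresh temporary file and returns its path.
    public static Path stageToTempFile(InputStream source) throws IOException {
        Path target = Files.createTempFile("kafka.client.truststore", ".jks");
        target.toFile().deleteOnExit();
        Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        return target;
    }

    // Returns a copy of the config with ssl.truststore.location set;
    // the caller's map is left untouched.
    public static Map<String, Object> withTrustStore(Map<String, Object> config, Path trustStore) {
        Map<String, Object> updated = new HashMap<>(config);
        updated.put("ssl.truststore.location", trustStore.toAbsolutePath().toString());
        return updated;
    }

    public static void main(String[] args) throws IOException {
        // Placeholder bytes; in a real pipeline this stream would come from the bucket.
        byte[] fakeTrustStore = {1, 2, 3, 4};
        Path staged;
        try (InputStream in = new ByteArrayInputStream(fakeTrustStore)) {
            staged = stageToTempFile(in);
        }
        Map<String, Object> config = new HashMap<>();
        config.put("security.protocol", "SSL");
        Map<String, Object> updated = withTrustStore(config, staged);
        System.out.println(updated.get("ssl.truststore.location"));
    }
}
```

Using a fresh temporary file rather than a fixed name avoids clashes if the factory function is invoked more than once on the same worker; inside a real factory function you would pass the updated map to the KafkaConsumer constructor.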

1
votes

Thanks @jkff for the solution. Here is an implementation example:

Sample ConsumerFactoryFn implementation:

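// Imports needed in the enclosing file (SLF4J is assumed for LOG):
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

private static class ConsumerFactoryFn
        implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

    private static final Logger LOG = LoggerFactory.getLogger(ConsumerFactoryFn.class);

    // Local path where the JKS file will be stored on the worker.
    private static final String TRUSTSTORE_PATH = "/tmp/kafka.client.truststore.jks";

    @Override
    public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
        try {
            Storage storage = StorageOptions.newBuilder()
                    .setProjectId("prj-id-of-your-bucket")
                    .setCredentials(GoogleCredentials.getApplicationDefault())
                    .build()
                    .getService();
            Blob blob = storage.get("your-bucket-name", "path.to.your.kafka.client.truststore.jks");
            if (blob == null) {
                // storage.get returns null when the object does not exist.
                throw new IllegalStateException("trust store not found in bucket");
            }
            // Download the object to local disk, where the Kafka client can read it.
            blob.downloadTo(Paths.get(TRUSTSTORE_PATH));
            LOG.debug("trust store downloaded to {}", TRUSTSTORE_PATH);
        } catch (IOException e) {
            // Fail fast: without the trust store the SSL connection cannot be set up.
            LOG.error("could not download trust store", e);
            throw new RuntimeException(e);
        }

        // Copy the config in case the supplied map is unmodifiable.
        Map<String, Object> sslConfig = new HashMap<>(config);
        sslConfig.put("ssl.truststore.location", TRUSTSTORE_PATH);

        return new KafkaConsumer<>(sslConfig);
    }
}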

Do not forget to pass it via .withConsumerFactoryFn in your KafkaIO.read() call. It should look something like this:

Map<String, Object> configMap = new HashMap<>();
configMap.put("security.protocol", "SSL");
configMap.put("ssl.truststore.password", "clientpass");

p.apply("ReadFromKafka", KafkaIO.<String, String>read()
            .withBootstrapServers("ip:9093")
            .withTopic("pageviews")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Newer Beam releases deprecate this in favor of withConsumerConfigUpdates(configMap).
            .updateConsumerProperties(configMap)
            .withConsumerFactoryFn(new ConsumerFactoryFn()) ... etc.