
I am trying to create a Dataflow template for an Apache Beam pipeline that indexes data into Elasticsearch. The template is created successfully, but when I invoke it, the pipeline fails with a "no protocol" error. This seems very odd, as the error relates to parsing a URL.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StarterPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

    public interface IndexToEsOptions extends PipelineOptions {
        @Description("Path of the gzip index file to read from")
        ValueProvider<String> getInputIndexFile();
        void setInputIndexFile(ValueProvider<String> value);

        @Description("Index name to index with")
        ValueProvider<String> getIndexName();
        void setIndexName(ValueProvider<String> value);

        @Description("Index template name")
        ValueProvider<String> getIndexTemplate();
        void setIndexTemplate(ValueProvider<String> value);

        @Description("URI for es")
        @Default.String("https://vpc-myescore01-5mtib6vgjw7sbhgn3kbnwnluim.us-west-1.es.amazonaws.com")
        ValueProvider<String> getEsUri();
        void setEsUri(ValueProvider<String> value);
    }

    public static void main(String[] args) {

        IndexToEsOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(IndexToEsOptions.class);

        Pipeline p = Pipeline.create(options);
        p.apply(TextIO.read().from(options.getInputIndexFile()))
                .apply(ElasticsearchIO.write().withConnectionConfiguration(
                        ElasticsearchIO.ConnectionConfiguration.create(
                                new String[]{options.getEsUri().toString()},
                                options.getIndexName().toString(),
                                options.getIndexTemplate().toString())
                                .withConnectTimeout(240)
                        )
                        .withMaxBatchSizeBytes(15 * 1024 * 1024)
                );

        p.run();
    }
}

The error I get when the template runs is:

java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:194)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
    at org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
    at org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
    at org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
    at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358)
    at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
    at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
    at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
    at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
    at org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
    at org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
    at org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
    at org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
    ... 14 more
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1475)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271)
Caused by: java.net.MalformedURLException: no protocol: RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}
    at java.base/java.net.URL.<init>(URL.java:627)
    at java.base/java.net.URL.<init>(URL.java:523)
    at java.base/java.net.URL.<init>(URL.java:470)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$ConnectionConfiguration.createClient(ElasticsearchIO.java:417)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1457)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
    at org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
    at org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
    at org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
    at org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
    at org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
    at org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
    at org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
    at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358)
    at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
    at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
    at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
    at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Seeing the ES URL, you have https, so you will have to provide the truststore file and password. – bigbounty
Thanks for your reply. This piece of code works with the same URL when Dataflow jobs are launched directly using the Java SDK. It's the templating that is causing this problem. Also, we have VPN tunnels, and that is just an internal LB URL. – joss
Try giving @Default.String to the template variable. – bigbounty

1 Answer


To put it simply: no, ElasticsearchIO.ConnectionConfiguration does not support ValueProviders, at least not as of the current release (2.22.0). You can see this by looking at the signature of ConnectionConfiguration.create:

public static ElasticsearchIO.ConnectionConfiguration create(java.lang.String[] addresses,
                                                             java.lang.String index,
                                                             java.lang.String type)

And comparing it to a function that does support ValueProviders, ElasticsearchIO.Read.withQuery:

public ElasticsearchIO.Read withQuery(ValueProvider<java.lang.String> query)

To support ValueProviders, a method must actually accept a ValueProvider object. This is because a ValueProvider communicates its value at run time, not at pipeline-construction time; during construction, only the wrapper object exists, so it must be passed around as a ValueProvider everywhere until the runner resolves it.
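The distinction can be sketched in plain Java with a hypothetical stand-in for ValueProvider (this is not Beam's actual class, just an illustration of why the object, not its value, exists at construction time):

```java
import java.util.function.Supplier;

public class ValueProviderSketch {
    // Hypothetical stand-in for Beam's ValueProvider: the value is supplied later.
    interface Provider<T> {
        T get(); // resolved at run time
    }

    public static void main(String[] args) {
        // Hypothetical URI, for illustration only.
        Provider<String> esUri = () -> "https://example.es.amazonaws.com";

        // Construction time: toString() describes the wrapper object,
        // it does NOT return the wrapped value.
        String atConstruction = esUri.toString();
        // Run time: get() returns the actual value.
        String atRuntime = esUri.get();

        System.out.println(atConstruction.startsWith("https")); // the wrapper's description
        System.out.println(atRuntime.startsWith("https"));      // the real URI
    }
}
```

A method that takes a plain String forces the value to exist at construction time; a method that takes the Provider object can defer calling get() until the worker runs.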

In your example, what's happening is that you are calling toString() on your ValueProvider for EsUri, and instead of getting a string containing your URL, you get the string representation of the ValueProvider itself: "RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}". That's why you get a MalformedURLException: java.net.URL attempts to parse that string as a URL and finds no valid protocol at the start.
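You can reproduce the exact failure with plain JDK classes, no Beam required (the URI inside the string is shortened here for illustration):

```java
import java.net.MalformedURLException;
import java.net.URL;

public class NoProtocolDemo {
    public static void main(String[] args) {
        // Roughly the string the Dataflow worker actually saw:
        // a ValueProvider's toString(), not a URL.
        String notAUrl =
            "RuntimeValueProvider{propertyName=esUri, default=https://example.es.amazonaws.com}";
        try {
            new URL(notAUrl);
            System.out.println("parsed");
        } catch (MalformedURLException e) {
            // URL expects "<protocol>:" at the start; "RuntimeValueProvider{..."
            // contains characters that are invalid in a protocol name.
            System.out.println(e.getMessage().startsWith("no protocol"));
        }
    }
}
```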

The solution is simple, though: turn EsUri into a construction-time parameter by changing it from ValueProvider&lt;String&gt; to String. Note that the same applies to IndexName and IndexTemplate, since create() takes plain Strings for those arguments as well. Just be aware that a construction-time parameter means you will need to rebuild the template each time you want to change that value. Unfortunately, there's nothing you can do about that until ValueProvider support is added to ElasticsearchIO.
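Applied to your pipeline, the change looks roughly like this (a sketch against the 2.22.0 API as shown above; InputIndexFile can stay a ValueProvider because TextIO.read().from() does accept one):

```java
public interface IndexToEsOptions extends PipelineOptions {
    @Description("Path of the gzip index file to read from")
    ValueProvider<String> getInputIndexFile();   // still templatable
    void setInputIndexFile(ValueProvider<String> value);

    // Construction-time parameters: plain Strings, fixed when the template is built.
    @Description("Index name to index with")
    String getIndexName();
    void setIndexName(String value);

    @Description("Index template name")
    String getIndexTemplate();
    void setIndexTemplate(String value);

    @Description("URI for es")
    @Default.String("https://vpc-myescore01-5mtib6vgjw7sbhgn3kbnwnluim.us-west-1.es.amazonaws.com")
    String getEsUri();
    void setEsUri(String value);
}
```

And in main(), pass the Strings directly instead of calling toString() on providers:

```java
ElasticsearchIO.ConnectionConfiguration.create(
        new String[] {options.getEsUri()},
        options.getIndexName(),
        options.getIndexTemplate())
```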