I was trying to create a template for Apache Beam that indexes data into Elasticsearch. The template is created successfully, but when I invoke it the pipeline fails with a `no protocol` error. This looks very odd, since the error is raised while constructing a `java.net.URL` object.
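```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StarterPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

    // Template parameters, resolved when the template is invoked.
    public interface IndexToEsOptions extends PipelineOptions {
        @Description("Path of the gzip index file to read from")
        ValueProvider<String> getInputIndexFile();
        void setInputIndexFile(ValueProvider<String> value);

        @Description("Index name to index with")
        ValueProvider<String> getIndexName();
        void setIndexName(ValueProvider<String> value);

        @Description("Index template name")
        ValueProvider<String> getIndexTemplate();
        void setIndexTemplate(ValueProvider<String> value);

        @Description("URI for es")
        @Default.String("https://vpc-myescore01-5mtib6vgjw7sbhgn3kbnwnluim.us-west-1.es.amazonaws.com")
        ValueProvider<String> getEsUri();
        void setEsUri(ValueProvider<String> value);
    }

    public static void main(String[] args) {
        IndexToEsOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(IndexToEsOptions.class);
        Pipeline p = Pipeline.create(options);
        p.apply(TextIO.read().from(options.getInputIndexFile()))
                .apply(ElasticsearchIO.write().withConnectionConfiguration(
                        ElasticsearchIO.ConnectionConfiguration.create(
                                // toString() on a ValueProvider returns a description of the
                                // provider, not its value; the error below shows
                                // "RuntimeValueProvider{propertyName=esUri, ...}" reaching the URL parser.
                                new String[]{options.getEsUri().toString()},
                                options.getIndexName().toString(),
                                options.getIndexTemplate().toString())
                                .withConnectTimeout(240))
                        // Flush bulk requests at roughly 15 MiB.
                        .withMaxBatchSizeBytes(15 * 1024 * 1024));
        p.run();
    }
}
```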
The error I get when I run it is:
```
java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:194)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
    org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
    org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
    org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
    org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
    org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
    org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
    org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
    ... 14 more
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1475)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271)
Caused by: java.net.MalformedURLException: no protocol: RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}
    java.base/java.net.URL.<init>(URL.java:627)
    java.base/java.net.URL.<init>(URL.java:523)
    java.base/java.net.URL.<init>(URL.java:470)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$ConnectionConfiguration.createClient(ElasticsearchIO.java:417)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1457)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
    org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
    org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
    org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
    org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
    org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
    org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
    org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)
```
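The innermost `MalformedURLException` can be reproduced without Beam. This is a minimal sketch under stated assumptions: `FakeRuntimeValueProvider` is a hypothetical stand-in that mimics only what the trace shows the real `RuntimeValueProvider` doing, namely that `toString()` returns a description (`RuntimeValueProvider{propertyName=..., default=...}`) rather than the wrapped value. When that description is handed to `java.net.URL`, the text before the first `:` is not a valid scheme name, so parsing fails with `no protocol`; the bare value parses fine. `https://example.com` is a placeholder.

```java
import java.net.MalformedURLException;
import java.net.URL;

// Hypothetical stand-in for Beam's RuntimeValueProvider (not the real class):
// toString() describes the provider instead of returning the wrapped value.
final class FakeRuntimeValueProvider {
    private final String propertyName;
    private final String defaultValue;

    FakeRuntimeValueProvider(String propertyName, String defaultValue) {
        this.propertyName = propertyName;
        this.defaultValue = defaultValue;
    }

    // Returns the wrapped value itself.
    String get() {
        return defaultValue;
    }

    // Mirrors the text seen in the error message above.
    @Override
    public String toString() {
        return "RuntimeValueProvider{propertyName=" + propertyName
                + ", default=" + defaultValue + "}";
    }
}

final class NoProtocolDemo {
    // Parses the string as a URL and reports either the URL or the error message.
    static String tryParse(String spec) {
        try {
            return "ok: " + new URL(spec);
        } catch (MalformedURLException e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        FakeRuntimeValueProvider esUri =
                new FakeRuntimeValueProvider("esUri", "https://example.com");
        // Description string: fails with "no protocol: RuntimeValueProvider{...}".
        System.out.println(tryParse(esUri.toString()));
        // Actual value: parses as an https URL.
        System.out.println(tryParse(esUri.get()));
    }
}
```

One hedged caveat: in a classic template the real `ValueProvider.get()` is only meant to be called once the job is running on the workers, so calling it in `main` while the template is being created is not a drop-in replacement. Whether `ElasticsearchIO.ConnectionConfiguration` in your Beam version accepts `ValueProvider` arguments is worth checking in its Javadoc.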
If it's https, then you will have to give the TrustStore file and password. – bigbounty

@Default.String to the template variable – bigbounty
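The TrustStore suggestion refers to standard JVM TLS configuration. As a hedged, generic JDK sketch (plain `java.security` / `javax.net.ssl` code, not ElasticsearchIO's own API), loading a truststore from a file path and password and building an `SSLContext` from it looks roughly like this. `TrustStoreLoader` is a hypothetical name, and the path and password are placeholders you would supply.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper: plain JDK code, not part of ElasticsearchIO.
final class TrustStoreLoader {

    // Reads a truststore file in the JVM's default keystore format
    // and unlocks it with the given password.
    static KeyStore load(String path, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
        try (InputStream in = new FileInputStream(path)) {
            trustStore.load(in, password);
        }
        return trustStore;
    }

    // Builds a TLS context that trusts only the certificates in the truststore.
    static SSLContext sslContextFor(KeyStore trustStore) throws GeneralSecurityException {
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        SSLContext context = SSLContext.getInstance("TLS");
        context.init(null, tmf.getTrustManagers(), null);
        return context;
    }

    public static void main(String[] args) throws Exception {
        // Placeholders: args[0] is the truststore path, args[1] its password.
        KeyStore trustStore = load(args[0], args[1].toCharArray());
        SSLContext context = sslContextFor(trustStore);
        System.out.println("Loaded " + trustStore.size()
                + " entries; protocol " + context.getProtocol());
    }
}
```

The file is assumed to be in `KeyStore.getDefaultType()` format (PKCS12 on Java 9 and later, JKS on Java 8); change the type passed to `KeyStore.getInstance` if your truststore uses another format. How the file or context is handed to the Elasticsearch client depends on the Beam version in use.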