I am running a streaming beam job on a flink cluster where I am getting the following exception.
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:544)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:941)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:895)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:252)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:74)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:576)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds have the same scheme, but received alluxio, file.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:544)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:941)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:895)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:252)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:74)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:576)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:544)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds have the same scheme, but received alluxio, file.
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
at org.apache.beam.sdk.io.FileSystems.validateSrcDestLists(FileSystems.java:428)
at org.apache.beam.sdk.io.FileSystems.rename(FileSystems.java:308)
at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:755)
at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:850)
The streaming job is getting data from the apache pulsar source and writing output data onto an Alluxio data lake in parquet file format. I am using Spotify's scio for writing this job in Scala. A little code chunk to emphasize what I am trying to achieve:
pulsarSource
.open(sc)
.withFixedWindows(Duration.standardSeconds(windowDuration))
.toSinkTap(sink)
From the exception, I can see that source and output paths should have the same URI scheme but I don't know how it is happening because I am using an alluxio path as an output directory. There are some temp directories that are being created on alluxio output directory but after the WindowDuration, when the output file is being created, this exception happens.
I had a doubt that temp location might be configured by default to the local filesystem, so I did set that to output directory path (alluxio dir path) but it didn't change anything.
sc.options.setTempLocation(outputDir)
I want to do remote debugging in order to figure out the issue. I have tried this document to do remote debugging on the task executor node, but once my IntelliJ IDE connects with the node, I don't get hit on my breakpoint.
Can someone suggest, how can I debug or get more information about this issue. Thanks