I'm new to a project where we have a Spring Boot application running on GKE that receives events via Kafka and publishes them via Pub/Sub. Consumers of these events may want them replayed, and we want them to request this through our application's REST API. Since the application stores the events in GCS before publishing, we thought Apache Beam pipelines running on Dataflow should do the trick.
One "replay request" might result in multiple pipelines, since the events in GCS are stored in folder structures containing the date (e.g. gs://<entity>/2020/12/13/event.json). Depending on how much history the consumer needs, we create one pipeline per day of events.
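To make the per-day fan-out concrete, here is a minimal sketch of how a replay request could be expanded into one GCS file pattern per day. The class name `ReplayPrefixes`, the method `dailyPatterns`, the bucket name `my-entity`, and the `*.json` glob are all made up for illustration; only the `yyyy/MM/dd` folder layout comes from the description above.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

class ReplayPrefixes {
    // Matches the date-based folder layout, e.g. 2020/12/13
    private static final DateTimeFormatter DAY_PATH = DateTimeFormatter.ofPattern("uuuu/MM/dd");

    // Returns one GCS file pattern per day in [from, to], both ends inclusive.
    // The bucket name and the "*.json" glob are illustrative assumptions.
    static List<String> dailyPatterns(String bucket, LocalDate from, LocalDate to) {
        List<String> patterns = new ArrayList<>();
        for (LocalDate day = from; !day.isAfter(to); day = day.plusDays(1)) {
            patterns.add("gs://" + bucket + "/" + day.format(DAY_PATH) + "/*.json");
        }
        return patterns;
    }

    public static void main(String[] args) {
        // Each printed pattern would be the input of one pipeline.
        dailyPatterns("my-entity", LocalDate.of(2020, 12, 12), LocalDate.of(2020, 12, 14))
                .forEach(System.out::println);
    }
}
```

Each returned pattern would then feed a separate pipeline, so a three-day replay yields three submissions.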
I'm fairly confident that the logic of defining and submitting pipelines is correct, since the application is able to perform this on a local Kubernetes cluster with the DirectRunner.
On Dataflow I run into the issue summarized here. Spawning a worker (org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness) fails due to a classpath issue:
Caused by: java.lang.NoClassDefFoundError: org/apache/beam/sdk/options/PipelineOptions
I can see that my jar, which should contain the correct dependencies, is on the classpath when Dataflow spawns the worker (most parameters omitted):
java
-cp
/opt/google/dataflow/batch/libshuffle_v1.jar:/opt/google/dataflow/batch/dataflow-worker.jar:/opt/google/dataflow/slf4j/jcl_over_slf4j.jar:/opt/google/dataflow/slf4j/log4j_over_slf4j.jar:/opt/google/dataflow/slf4j/log4j_to_slf4j.jar:/var/opt/google/dataflow/app-6BkavP-0nx4wHMC__85sdbCjJQa7QcQcOxGSQL5huMU.jar
...
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness
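One way to check what the staged jar actually exposes to a plain `-cp` class loader is to list its entries and look for the missing class. The sketch below does that with `java.util.jar.JarFile`. It assumes the staged jar might be a Spring Boot executable jar, which keeps its dependencies as nested jars under `BOOT-INF/lib/`, where the default class loader cannot see them. Whether that is the cause here is only a guess; the class name `JarClassCheck` and its helper methods are hypothetical.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

class JarClassCheck {
    // Resource path of the class named in the NoClassDefFoundError.
    static final String CLASS_RESOURCE = "org/apache/beam/sdk/options/PipelineOptions.class";

    // True if the class file sits at the jar root, where `java -cp app.jar` can load it.
    static boolean hasTopLevelClass(List<String> entryNames, String classResource) {
        return entryNames.contains(classResource);
    }

    // Entries that are themselves jars (e.g. under BOOT-INF/lib/ in a Spring Boot
    // executable jar). The default class loader does not look inside these.
    static List<String> nestedJars(List<String> entryNames) {
        List<String> nested = new ArrayList<>();
        for (String name : entryNames) {
            if (name.endsWith(".jar")) {
                nested.add(name);
            }
        }
        return nested;
    }

    // Reads all entry names from the jar at the given path.
    static List<String> entryNames(String jarPath) throws IOException {
        List<String> names = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                names.add(entries.nextElement().getName());
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.out.println("usage: java JarClassCheck <path-to-jar>");
            return;
        }
        List<String> names = entryNames(args[0]);
        System.out.println("class at jar root: " + hasTopLevelClass(names, CLASS_RESOURCE));
        System.out.println("nested jars: " + nestedJars(names).size());
    }
}
```

If the class is not at the jar root but many nested jars are reported, the staged file is probably the repackaged application jar rather than a flat jar containing the Beam classes.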
I suspected a clash with the google-dataflow.jar, so I tried different scopes for the Beam dependencies, but that made no difference. I'm not sure where to look next. I'm using Beam version 2.27.0, and these are the Beam dependencies in my pom.xml:
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
    <version>${beam.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
    <version>${beam.version}</version>
</dependency>
Any advice is much appreciated.
...in your classpath. If you could provide the entire thing we could see if anything important is missing. - Kenn Knowles