I'm new to a project where we have a Spring Boot application running on GKE that receives events via Kafka and publishes them via Pub/Sub. Consumers of these events may want them replayed, and we want them to request a replay through the REST API of our application. Since the application stores the events in GCS before publishing, we thought Apache Beam pipelines run on Dataflow should do the trick.

One "replay request" might result in multiple pipelines: the events in GCS are stored in folder structures containing the date (e.g. gs://<entity>/2020/12/13/event.json), and we create one pipeline per day of events, so the number of pipelines depends on how much history the consumer needs.
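
The per-day split described above could be sketched roughly as follows. This is only an illustration using the JDK: the class name `ReplayDayPrefixes`, the method `dayPrefixes`, and the bucket name `example-entity` are made up, and the prefix format is an assumption based on the example path above.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: not part of the application described in the question.
final class ReplayDayPrefixes {

    // Matches the date portion of the example path, e.g. 2020/12/13.
    private static final DateTimeFormatter DAY_PATH = DateTimeFormatter.ofPattern("uuuu/MM/dd");

    // Returns one GCS prefix per day in [from, to], both ends inclusive.
    // Each prefix would be the input location of one pipeline.
    static List<String> dayPrefixes(String bucket, LocalDate from, LocalDate to) {
        if (to.isBefore(from)) {
            throw new IllegalArgumentException("'to' must not be before 'from'");
        }
        List<String> prefixes = new ArrayList<>();
        for (LocalDate day = from; !day.isAfter(to); day = day.plusDays(1)) {
            prefixes.add("gs://" + bucket + "/" + day.format(DAY_PATH) + "/");
        }
        return prefixes;
    }

    public static void main(String[] args) {
        // "example-entity" is a placeholder bucket name.
        for (String prefix : dayPrefixes("example-entity",
                LocalDate.of(2020, 12, 12), LocalDate.of(2020, 12, 14))) {
            System.out.println(prefix);
        }
    }
}
```

A replay request covering three days would then yield three prefixes and, in the setup described here, three pipelines.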

I'm fairly confident that the logic of defining and submitting pipelines is correct, since the application is able to perform this on a local Kubernetes cluster with the DirectRunner.

On Dataflow, however, I run into the issue summarized here. Spawning a worker (org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness) fails due to a classpath issue: Caused by: java.lang.NoClassDefFoundError: org/apache/beam/sdk/options/PipelineOptions

I can see that my jar, which should contain the correct dependencies, is on the classpath when Dataflow spawns the worker (most parameters omitted):

java
-cp
/opt/google/dataflow/batch/libshuffle_v1.jar:/opt/google/dataflow/batch/dataflow-worker.jar:/opt/google/dataflow/slf4j/jcl_over_slf4j.jar:/opt/google/dataflow/slf4j/log4j_over_slf4j.jar:/opt/google/dataflow/slf4j/log4j_to_slf4j.jar:/var/opt/google/dataflow/app-6BkavP-0nx4wHMC__85sdbCjJQa7QcQcOxGSQL5huMU.jar
...
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness

Because I suspected a clash with the google-dataflow.jar, I played around with different scopes for the Beam dependencies, but that made no difference. I'm a bit clueless about where to look next. I'm using Beam version 2.27.0, and these are the Beam dependencies in my pom.xml:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
    <version>${beam.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
    <version>${beam.version}</version>
</dependency>

Any advice is much appreciated.

I notice you have ... in your classpath. If you could provide the entire thing we could see if anything important is missing. - Kenn Knowles

1 Answer

The class org/apache/beam/sdk/options/PipelineOptions is part of the core Java SDK; the artifact is beam-sdks-java-core. It is not baked into the Dataflow worker. Instead, it is expected to be among the staged files.
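
To see which classpath entry supplies that class in the JVM that submits the pipeline, a small diagnostic along these lines may help. It uses only the JDK; the class name `ClassOrigin` and the method `locate` are hypothetical and not part of Beam.

```java
import java.net.URL;

// Hypothetical diagnostic: reports where a class file is visible from.
final class ClassOrigin {

    // Turns a binary class name into a resource path and asks the given
    // class loader for its location. Returns null if the class file is
    // not visible to that loader.
    static URL locate(String className, ClassLoader loader) {
        String resource = className.replace('.', '/') + ".class";
        return loader.getResource(resource);
    }

    public static void main(String[] args) {
        String name = args.length > 0
                ? args[0]
                : "org.apache.beam.sdk.options.PipelineOptions";
        URL origin = locate(name, ClassOrigin.class.getClassLoader());
        System.out.println(name + " -> "
                + (origin == null ? "not visible to this class loader" : origin));
    }
}
```

If the printed URL points into a jar that is nested inside another jar, rather than at a standalone beam-sdks-java-core jar, that may be a hint that the class is reachable only through a custom class loader in the submitting application and not through a plain classpath entry on the worker.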

By default, the DataflowRunner attempts to stage every file that it finds on the classpath. If anything about your environment or application affects its ability to do this, you will need to add the SDK dependency yourself.
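
One packaging style that could interfere with staging, assuming the application is built as a Spring Boot executable jar (the question does not say so explicitly), keeps dependencies as nested jars under BOOT-INF/lib/ instead of as top-level class files. A jar listed with plain `-cp` cannot serve classes out of nested jars. The sketch below checks a jar for this layout using only the JDK; the class name `JarLayoutCheck` and its methods are hypothetical.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical diagnostic: inspects how a jar is laid out.
final class JarLayoutCheck {

    // Entry name of the class from the NoClassDefFoundError in the question.
    static final String CLASS_ENTRY = "org/apache/beam/sdk/options/PipelineOptions.class";

    // True if the entry sits at the top level of the jar, where a plain
    // -cp entry can load it.
    static boolean hasTopLevelEntry(String jarPath, String entryName) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    // Lists entries that are themselves jars (for example under BOOT-INF/lib/).
    static List<String> nestedJars(String jarPath) throws IOException {
        List<String> nested = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.endsWith(".jar")) {
                    nested.add(name);
                }
            }
        }
        return nested;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java JarLayoutCheck <path-to-application-jar>");
            return;
        }
        System.out.println("top-level PipelineOptions.class: "
                + hasTopLevelEntry(args[0], CLASS_ENTRY));
        System.out.println("nested jars: " + nestedJars(args[0]));
    }
}
```

If the class is missing at the top level but a beam-sdks-java-core jar shows up among the nested jars, staging the dependency jars individually, or building a flat (shaded) jar for pipeline submission, would be candidate fixes to try.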