I'm adding a step (an "apply") to a Cloud Dataflow pipeline that writes a JSON message to Cloud Firestore. The problem is that the Apache Beam library (and its dependencies) and the Firestore library aren't compatible. Below is an extract of my pom, the Dataflow code, and the Maven compilation error:

Dataflow works very well when reading from Pub/Sub or Cloud Storage and writing to Pub/Sub. But as soon as I add the Firestore dependency, I get a dependency error at compile time. I think the problem is with gRPC: Beam 2.8.0 pulls in grpc-core 1.13.1, while google-cloud-firestore 1.0.0 requires 1.19.0. I have tried the latest Apache Beam version, but I get the same error.

pom

    <properties>
        <beam.version>2.8.0</beam.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
            <version>${beam.version}</version>
            <!--scope>runtime</scope-->
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>${beam.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-firestore</artifactId>
            <version>1.0.0</version>
        </dependency>

Dataflow (Java 8). FirestoreFn is the function that inserts into Firestore:

    PCollection<String> pcollStorage = pColl
            .apply("Read from GCS", TextIO.read().from(options.getInputFile()));
    PCollection<EventAvailabilityAlert> pcollEvent = pcollStorage
            .apply("CsvLineToJson", ParDo.of(new CsvLineToJsonMsgFn()))
            .apply("Firestore Insert", ParDo.of(new FirestoreFn()));
    PCollection<PubsubMessage> pcollPubsubMsg = pcollEvent
            .apply("JsonMsgToPubsubMsg", ParDo.of(new JsonMsgToPubsubMsgFn()));
    pcollPubsubMsg
            .apply("Sending To Pub/Sub", PubsubIO.writeMessages().to(pubSubProjectsFolder + projectId + pubSubTopicFolder + pubSubTopicName));
    pColl.run();

Maven error

Could not resolve version conflict among [
org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.8.0 -> io.grpc:grpc-core:jar:1.13.1, org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.8.0 
-> com.google.api:gax-grpc:jar:1.29.0 
-> io.grpc:grpc-protobuf:jar:1.10.1 
-> io.grpc:grpc-core:jar:1.10.1, org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.8.0 
-> com.google.api:gax-grpc:jar:1.29.0 
-> io.grpc:grpc-protobuf:jar:1.10.1 
-> io.grpc:grpc-protobuf-lite:jar:1.10.1 
-> io.grpc:grpc-core:jar:1.10.1, org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.8.0 
-> io.grpc:grpc-auth:jar:1.13.1 
-> io.grpc:grpc-core:jar:[1.13.1,1.13.1], org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.8.0 
-> io.grpc:grpc-netty:jar:1.13.1 
-> io.grpc:grpc-core:jar:[1.13.1,1.13.1], org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.8.0 
-> io.grpc:grpc-stub:jar:1.13.1 
-> io.grpc:grpc-core:jar:1.13.1, org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.8.0 
-> com.google.cloud.bigtable:bigtable-client-core:jar:1.4.0 
-> io.grpc:grpc-core:jar:1.10.1, org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.8.0 
-> io.grpc:grpc-all:jar:1.13.1 
-> io.grpc:grpc-core:jar:[1.13.1,1.13.1], org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.8.0 
-> io.grpc:grpc-all:jar:1.13.1 
-> io.grpc:grpc-okhttp:jar:1.13.1 
-> io.grpc:grpc-core:jar:[1.13.1,1.13.1], org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.8.0 
-> io.grpc:grpc-all:jar:1.13.1 
-> io.grpc:grpc-protobuf-nano:jar:1.13.1 
-> io.grpc:grpc-core:jar:1.13.1, org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.8.0 
-> io.grpc:grpc-all:jar:1.13.1 
-> io.grpc:grpc-testing:jar:1.13.1 
-> io.grpc:grpc-core:jar:[1.13.1,1.13.1], com.google.cloud:google-cloud-firestore:jar:1.0.0 
-> io.grpc:grpc-netty-shaded:jar:1.19.0 
-> io.grpc:grpc-core:jar:[1.19.0,1.19.0], com.google.cloud:google-cloud-firestore:jar:1.0.0 
-> io.opencensus:opencensus-contrib-grpc-util:jar:0.19.2 
-> io.grpc:grpc-core:jar:1.18.0]

Full maven compilation error

org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal on project yyy: Could not resolve dependencies for project xx.xxx:yyy:jar:0.1: Failed to collect dependencies for xx.xxx:yyy:jar:0.1
    at org.apache.maven.lifecycle.internal.LifecycleDependencyResolver.getDependencies (LifecycleDependencyResolver.java:269)
    at org.apache.maven.lifecycle.internal.LifecycleDependencyResolver.resolveProjectDependencies (LifecycleDependencyResolver.java:147)
    at org.apache.maven.lifecycle.internal.MojoExecutor.ensureDependenciesAreResolved (MojoExecutor.java:248)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:202)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:156)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:148)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:117)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
    at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
    at org.apache.maven.cli.MavenCli.execute (MavenCli.java:956)
    at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:288)
    at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347)


I expect to insert the messages into Firestore in real time, and I think that integrating this into Dataflow (as an "apply" operation) is the best approach.
Is there a **workaround** that resolves the issue, or is this definitely a compatibility bug?
Could you explain whether there is another efficient design to achieve the goal (for example, calling the Firestore REST/gRPC API from Dataflow, with the latter implemented in another project)?
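
To make the REST option concrete, here is a minimal sketch that talks to the Firestore REST API (v1) using only the JDK, so it adds no gRPC dependency to the pom. The class name `FirestoreRestSketch` and its methods are hypothetical, the fields are assumed to be flat strings, and the OAuth access token is assumed to be obtained elsewhere. It is an illustration under those assumptions, not a tested replacement for `FirestoreFn`.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper; not part of Beam or the Firestore client library.
final class FirestoreRestSketch {

    // Collection endpoint of the Firestore REST API (v1), default database.
    static String documentsUrl(String projectId, String collection) {
        return "https://firestore.googleapis.com/v1/projects/" + projectId
                + "/databases/(default)/documents/" + collection;
    }

    // Wraps flat string fields in Firestore's typed Value format:
    // {"fields":{"name":{"stringValue":"..."}}}
    static String toDocumentJson(Map<String, String> fields) {
        StringJoiner joiner = new StringJoiner(",", "{\"fields\":{", "}}");
        for (Map.Entry<String, String> e : fields.entrySet()) {
            joiner.add("\"" + escape(e.getKey()) + "\":{\"stringValue\":\""
                    + escape(e.getValue()) + "\"}");
        }
        return joiner.toString();
    }

    // Minimal JSON string escaping: quote, backslash, control characters.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // POSTs one document (Firestore assigns the ID) and returns the HTTP status.
    // accessToken is assumed to be a valid OAuth 2.0 token supplied by the caller.
    static int insert(String projectId, String collection,
                      Map<String, String> fields, String accessToken) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(documentsUrl(projectId, collection)).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Authorization", "Bearer " + accessToken);
        conn.setRequestProperty("Content-Type", "application/json; charset=utf-8");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(toDocumentJson(fields).getBytes(StandardCharsets.UTF_8));
        }
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }
}
```

A step like `FirestoreFn` could call `insert` once per element. Batching, retries, and token refresh are left out of this sketch and would matter in a real pipeline.
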
Hi Oliver, I have to implement a similar pipeline. Can you please post your solution and explain how you fixed this problem? If possible, can you please share reference code for FirestoreFunction? Thanks in advance. - Gaurav

1 Answer

One way to achieve this is to use Cloud Functions for Firebase and Cloud Pub/Sub alongside Cloud Dataflow. Instead of writing each JSON record to Firestore directly, the pipeline publishes it to a Cloud Pub/Sub topic. Each published message triggers a Cloud Function that reads it from Pub/Sub and writes it to Firestore, something like this:

    import * as functions from 'firebase-functions';
    import * as admin from 'firebase-admin';

    admin.initializeApp();

    export const myFunction = functions.pubsub.topic('sushtopictarget').onPublish(message => {
        // message.json is the Pub/Sub payload parsed as JSON
        functions.logger.info('Message received.', message.json);
        const userObject = {
            fname: message.json.fname,
            lname: message.json.lname,
        };
        // add() creates a document with an auto-generated ID; returning the
        // promise keeps the function alive until the write has finished
        return admin.firestore().collection('sushtest2').add(userObject);
    });