
I'm trying to create and deploy a Dataflow pipeline that streams data from a Cloud Pub/Sub topic to Cloud Datastore. However, I keep running into the following compile error:

cannot find symbol
  symbol:   variable DatastoreIO
  location: class DataUpload

I don't understand what I'm doing wrong, as I followed the DatastoreV1 documentation to import the relevant package and followed the syntax of its sample code: https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/datastore/DatastoreV1

Any help would greatly alleviate the head-banging I've been doing for the past couple of days. Thanks in advance!

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1;

public class DataUpload {

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.create()
                .as(DataflowPipelineOptions.class);

        options.setRunner(DataflowPipelineRunner.class);
        options.setProject(projectName);
        //options.setStagingLocation("gs://my-staging-bucket/staging");
        options.setStreaming(true);

        Pipeline p = Pipeline.create(options);
        p.apply(PubsubIO.Read.topic("projects/{project_name}/topics/data"))
             .apply(DatastoreIO.v1().write().withProjectId(projectId));

        p.run();
    }
}

Update: I've added the line below and now get a new set of errors.

import com.google.cloud.dataflow.sdk.io.datastore.DatastoreIO;

DataUpload.java:[30,14] no suitable method found for apply(com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1.Write)
    method com.google.cloud.dataflow.sdk.values.PCollection.apply(com.google.cloud.dataflow.sdk.transforms.PTransform<? super com.google.cloud.dataflow.sdk.values.PCollection<String>,OutputT>) is not applicable
      (cannot infer type-variable(s) OutputT
        (argument mismatch; com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1.Write cannot be converted to com.google.cloud.dataflow.sdk.transforms.PTransform<? super com.google.cloud.dataflow.sdk.values.PCollection<String>,OutputT>))
    method com.google.cloud.dataflow.sdk.values.PCollection.apply(java.lang.String,com.google.cloud.dataflow.sdk.transforms.PTransform<? super com.google.cloud.dataflow.sdk.values.PCollection<String>,OutputT>) is not applicable
      (cannot infer type-variable(s) OutputT
        (actual and formal argument lists differ in length))


1 Answer


Please add the following import statement, which I think you are missing.

import com.google.cloud.dataflow.sdk.io.datastore.DatastoreIO;

You can find some examples and documentation for using it here.
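
That import takes care of the "cannot find symbol" error, but the second error from your update is a type mismatch: PubsubIO.Read.topic(...) produces a PCollection<String>, while DatastoreIO.v1().write() expects a PCollection<com.google.datastore.v1.Entity>, so you need a ParDo in between that builds entities from your messages. Below is a minimal sketch against the 1.x SDK; the "PubsubMessage" kind, the "body" property, the random key names and my-project-id are placeholders I made up, so substitute your own.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.io.datastore.DatastoreIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.datastore.v1.Entity;

import java.util.UUID;

import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;

public class DataUpload {

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.create()
                .as(DataflowPipelineOptions.class);
        options.setRunner(DataflowPipelineRunner.class);
        options.setProject("my-project-id");
        options.setStagingLocation("gs://my-staging-bucket/staging");
        options.setStreaming(true);

        Pipeline p = Pipeline.create(options);
        p.apply(PubsubIO.Read.topic("projects/my-project-id/topics/data"))
         // Convert each Pub/Sub message (a String) into a Datastore Entity,
         // because DatastoreIO.v1().write() consumes a PCollection<Entity>.
         .apply(ParDo.of(new DoFn<String, Entity>() {
             @Override
             public void processElement(ProcessContext c) {
                 Entity entity = Entity.newBuilder()
                         // Use a complete key (kind + name); a random name keeps the sketch simple.
                         .setKey(makeKey("PubsubMessage", UUID.randomUUID().toString()))
                         .putProperties("body", makeValue(c.element()).build())
                         .build();
                 c.output(entity);
             }
         }))
         .apply(DatastoreIO.v1().write().withProjectId("my-project-id"));

        p.run();
    }
}

The conversion DoFn is also where you would parse the message payload into real properties instead of storing the raw string.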

If you are using Dataflow 2.0+ (Apache Beam), please check out these Javadocs instead; some of the package and method names are different.
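
For reference, a rough 2.x equivalent of the same pipeline might look like the following; the packages move to org.apache.beam, and the kind, property, topic and project names are again placeholders.

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import com.google.datastore.v1.Entity;

import java.util.UUID;

import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;

public class DataUpload {

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                .as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        options.setStreaming(true);

        Pipeline p = Pipeline.create(options);
        p.apply(PubsubIO.readStrings().fromTopic("projects/my-project-id/topics/data"))
         .apply(ParDo.of(new DoFn<String, Entity>() {
             // In 2.x the processing method is annotated instead of overridden.
             @ProcessElement
             public void processElement(ProcessContext c) {
                 c.output(Entity.newBuilder()
                         .setKey(makeKey("PubsubMessage", UUID.randomUUID().toString()))
                         .putProperties("body", makeValue(c.element()).build())
                         .build());
             }
         }))
         .apply(DatastoreIO.v1().write().withProjectId("my-project-id"));

        p.run();
    }
}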