I'm trying to create and deploy a Dataflow pipeline that streams data from a Cloud Pub/Sub topic into Cloud Datastore. However, compilation keeps failing with the following error:

    cannot find symbol
      symbol:   variable DatastoreIO
      location: class DataUpload
I don't understand what I'm doing wrong. I imported the relevant class, DatastoreV1, as described in the documentation below, and followed the syntax of the sample code there: https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/datastore/DatastoreV1
Any help would greatly alleviate the head-banging I've been doing for the past couple of days. Thanks in advance!
    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.io.PubsubIO;
    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    import com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1;

    public class DataUpload {
        public static void main(String[] args) {
            DataflowPipelineOptions options = PipelineOptionsFactory.create()
                    .as(DataflowPipelineOptions.class);
            options.setRunner(DataflowPipelineRunner.class);
            options.setProject(projectName);
            //options.setStagingLocation("gs://my-staging-bucket/staging");
            options.setStreaming(true);

            Pipeline p = Pipeline.create(options);
            p.apply(PubsubIO.Read.topic("projects/{project_name}/topics/data"))
                    .apply(DatastoreIO.v1().write().withProjectId(projectId));
            p.run();
        }
    }
Update: I added the import below, and now I get a new set of errors.

    import com.google.cloud.dataflow.sdk.io.datastore.DatastoreIO;
    DataUpload.java:[30,14] no suitable method found for apply(com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1.Write)
        method com.google.cloud.dataflow.sdk.values.PCollection.apply(com.google.cloud.dataflow.sdk.transforms.PTransform,OutputT>) is not applicable
          (cannot infer type-variable(s) OutputT
            (argument mismatch; com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1.Write cannot be converted to com.google.cloud.dataflow.sdk.transforms.PTransform,OutputT>))
        method com.google.cloud.dataflow.sdk.values.PCollection.apply(java.lang.String,com.google.cloud.dataflow.sdk.transforms.PTransform,OutputT>) is not applicable
          (cannot infer type-variable(s) OutputT
            (actual and formal argument lists differ in length))
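
One possible reading of the "argument mismatch" above is an element-type mismatch: as far as I can tell, `PubsubIO.Read` yields a `PCollection<String>`, while `DatastoreV1.Write` is a transform over a `PCollection` of Datastore `Entity` objects, so `apply` cannot accept it directly. The sketch below reproduces that shape using only the standard library. `Coll`, `FakeEntity`, `TO_ENTITY`, and `WRITE` are hypothetical stand-ins invented for illustration and are not part of the Dataflow SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class TypeMismatchSketch {
    // Hypothetical stand-in for PCollection<T>.
    static final class Coll<T> {
        final List<T> items;
        Coll(List<T> items) { this.items = items; }

        // Same generic shape as the apply(...) signature in the error:
        // the transform must accept a collection of this element type.
        <OutputT> OutputT apply(Function<? super Coll<T>, OutputT> transform) {
            return transform.apply(this);
        }
    }

    // Hypothetical stand-in for a Datastore entity.
    static final class FakeEntity {
        final String payload;
        FakeEntity(String payload) { this.payload = payload; }
    }

    // Stand-in for the write transform: it only accepts Coll<FakeEntity>
    // and returns how many elements it "wrote".
    static final Function<Coll<FakeEntity>, Integer> WRITE = c -> c.items.size();

    // Stand-in for a conversion step placed between the read and the write.
    static final Function<Coll<String>, Coll<FakeEntity>> TO_ENTITY = c -> {
        List<FakeEntity> out = new ArrayList<>();
        for (String s : c.items) {
            out.add(new FakeEntity(s));
        }
        return new Coll<>(out);
    };

    static int run(List<String> messages) {
        Coll<String> strings = new Coll<>(messages);
        // strings.apply(WRITE);  // would not compile: WRITE expects Coll<FakeEntity>
        return strings.apply(TO_ENTITY).apply(WRITE);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("msg-1", "msg-2"))); // prints 2
    }
}
```

If this reading is right, the real pipeline would need an analogous step between the two `apply` calls, for instance a `ParDo` wrapping a `DoFn` that turns each Pub/Sub `String` into an `Entity`. How the entity's key and properties are built depends on the data, so that part is left open here.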