The core issue I have right now is that when I run the pipeline deployed to Google Cloud Dataflow, I get the error:

java.lang.IllegalStateException: FirebaseApp with name [DEFAULT] doesn't exist.

If I run the same pipeline locally, it all works. So I SUSPECT either an authentication issue, or an environment issue.

Code bits:

The DEPLOY and REAL variables control whether to push to Cloud or run locally, and whether to use my Pub/Sub source or mocked data. Switching between mocked and Pub/Sub data doesn't seem to affect the Firestore situation at all; only deploying or not does.

The main() piece where I'm initializing the Firestore application:

    public class BreakingDataTransactions {

      // When true, this pulls from the specified Pub/Sub topic
      static Boolean REAL = true;
      // When true, the job gets deployed to Cloud Dataflow
      static Boolean DEPLOY = true;

      public static void main(String[] args) {
        // Validate our env vars
        if (GlobalVars.projectId   == null ||
            GlobalVars.pubsubTopic == null ||
            GlobalVars.gcsBucket   == null ||
            GlobalVars.region      == null) {
          System.out.println("You have to set environment variables for project (BREAKING_PROJECT), pubsub topic (BREAKING_PUBSUB), region (BREAKING_REGION) and Cloud Storage bucket for staging (BREAKING_DATAFLOW_BUCKET) in order to deploy this pipeline.");
          System.exit(1);
        }

        // Initialize our Firestore instance
        try {
          GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
          System.out.println("*************************");
          System.out.println(credentials);
          FirebaseOptions firebaseOptions =
              new FirebaseOptions.Builder()
                  .setCredentials(credentials)
                  .setProjectId(GlobalVars.projectId)
                  .build();
          FirebaseApp firebaseApp = FirebaseApp.initializeApp(firebaseOptions);

        } catch (IOException e) {
          e.printStackTrace();
        }

        // Start dataflow pipeline
        DataflowPipelineOptions options =
            PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);

        options.setProject(GlobalVars.projectId);

        if (DEPLOY) {
          options.setRunner(DataflowRunner.class);
          options.setTempLocation(GlobalVars.gcsBucket);
          options.setRegion(GlobalVars.region);
        }

        Pipeline p = Pipeline.create(options);

And the piece where I'm processing things:

    PCollection<Data> dataCollection =
        jsonStrings
            .apply(ParDo.of(JSONToPOJO.create(Data.class)))
            .setCoder(AvroCoder.of(Data.class));

    PCollection<Result> result =
        dataCollection
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
            .apply(WithKeys.of(x -> x.operation + "-" + x.job_id))
            .setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(Data.class)))
            .apply(Combine.<String, Data, Result>perKey(new DataAnalysis()))
            .apply(Reify.windowsInValue())
            .apply(MapElements.into(TypeDescriptor.of(Result.class))
                    .<KV<String, ValueInSingleWindow<Result>>>via(
                        x -> {
                          Result r = new Result();
                          String key = x.getKey();
                          r.query_action = key.substring(0, key.indexOf("-"));
                          r.job_id = key.substring(key.indexOf("-") + 1);
                          r.average_latency = x.getValue().getValue().average_latency;
                          r.failure_percent = x.getValue().getValue().failure_percent;
                          r.timestamp = x.getValue().getTimestamp().getMillis();
                          return r;
                        }));

          // this node will (hopefully) actually write out to Firestore
        result.apply(ParDo.of(new FireStoreOutput()));

And finally, the FireStoreOutput class:

  public static class FireStoreOutput extends DoFn<Result, String> {

    Firestore db;

    @ProcessElement
    public void processElement(@Element Result result) {

      db = FirestoreClient.getFirestore();
      DocumentReference docRef = db.collection("events")
                                   .document("next2020")
                                   .collection("transactions")
                                   .document(result.job_id)
                                   .collection("transactions")
                                   .document();
      //System.out.println(docRef.getId());
      // Build the document data as a hash map (the document ID is auto-generated)
      Map<String, Object> data = new HashMap<>();
      data.put("failure_percent", result.failure_percent);
      data.put("average_latency", result.average_latency);
      data.put("query_action", result.query_action);
      data.put("timestamp", result.timestamp);

      // Start the write, then block until it completes
      ApiFuture<WriteResult> writeResult = docRef.set(data);
      try {
        writeResult.get();
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (ExecutionException e) {
        e.printStackTrace();
      }
    }
  }

The error occurs on the line: db = FirestoreClient.getFirestore();

I'm deploying the Dataflow job with the --serviceAccount flag specifying a service account that has permissions to do all the things.

So unless GoogleCredentials.getApplicationDefault() somehow doesn't work, credentials aren't the problem. You can see the print statement there, and it does correctly print out the credentials at build time.

BUT that only happens at build time, so I'm wondering whether I have a persistence problem: it initializes fine at build time, but when the job is actually running in the Cloud, the initialization is lost between deployment and processing. If that's the case, how do I solve that problem?
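The suspicion above can be illustrated with the JDK alone. This is only a sketch under assumptions: it assumes the function object reaches the worker through Java serialization, and `FakeClient`, `Writer`, and `roundTrip` are hypothetical stand-ins rather than Beam or Firebase APIs. It shows that state created before serialization (here a transient client) does not travel with the object and has to be re-created on the receiving side. Static state such as an app registry is likewise not part of the serialized form, although a single JVM cannot demonstrate that loss directly.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class WorkerStateSketch {

  // Hypothetical stand-in for a non-serializable client such as a database handle.
  static class FakeClient {
    final String project;

    FakeClient(String project) {
      this.project = project;
    }
  }

  // Hypothetical stand-in for a serializable function object that owns a client.
  static class Writer implements Serializable {
    private static final long serialVersionUID = 1L;
    final String project;
    transient FakeClient client; // excluded from the serialized form

    Writer(String project) {
      this.project = project;
    }

    // Lazily (re)create the client in whichever JVM the object currently lives.
    FakeClient client() {
      if (client == null) {
        client = new FakeClient(project);
      }
      return client;
    }
  }

  // Serialize and deserialize, roughly what happens between submission and execution.
  static Writer roundTrip(Writer w) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(w);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Writer) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Writer local = new Writer("my-project");
    local.client(); // "build time" initialization
    Writer onWorker = roundTrip(local);
    System.out.println("client survived: " + (onWorker.client != null));
    System.out.println("project survived: " + onWorker.project);
    System.out.println("re-created on demand: " + (onWorker.client() != null));
  }
}
```

In this sketch the plain `project` field survives the round trip while the client does not, which is why initialization is done lazily inside the object instead of once up front.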

Thanks!

1 Answer

Okay, I found a solution. The biggest issue was that my DAG's PCollection was split into two parallel paths. I have two types of operations, "read" and "write", so each path was sending its own PCollection of results to my FireStoreOutput class, which is where I was attempting to initialize the Firestore app. The second initialization attempt then failed because the app was already initialized.
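The failure mode described above can be reproduced without Firebase. The following is a minimal sketch, assuming a name-keyed registry that rejects duplicate names, similar in spirit to how the [DEFAULT] app behaves; `Registry`, `getApp`, and `INIT_COUNT` are hypothetical names for illustration only. Eight threads stand in for the parallel paths, and the guarded accessor makes sure only the first caller initializes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleInitSketch {

  // Hypothetical registry that refuses to create the same named app twice.
  static class Registry {
    private static final Map<String, Object> APPS = new HashMap<>();
    static final AtomicInteger INIT_COUNT = new AtomicInteger();

    static synchronized Object initialize(String name) {
      if (APPS.containsKey(name)) {
        throw new IllegalStateException("App named " + name + " already exists");
      }
      INIT_COUNT.incrementAndGet();
      Object app = new Object();
      APPS.put(name, app);
      return app;
    }
  }

  private static Object app;

  // Guarded accessor: the first caller initializes, later callers reuse the result.
  static synchronized Object getApp() {
    if (app == null) {
      app = Registry.initialize("[DEFAULT]");
    }
    return app;
  }

  public static void main(String[] args) throws InterruptedException {
    // Several threads race to obtain the app, like parallel branches of a pipeline.
    Thread[] branches = new Thread[8];
    for (int i = 0; i < branches.length; i++) {
      branches[i] = new Thread(SingleInitSketch::getApp);
      branches[i].start();
    }
    for (Thread t : branches) {
      t.join();
    }
    System.out.println("initializations: " + Registry.INIT_COUNT.get());

    // An unguarded second initialization fails, as in the original problem.
    try {
      Registry.initialize("[DEFAULT]");
    } catch (IllegalStateException e) {
      System.out.println("unguarded call failed: " + e.getMessage());
    }
  }
}
```

Because `getApp()` is synchronized and checks for null first, the registry sees exactly one initialization no matter how many threads call it.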

HOWEVER, making my db object static and adding a synchronized getDB() method that initializes it only if it isn't set yet worked. Final updated relevant code for the FireStoreOutput piece:

  public static class FireStoreOutput extends DoFn<Result, String> {

    static Firestore db;

    public static synchronized Firestore getDB() {
      if (db == null) {
        System.out.println("I'm being called");
          // Initialize our Firestore instance
        try {
          GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
          System.out.println("*************************");
          System.out.println(credentials);
          FirebaseOptions firebaseOptions =
              new FirebaseOptions.Builder()
                  .setCredentials(credentials)
                  .setProjectId(GlobalVars.projectId)
                  .build();
          FirebaseApp firebaseApp = FirebaseApp.initializeApp(firebaseOptions);

        } catch (IOException e) {
          e.printStackTrace();
        }
        db = FirestoreClient.getFirestore();
      }
      return db;
    }

    @ProcessElement
    public void processElement(@Element Result result) {
      DocumentReference docRef = getDB().collection("events")
                                   .document("next2020")
                                   .collection("transactions")
                                   .document(result.job_id)
                                   .collection("transactions")
                                   .document();