The core issue I have right now is that when I run the pipeline on Google Cloud Dataflow, I get this error:
java.lang.IllegalStateException: FirebaseApp with name [DEFAULT] doesn't exist.
If I run the same pipeline locally, everything works, so I SUSPECT either an authentication issue or an environment issue.
Code bits:
The DEPLOY and REAL variables control whether the job is deployed to Cloud Dataflow or run locally, and whether it reads from my Pub/Sub source or uses mocked data. Switching between mocked and Pub/Sub data doesn't seem to affect the Firestore problem at all; only deploying does.
The main() piece where I'm initializing the Firestore application:
```java
public class BreakingDataTransactions {
  // When true, this pulls from the specified Pub/Sub topic
  static boolean REAL = true;
  // When true, the job gets deployed to Cloud Dataflow
  static boolean DEPLOY = true;

  public static void main(String[] args) {
    // Validate our env vars
    if (GlobalVars.projectId == null ||
        GlobalVars.pubsubTopic == null ||
        GlobalVars.gcsBucket == null ||
        GlobalVars.region == null) {
      System.out.println("You have to set environment variables for project (BREAKING_PROJECT), pubsub topic (BREAKING_PUBSUB), region (BREAKING_REGION) and Cloud Storage bucket for staging (BREAKING_DATAFLOW_BUCKET) in order to deploy this pipeline.");
      System.exit(1);
    }

    // Initialize the default FirebaseApp (used later by FirestoreClient)
    try {
      GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
      System.out.println("*************************");
      System.out.println(credentials);
      FirebaseOptions firebaseOptions =
          new FirebaseOptions.Builder()
              .setCredentials(credentials)
              .setProjectId(GlobalVars.projectId)
              .build();
      FirebaseApp.initializeApp(firebaseOptions);
    } catch (IOException e) {
      e.printStackTrace();
    }

    // Start Dataflow pipeline
    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setProject(GlobalVars.projectId);
    if (DEPLOY) {
      options.setRunner(DataflowRunner.class);
      options.setTempLocation(GlobalVars.gcsBucket);
      options.setRegion(GlobalVars.region);
    }
    Pipeline p = Pipeline.create(options);
```
And the piece where I'm processing things:
```java
PCollection<Data> dataCollection =
    jsonStrings
        .apply(ParDo.of(JSONToPOJO.create(Data.class)))
        .setCoder(AvroCoder.of(Data.class));

PCollection<Result> result =
    dataCollection
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
        .apply(WithKeys.of(x -> x.operation + "-" + x.job_id))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(Data.class)))
        .apply(Combine.<String, Data, Result>perKey(new DataAnalysis()))
        .apply(Reify.windowsInValue())
        .apply(MapElements.into(TypeDescriptor.of(Result.class))
            .<KV<String, ValueInSingleWindow<Result>>>via(
                x -> {
                  Result r = new Result();
                  String key = x.getKey();
                  r.query_action = key.substring(0, key.indexOf("-"));
                  r.job_id = key.substring(key.indexOf("-") + 1);
                  r.average_latency = x.getValue().getValue().average_latency;
                  r.failure_percent = x.getValue().getValue().failure_percent;
                  r.timestamp = x.getValue().getTimestamp().getMillis();
                  return r;
                }));

// this node will (hopefully) actually write out to Firestore
result.apply(ParDo.of(new FireStoreOutput()));
```
And finally, the FireStoreOutput class:
```java
public static class FireStoreOutput extends DoFn<Result, String> {
  Firestore db;

  @ProcessElement
  public void processElement(@Element Result result) {
    db = FirestoreClient.getFirestore();
    DocumentReference docRef = db.collection("events")
        .document("next2020")
        .collection("transactions")
        .document(result.job_id)
        .collection("transactions")
        .document();
    //System.out.println(docRef.getId());
    // Build the document data as a map; document() above assigns an auto-generated ID
    Map<String, Object> data = new HashMap<>();
    data.put("failure_percent", result.failure_percent);
    data.put("average_latency", result.average_latency);
    data.put("query_action", result.query_action);
    data.put("timestamp", result.timestamp);
    // set() is asynchronous; get() blocks until the write completes
    ApiFuture<WriteResult> writeResult = docRef.set(data);
    try {
      writeResult.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
  }
}
```
The error occurs on the line: db = FirestoreClient.getFirestore();
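`FirestoreClient.getFirestore()` looks up the `[DEFAULT]` app in a registry that exists only inside the JVM where `FirebaseApp.initializeApp` was called. On Dataflow, `main()` runs on the machine that builds and submits the job, while the `DoFn` is serialized and executed in separate worker JVMs. The plain-Java model below is a hypothetical sketch (`FakeClient`, `WriterModel` and `ShippingDemo` are made-up names, not Beam or Firebase classes) of the part of this that fits in one JVM: after a Java serialization round trip, like the one a `DoFn` goes through when it is shipped, only serializable fields survive, so a client has to be rebuilt where the object lands. Static state such as the FirebaseApp registry cannot be shown this way, because in the real job it lives in a different process altogether.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a non-serializable client such as Firestore.
class FakeClient {
  final String projectId;

  FakeClient(String projectId) {
    this.projectId = projectId;
  }
}

// Hypothetical model of a DoFn: serializable config travels with the object,
// the transient client does not.
class WriterModel implements Serializable {
  private static final long serialVersionUID = 1L;
  private final String projectId;      // serialized, still present after shipping
  private transient FakeClient client; // not serialized, null after shipping

  WriterModel(String projectId) {
    this.projectId = projectId;
  }

  // Mirrors building the client on the launcher (like the FirebaseApp in main()).
  void initEagerly() {
    client = new FakeClient(projectId);
  }

  // Mirrors a @Setup method: rebuild the client wherever the object was deserialized.
  void setup() {
    if (client == null) {
      client = new FakeClient(projectId);
    }
  }

  FakeClient client() {
    return client;
  }
}

public class ShippingDemo {
  // Serialize and deserialize, roughly what happens when a DoFn is sent to workers.
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    WriterModel launcherCopy = new WriterModel("my-project");
    launcherCopy.initEagerly();
    WriterModel workerCopy = roundTrip(launcherCopy);
    System.out.println("client after shipping: " + workerCopy.client()); // prints "client after shipping: null"
    workerCopy.setup();
    System.out.println("client after setup: " + workerCopy.client().projectId); // prints "client after setup: my-project"
  }
}
```

Marking the client `transient` is what makes the object serializable at all; without it, `writeObject` would throw `NotSerializableException`, because `FakeClient` does not implement `Serializable`.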
I'm deploying the Dataflow job with the --serviceAccount flag specifying a service account that has permissions to do all the things.
So unless GoogleCredentials.getApplicationDefault() somehow fails (and the print statement there shows that it does correctly print out the credentials at build time), authentication isn't the problem.
But that only happens at build time, so I'm wondering whether I have a persistence problem: the app initializes fine at build time, but when the job actually runs in the cloud, the initialization is lost somewhere between deployment and processing. If that's the case, how do I solve it?
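That suspicion matches how Dataflow runs the code: `main()` executes only where the job is built and submitted, so `FirebaseApp.initializeApp` never runs inside the worker JVMs that call `processElement`. A common pattern is to initialize on each worker in the `DoFn`'s `@Setup` method. The sketch below assumes the question's `Result` POJO (not redefined here) and the Firebase Admin SDK's `FirebaseApp.getApps()` check to avoid initializing `[DEFAULT]` twice when one worker JVM hosts several instances of the `DoFn`. It is written as a top-level class, but it can stay a static nested class. On a Dataflow worker, application default credentials resolve to the worker service account, i.e. the one passed with `--serviceAccount`.

```java
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.firestore.DocumentReference;
import com.google.cloud.firestore.Firestore;
import com.google.firebase.FirebaseApp;
import com.google.firebase.FirebaseOptions;
import com.google.firebase.cloud.FirestoreClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.transforms.DoFn;

// Sketch: initializes Firebase on the worker instead of in main().
// Result is the POJO from the question; it is assumed, not defined here.
public class FireStoreOutput extends DoFn<Result, String> {
  // Plain configuration is serialized with the DoFn and reaches the workers.
  private final String projectId;
  // Firestore is not serializable, so it is rebuilt on each worker in @Setup.
  private transient Firestore db;

  public FireStoreOutput(String projectId) {
    this.projectId = projectId;
  }

  @Setup
  public void setup() throws IOException {
    // The FirebaseApp registry is per JVM, and one worker JVM can host several
    // instances of this DoFn, so guard the check-then-initialize step.
    synchronized (FireStoreOutput.class) {
      if (FirebaseApp.getApps().isEmpty()) { // this pipeline only uses the default app
        FirebaseOptions firebaseOptions =
            new FirebaseOptions.Builder()
                // On a Dataflow worker this resolves to the worker service account.
                .setCredentials(GoogleCredentials.getApplicationDefault())
                .setProjectId(projectId)
                .build();
        FirebaseApp.initializeApp(firebaseOptions);
      }
    }
    db = FirestoreClient.getFirestore();
  }

  @ProcessElement
  public void processElement(@Element Result result)
      throws InterruptedException, ExecutionException {
    DocumentReference docRef = db.collection("events")
        .document("next2020")
        .collection("transactions")
        .document(result.job_id)
        .collection("transactions")
        .document(); // auto-generated document ID

    Map<String, Object> data = new HashMap<>();
    data.put("failure_percent", result.failure_percent);
    data.put("average_latency", result.average_latency);
    data.put("query_action", result.query_action);
    data.put("timestamp", result.timestamp);

    // Block until the write completes; failures propagate so the runner can retry.
    docRef.set(data).get();
  }
}
```

With this version the transform becomes `result.apply(ParDo.of(new FireStoreOutput(GlobalVars.projectId)));`. Letting write failures propagate instead of printing them is a design choice: Dataflow then retries the failed bundle rather than silently dropping the document.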
Thanks!