My Dataflow job always seems to create a new default Google Cloud Storage bucket, even with stagingLocation, tempLocation, and gcpTempLocation defined. After looking through the Apache Beam (2.6.0) library code, these lines in GcpOptions.java appear to be the culprit:
```java
String tempLocation = options.getTempLocation();
if (isNullOrEmpty(tempLocation)) {
  tempLocation =
      tryCreateDefaultBucket(
          options,
          newCloudResourceManagerClient(options.as(CloudResourceManagerOptions.class))
              .build());
  options.setTempLocation(tempLocation);
}
```
However, this code should only run when tempLocation is not defined in PipelineOptions, and I have set tempLocation both with the command-line argument and with the setTempLocation method. I have defined the Options interface by extending GcpOptions and adding a few additional option getters and setters.
If I don't define the tempLocation getter and setter in the Options interface, tempLocation appears to default automatically to the newly created default bucket's temp folder (gs://dataflow-staging-us-central1-job_number/temp), even though I have passed the tempLocation command-line argument.
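To make the quoted code path concrete, here is a simplified, stdlib-only model of it. It is a sketch, not Beam code: the class name TempLocationModel and the stand-in createDefaultBucket method are hypothetical. It shows two things visible in the snippet above: a default bucket is only created when tempLocation is null or empty, and when that happens the result is written back with setTempLocation, which would explain why tempLocation later reads as the dataflow-staging-... temp folder.

```java
/**
 * Simplified model of the fallback quoted from GcpOptions.java.
 * TempLocationModel and createDefaultBucket are hypothetical; this is not Beam code.
 */
public class TempLocationModel {
    private String tempLocation;
    private int defaultBucketsCreated = 0;

    public TempLocationModel(String tempLocation) {
        this.tempLocation = tempLocation;
    }

    public String getTempLocation() {
        return tempLocation;
    }

    public int getDefaultBucketsCreated() {
        return defaultBucketsCreated;
    }

    // Stand-in for tryCreateDefaultBucket(...): counts the call and returns a fake path.
    private String createDefaultBucket() {
        defaultBucketsCreated++;
        return "gs://dataflow-staging-us-central1-job_number/temp";
    }

    // Mirrors the quoted logic: only a null or empty tempLocation triggers bucket
    // creation, and the new path is then written back into tempLocation.
    public String resolveGcpTempLocation() {
        String location = tempLocation;
        if (location == null || location.isEmpty()) {
            location = createDefaultBucket();
            tempLocation = location; // like options.setTempLocation(tempLocation)
        }
        return location;
    }

    public static void main(String[] args) {
        TempLocationModel configured = new TempLocationModel("gs://example_bucket/temp");
        System.out.println(configured.resolveGcpTempLocation());   // gs://example_bucket/temp
        System.out.println(configured.getDefaultBucketsCreated()); // 0

        TempLocationModel unset = new TempLocationModel(null);
        unset.resolveGcpTempLocation();
        System.out.println(unset.getTempLocation());          // gs://dataflow-staging-us-central1-job_number/temp
        System.out.println(unset.getDefaultBucketsCreated()); // 1
    }
}
```

In other words, if a default bucket is being created, the options object the factory sees at that moment most likely has an empty tempLocation, regardless of what was set elsewhere.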
Here is the main method of the Dataflow job:
```java
public static void main(String[] args) {
  Options options = PipelineOptionsFactory.fromArgs(args).as(Options.class);
  GoogleCredentials credentials = null;
  try {
    credentials = GoogleCredentials.fromStream(new FileInputStream("./src/main/resources/credentials.json"))
        .createScoped(Lists.newArrayList("https://www.googleapis.com/auth/cloud-platform"));
  } catch (IOException e) {
    e.printStackTrace();
  }
  options.setGcpCredential(credentials);
  options.setTempLocation("gs://example_bucket/temp");
  options.setGcpTempLocation("gs://example_bucket/temp");
  run(options);
}
```
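When comparing the template build with a direct launch, it may help to log exactly which temp-location flags reach PipelineOptionsFactory.fromArgs in each case. The helper below is a hypothetical debugging aid, not part of Beam; it assumes flags are passed in the --name=value form and returns the first matching value.

```java
import java.util.Optional;

/** Hypothetical debugging helper (not part of Beam) for inspecting --name=value flags. */
public class OptionArgs {
    // Returns the value of the first "--name=value" argument, if present.
    public static Optional<String> find(String[] args, String name) {
        String prefix = "--" + name + "=";
        for (String arg : args) {
            if (arg.startsWith(prefix)) {
                return Optional.of(arg.substring(prefix.length()));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String[] sample = {"--project=my-project", "--tempLocation=gs://example_bucket/temp"};
        System.out.println(find(sample, "tempLocation").orElse("<not set>"));    // gs://example_bucket/temp
        System.out.println(find(sample, "gcpTempLocation").orElse("<not set>")); // <not set>
    }
}
```

Calling something like this at the top of main, before fromArgs, would show whether the template-generation run and the direct run actually receive the same flags.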
Can anyone explain why the default bucket is always created and how I can avoid this?
Edit:
It looks as though, if I deploy the Dataflow job directly from the command line with the same arguments (instead of generating a Dataflow template and then launching the job through the console), tempLocation is set correctly and no additional bucket is created. This resolves my issue, but I am not sure why it works.
`Pipeline p = Pipeline.create(options);`? – Daniel Oliveira