We got it to work by launching a separate pipeline per namespace from inside a DoFn:
    private static class ExecuteUpdateTaskFroNamespace extends DoFn<String, String> {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            String namespace = c.element();
            LOG.info("Processing namespace: " + namespace);

            // Copy the project and staging settings from the parent pipeline's options.
            BasicOptions options = c.getPipelineOptions().cloneAs(BasicOptions.class);
            EntityOptions entityOptions = PipelineOptionsFactory.as(EntityOptions.class); // important to NOT use .create()
            entityOptions.setNamespace(namespace);
            entityOptions.setProject(options.getProject());
            entityOptions.setRunner(DataflowPipelineRunner.class);
            entityOptions.setStagingLocation(options.getStagingLocation());
            entityOptions.setKind("DocsAsset");

            try {
                // Build and submit a separate job for this namespace.
                Pipeline p = Pipeline.create(entityOptions);
                p.apply("Read from Datastore", BcDatastoreReadFactory.getEntitySource(entityOptions))
                    .apply("Find Old Site Entities", ParDo.of(new FindEntities()))
                    .apply("Transform Entities", ParDo.of(new TransformEntities()))
                    .apply("Save", DatastoreIO.v1().write().withProjectId(entityOptions.getProject()));
                p.run();
                LOG.info("Submitted UpdateAssetsSitesMimeType job for namespace: " + namespace);
                c.output("Submitted UpdateAssetsSitesMimeType job for namespace: " + namespace);
            } catch (Exception e) {
                LOG.warn("Unable to create pipeline for namespace: " + namespace, e);
            }
        }
    }
Issues: You can't spawn more than 25 jobs at a time without hitting the quota. To work around this, you can change setRunner(DataflowPipelineRunner.class) to setRunner(BlockingDataflowPipelineRunner.class), so that each p.run() call blocks until its job finishes. BUT BlockingDataflowPipelineRunner was removed in 2.0.0.
EntityOptions and BasicOptions are extensions of PipelineOptions.
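If the blocking runner is not available, one possible way around the quota issue above is to cap how many jobs are in flight yourself. The following is only a rough sketch of that idea using a fixed-size thread pool from the JDK; it is not what we actually ran. ThrottledSubmitter, NamespaceJob and runAndWait are hypothetical names, and runAndWait stands in for "build the pipeline for this namespace and block until it finishes" (in 2.x that would presumably be something like p.run().waitUntilFinish(), but that is an assumption about your setup).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottledSubmitter {

    /** Hypothetical stand-in: run the job for one namespace and block until it is done. */
    public interface NamespaceJob {
        void runAndWait(String namespace) throws Exception;
    }

    /**
     * Runs one job per namespace with at most maxConcurrent jobs in flight.
     * Returns the highest number of jobs observed running at the same time.
     */
    public static int runAll(List<String> namespaces, int maxConcurrent, NamespaceJob job)
            throws InterruptedException {
        // The pool size is the concurrency cap: extra namespaces wait in the queue.
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (String ns : namespaces) {
            pool.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    job.runAndWait(ns);
                } catch (Exception e) {
                    // Mirror the original behaviour: log the failure and move on.
                    System.err.println("Job failed for namespace " + ns + ": " + e);
                } finally {
                    running.decrementAndGet();
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> namespaces = Arrays.asList("ns-a", "ns-b", "ns-c", "ns-d", "ns-e");
        // Thread.sleep simulates a blocking job; a cap of 2 keeps the demo small.
        int peak = runAll(namespaces, 2, ns -> Thread.sleep(50));
        System.out.println("Peak concurrent jobs: " + peak);
    }
}
```

With a real job you would pass a cap somewhere below the quota (for example 20 rather than 25) to leave room for other jobs in the project. Note that this only works from a plain driver program; inside a DoFn the workers run in parallel, so a per-process cap would not bound the total number of jobs.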