2 votes

I am trying to set up a Google Cloud Dataflow pipeline (streaming mode) that reads Pub/Sub topic messages, extracts information (an object name in Google Cloud Storage) from each published message, and then starts another pipeline (batch mode) to process the object stored in Google Cloud Storage.

Is it possible to start another pipeline within a pipeline?

Could you edit your question to explain more about your use case? It might be possible to achieve it with a single pipeline, which would be much simpler to manage. – jkff
Here is my use case. I have logs generated every hour and stored in Cloud Storage. I configured object change notification for that bucket and have the notifications POST to an application I developed on GAE. Once the GAE application receives the object change notification POST, it extracts the newly generated log's name and uses Cloud Pub/Sub to publish it to a topic. Then my Dataflow program uses PubsubIO to read the messages from the topic (in streaming mode) and extract the log name and bucket info. Then I wish to start another pipeline to batch-process the specified log. – 火星一號
If the only reason you start another pipeline is to apply TextIO.Read.from() to the arrived log filename, then I'd recommend changing this and reading the log with a simple manual ParDo (see the sketch after these comments). E.g. see stackoverflow.com/questions/32277968/… – jkff
Thanks for the reply. – 火星一號
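
A minimal sketch of the single-pipeline approach jkff suggests, assuming the Dataflow 1.x SDK used elsewhere on this page: a ParDo in the streaming pipeline opens the Cloud Storage object itself instead of launching a second job. ReadGcsFileFn and the wiring shown in the trailing comment are illustrative placeholders, not code from the question.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;

import com.google.cloud.dataflow.sdk.options.GcsOptions;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;

// Reads the GCS object named by each incoming Pub/Sub message and emits its lines.
static class ReadGcsFileFn extends DoFn<String, String> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        // The element is a GCS URI published by the GAE app, e.g. "gs://my-bucket/logs/2015-10-01-00.log".
        GcsUtil gcsUtil = c.getPipelineOptions().as(GcsOptions.class).getGcsUtil();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                Channels.newInputStream(gcsUtil.open(GcsPath.fromUri(c.element()))),
                StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                c.output(line); // downstream transforms process the log lines directly
            }
        }
    }
}

// Wiring in the streaming pipeline (topic name is a placeholder):
// p.apply(PubsubIO.Read.topic(pubsubTopic))
//  .apply(ParDo.of(new ReadGcsFileFn()))
//  .apply(/* further processing of the log lines */);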

2 Answers

2 votes

There is no technical reason barring this. You would need to be sure to keep your Pipeline objects separate and have sufficient Compute Engine quota to launch all the jobs you need.
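
To make that concrete for the use case in the question, here is a sketch of what keeping the inner job's Pipeline and options separate could look like, assuming the Dataflow 1.x SDK (the second answer below does the same thing for Datastore). LaunchBatchJobFn, the output path, and the job-name scheme are placeholders.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

// Runs inside the streaming pipeline; each element is a GCS path extracted from a Pub/Sub message.
static class LaunchBatchJobFn extends DoFn<String, String> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        String logPath = c.element(); // e.g. "gs://my-bucket/logs/2015-10-01-00.log" (placeholder)

        // A separate options object and a brand-new Pipeline object for the inner batch job.
        DataflowPipelineOptions batchOptions =
                c.getPipelineOptions().cloneAs(DataflowPipelineOptions.class);
        batchOptions.setRunner(DataflowPipelineRunner.class);             // submit and return, don't block
        batchOptions.setJobName("batch-log-" + System.currentTimeMillis());

        Pipeline batch = Pipeline.create(batchOptions);
        batch.apply(TextIO.Read.from(logPath))
             // ... per-line processing transforms would go here ...
             .apply(TextIO.Write.to("gs://my-bucket/output/"));           // placeholder sink
        batch.run();

        c.output("Submitted batch job for " + logPath);
    }
}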

1 vote

We got it to work by doing this:

private static class ExecuteUpdateTaskFroNamespace extends DoFn<String, String> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        String namespace = c.element();
        LOG.info("Processing namespace: " + namespace);

        BasicOptions options = c.getPipelineOptions().cloneAs(BasicOptions.class);

        EntityOptions entityOptions = PipelineOptionsFactory.as(EntityOptions.class); // important to NOT use .create()
        entityOptions.setNamespace(namespace);
        entityOptions.setProject(options.getProject());
        entityOptions.setRunner(DataflowPipelineRunner.class);
        entityOptions.setStagingLocation(options.getStagingLocation());
        entityOptions.setKind("DocsAsset");
        try {
            Pipeline p = Pipeline.create(entityOptions);
            p.apply("Read from Datastore", BcDatastoreReadFactory.getEntitySource(entityOptions))
                    .apply("Find Old Site Entities", ParDo.of(new FindEntities()))
                    .apply("Transform Entities", ParDo.of(new TransformEntities()))
                    .apply("Save", DatastoreIO.v1().write().withProjectId(entityOptions.getProject()));
            p.run();

            LOG.info("Submitted UpdateAssetsSitesMimeType job for namespace: " + namespace);
            c.output("Submitted UpdateAssetsSitesMimeType job for namespace: " + namespace);

        } catch (Exception e) {
            LOG.warn("Unable to create pipeline for namespace: " + namespace, e);
        }

    }
}

Issues: You can't spawn more than 25 jobs at a time without hitting quota. To get around this you can change setRunner(DataflowPipelineRunner.class) to setRunner(BlockingDataflowPipelineRunner.class). BUT BlockingDataflowPipelineRunner is removed in 2.0.0.
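
For reference, in the 2.x SDKs the same blocking behavior can be had by waiting on the PipelineResult that run() returns (a sketch, not from the answer; the rest of the snippet would also need the usual 1.x to 2.x migration):

PipelineResult result = p.run(); // regular DataflowRunner in 2.x; submission itself is non-blocking
result.waitUntilFinish();        // blocks until the submitted job completes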

EntityOptions and BasicOptions are extensions of PipelineOptions.
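
The answer does not show those interfaces. A minimal sketch of what EntityOptions might look like, assuming it extends DataflowPipelineOptions so the setProject and setStagingLocation calls above resolve (everything else, including the @Description texts, is illustrative):

import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.Description;

// Hypothetical shape of EntityOptions; BasicOptions would be similar.
// setProject, setRunner and setStagingLocation are inherited from DataflowPipelineOptions.
public interface EntityOptions extends DataflowPipelineOptions {
    @Description("Datastore namespace to process")
    String getNamespace();
    void setNamespace(String namespace);

    @Description("Datastore kind to read")
    String getKind();
    void setKind(String kind);
}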