
I'm trying to build a dataflow process to help archive data by storing data into Google Cloud Storage. I have a PubSub stream of Event data which contains the client_id and some metadata. This process should archive all incoming events, so this needs to be a streaming pipeline.

I'd like to archive each Event I receive under a path that looks like gs://archive/client_id/eventdata.json. Is it possible to do this within Dataflow/Apache Beam, specifically assigning the file name differently for each Event in the PCollection?

EDIT: So my code currently looks like:

public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {

  private final String customerId;

  public PerWindowFiles(String customerId) {
    this.customerId = customerId;
  }

  @Override
  public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
    // outputDirectory already points at the base bucket (gs://archive/),
    // so only the per-customer part of the path is built here.
    String filename = customerId + extension;
    return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
  }

  @Override
  public ResourceId unwindowedFilename(
      ResourceId outputDirectory, Context context, String extension) {
    throw new UnsupportedOperationException("Unsupported.");
  }
}


public static void main(String[] args) throws IOException {
  DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(DataflowPipelineOptions.class);
  options.setRunner(DataflowRunner.class);
  options.setStreaming(true);
  Pipeline p = Pipeline.create(options);

  PCollection<Event> set = p.apply(PubsubIO.readStrings()
                                       .fromTopic("topic"))
      .apply(new ConvertToEvent());

  PCollection<KV<String, Event>> events = labelEvents(set);
  PCollection<KV<String, EventGroup>> sessions = groupEvents(events);

  String customers = System.getProperty("CUSTOMERS");
  JSONArray custList = new JSONArray(customers);
  for (Object cust : custList) {
    if (cust instanceof String) {
      String customerId = (String) cust;
      PCollection<KV<String, EventGroup>> custCol = sessions.apply(new FilterByCustomer(customerId));
      stringifyEvents(custCol)
          .apply(TextIO.write()
              .to("gs://archive/")
              .withFilenamePolicy(new PerWindowFiles(customerId))
              .withWindowedWrites()
              .withNumShards(3));
    } else {
      LOG.info("Failed to create TextIO: customerId was not String");
    }
  }

  p.run()
      .waitUntilFinish();
}

This code is ugly because I need to redeploy the pipeline every time a new client is added in order to archive their data. I would prefer to assign each customer's data to the appropriate location dynamically.

Please provide a code example of what you tried and precisely what difficulties you face. – Fabien
This is the (edited) working code that I have, which relies on a JSON array of all the clients being passed in as part of the maven command at deploy time. This is obviously suboptimal code. – Samuel Wu

1 Answer


"Dynamic destinations" - choosing the file name based on the elements being written - will be a new feature available in Beam 2.1.0, which has not yet been released.