
I have a pub/sub with line delimited jsons. Each pub/sub message has an attribute value with the bigquery table name to write to.

How would I get the individual table name value, and pass it to a new pipeline?

Is it ok to create a new PCollection and apply it... from within a DoFn itself?


1 Answer


You can apply a transformation that retrieves the table name within a DoFn and passes a KV pair of &lt;tableName, record&gt; downstream, then use the dynamic destinations support in BigQueryIO to route each record to the correct table. Alternatively, you can retrieve the table attribute within BigQueryIO.Write.withFormatFunction(). Below is an example of doing just that.
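The KV-pair approach can be sketched in plain Java (a stand-in for the body of a Beam DoFn; KV is approximated here with Map.Entry, and the attribute name "tableName" is an assumption, not part of your pipeline):

```java
import java.util.AbstractMap;
import java.util.Map;

/**
 * Sketch of the <tableName, record> pairing described above, using plain Java
 * in place of a Beam DoFn. In a real pipeline this logic would live inside
 * DoFn.processElement and emit a KV<String, PubsubMessage>.
 */
public class TableKeyFn {

  /**
   * Pairs each message payload with the table name carried in the given
   * attribute, so downstream dynamic-destination logic can key on it.
   */
  public static Map.Entry<String, String> keyByTable(
      Map<String, String> attributes, String payload, String tableNameAttr) {
    String tableName = attributes.get(tableNameAttr);
    if (tableName == null) {
      throw new RuntimeException("Message is missing the table name attribute!");
    }
    return new AbstractMap.SimpleEntry<>(tableName, payload);
  }
}
```

Either way, you never need to create a new PCollection from within a DoFn; the table name travels with the record and the sink resolves the destination per element.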

Here is the overall pipeline structure where JSON messages are consumed from Pub/Sub and then routed to the proper table destination based on a Pub/Sub message attribute. Similarly, you could change the getTableDestination(..) logic to retrieve the table name from within the JSON record instead.

You can view this entire example here.

  /**
   * Runs the pipeline to completion with the specified options. This method does not wait until the
   * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
   * object to block until the pipeline is finished running if blocking programmatic execution is
   * required.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(Options options) {

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Retrieve non-serializable parameters
    String tableNameAttr = options.getTableNameAttr();
    String outputTableProject = options.getOutputTableProject();
    String outputTableDataset = options.getOutputTableDataset();

    // Build & execute pipeline
    pipeline
        .apply(
            "ReadMessages",
            PubsubIO.readMessagesWithAttributes().fromSubscription(options.getSubscription()))
        .apply(
            "WriteToBigQuery",
            BigQueryIO.<PubsubMessage>write()
                .to(
                    input ->
                        getTableDestination(
                            input,
                            tableNameAttr,
                            outputTableProject,
                            outputTableDataset))
                .withFormatFunction(
                    (PubsubMessage msg) -> convertJsonToTableRow(new String(msg.getPayload())))
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));

    return pipeline.run();
  }

  /**
   * Retrieves the {@link TableDestination} for the {@link PubsubMessage} by extracting and
   * formatting the value of the {@code tableNameAttr} attribute. If the message is null, a {@link
   * RuntimeException} will be thrown because the message is unable to be routed.
   *
   * @param value The message to extract the table name from.
   * @param tableNameAttr The name of the attribute within the message which contains the table
   *     name.
   * @param outputProject The project in which the table resides.
   * @param outputDataset The dataset in which the table resides.
   * @return The destination to route the input message to.
   */
  @VisibleForTesting
  static TableDestination getTableDestination(
      ValueInSingleWindow<PubsubMessage> value,
      String tableNameAttr,
      String outputProject,
      String outputDataset) {
    PubsubMessage message = value.getValue();

    TableDestination destination;
    if (message != null) {
      destination =
          new TableDestination(
              String.format(
                  "%s:%s.%s",
                  outputProject, outputDataset, message.getAttributeMap().get(tableNameAttr)),
              null);
    } else {
      throw new RuntimeException(
          "Cannot retrieve the dynamic table destination of a null message!");
    }

    return destination;
  }
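As mentioned above, you could instead pull the table name out of the JSON record itself. A minimal sketch of that extraction (the "table" field name, project, and dataset values are assumptions; a real pipeline would parse the payload with the same JSON library that backs convertJsonToTableRow, e.g. Jackson, rather than a regex):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: derives the destination table spec from a field inside
 * the JSON payload rather than from a Pub/Sub message attribute.
 */
public class TableNameExtractor {

  // Matches a "table":"<name>" string field in the JSON payload.
  private static final Pattern TABLE_FIELD =
      Pattern.compile("\"table\"\\s*:\\s*\"([^\"]+)\"");

  /**
   * Returns the table spec in {@code project:dataset.table} form, mirroring
   * the format used by getTableDestination(..) above. Throws if the record
   * carries no table field, since the message cannot be routed.
   */
  public static String tableSpecFromJson(
      String json, String outputProject, String outputDataset) {
    Matcher m = TABLE_FIELD.matcher(json);
    if (!m.find()) {
      throw new RuntimeException("Cannot route a record without a table field!");
    }
    return String.format("%s:%s.%s", outputProject, outputDataset, m.group(1));
  }
}
```

You would then call this from the lambda passed to `.to(..)`, keeping the rest of the pipeline unchanged.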