
I currently have a job that outputs the contents of a Pub/Sub topic to a Cloud Storage folder, and it works fine when I launch the jar directly.

However, whenever I try to launch the job using the template I uploaded, no messages go through the pipeline.

It's very similar to the Google-provided template, except that it accepts a subscription instead of a topic.

Here's my configuration:

import org.apache.beam.sdk.options.{Default, Description, PipelineOptions, StreamingOptions}

trait Options extends PipelineOptions with StreamingOptions {
  @Description("The Cloud Pub/Sub subscription to read from")
  @Default.String("projects/project/subscriptions/subscription")
  def getInputSubscription: String
  def setInputSubscription(value: String): Unit

  @Description("The Cloud Storage directory to output files to, ends with /")
  @Default.String("gs://tmp/")
  def getOutputDirectory: String
  def setOutputDirectory(value: String): Unit

  @Description("The Cloud Storage prefix to output files to")
  @Default.String("subscription-")
  def getOutputFilenamePrefix: String
  def setOutputFilenamePrefix(value: String): Unit

  @Description("The shard template which will be part of the filenames")
  @Default.String("-W-P-SSSSS-of-NNNNN")
  def getShardTemplate: String
  def setShardTemplate(value: String): Unit

  @Description("The suffix of the filenames written out")
  @Default.String(".txt")
  def getOutputFilenameSuffix: String
  def setOutputFilenameSuffix(value: String): Unit

  @Description("The window duration in minutes, defaults to 5")
  @Default.Integer(5)
  def getWindowDuration: Int
  def setWindowDuration(value: Int): Unit

  @Description("The compression used (gzip, bz2 or none), bz2 can't be loaded into BigQuery")
  @Default.String("none")
  def getCompression: String
  def setCompression(value: String): Unit

  @Description("The maximum number of output shards produced when writing")
  @Default.Integer(1)
  def getNumShards: Int
  def setNumShards(value: Int): Unit
}

And here's how I launch the template:

   gcloud dataflow jobs run storage \
     --gcs-location gs://bucket/templates/Storage \
     --parameters runner=DataflowRunner,project=project,streaming=true,inputSubscription=projects/project/subscriptions/sub,outputDirectory=gs://bucket/

Here's how I launch the job without the template:

./storage \
  --runner=DataflowRunner \
  --project=project \
  --streaming=true \
  --gcpTempLocation=gs://tmp-bucket/ \
  --inputSubscription=projects/project/subscriptions/sub  \
  --outputDirectory=gs://bucket/
Have you tried changing to inputSubscription=sub instead of inputSubscription=projects/project/subscriptions/sub? - John Hanley
I suspect this could be because, for template options to be resolved at runtime, you need to use ValueProviders (docs). Using the Console UI you can select a specific job and find the pipeline options in the right-side bar. Is inputSubscription correctly populated for the templated job? - Guillem Xercavins
Yes, unfortunately the options are correctly populated in the right-side bar; all the defaults are being correctly overwritten. I'd like to avoid ValueProvider, though, because it's not supported in Scio. - BenFradet
Although you might very well be onto something: when I hover over the params in the console for the working job, I get my full package (e.g. com.package.Options.inputSubscription), whereas when I hover over the params for the job that isn't working, I only get the param name (e.g. inputSubscription). - BenFradet
You were right, ValueProviders are mandatory for the template to work. - BenFradet

1 Answer


As @GuillemXercavins' comment stated, the parameters have to use the ValueProvider interface as their type. ValueProvider is what allows a pipeline option to be set and read at runtime; plain String or Int options are resolved when the template is constructed, so the values you pass at launch time are silently ignored, which is what caused the issue.
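
For reference, a minimal sketch of the fix, assuming only the option types need to change (two of the options from the question are shown; the rest follow the same pattern):

import org.apache.beam.sdk.options.{Default, Description, PipelineOptions, StreamingOptions, ValueProvider}

trait Options extends PipelineOptions with StreamingOptions {
  @Description("The Cloud Pub/Sub subscription to read from")
  @Default.String("projects/project/subscriptions/subscription")
  def getInputSubscription: ValueProvider[String]
  def setInputSubscription(value: ValueProvider[String]): Unit

  @Description("The Cloud Storage directory to output files to, ends with /")
  @Default.String("gs://tmp/")
  def getOutputDirectory: ValueProvider[String]
  def setOutputDirectory(value: ValueProvider[String]): Unit
}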

It's worth pointing out, as you already did in a comment, that ValueProvider seems to be unsupported in Scio.


Edit:

Scio example provided by @BenFradet in a comment below.
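
That example itself isn't reproduced here, but a minimal sketch of the approach, assuming the ValueProvider-based Options trait above, would drop down to the raw Beam IOs through Scio's customInput and saveAsCustomOutput so that the ValueProvider overloads can be used (the object name, step names, and the fixed window duration are illustrative):

import com.spotify.scio.ScioContext
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.joda.time.Duration

object Storage {
  def main(cmdlineArgs: Array[String]): Unit = {
    val options = PipelineOptionsFactory
      .fromArgs(cmdlineArgs: _*)
      .withValidation()
      .as(classOf[Options])
    val sc = ScioContext(options)

    sc.customInput(
        "ReadFromPubsub",
        // the ValueProvider overload of fromSubscription defers resolution to runtime
        PubsubIO.readStrings().fromSubscription(options.getInputSubscription)
      )
      // the windowing strategy is part of the pipeline graph itself, so it cannot
      // be a ValueProvider and stays fixed at template creation time
      .withFixedWindows(Duration.standardMinutes(5))
      .saveAsCustomOutput(
        "WriteToGcs",
        TextIO
          .write()
          .to(options.getOutputDirectory) // ValueProvider overload of to()
          .withWindowedWrites()
          .withNumShards(1) // windowed writes require an explicit shard count
      )

    sc.run()
  }
}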