1
votes

I am trying to build an Apache Beam pipeline in Python 3.7 with Beam SDK version 2.20.0. The pipeline gets deployed on Dataflow successfully but does not seem to be doing anything. In the worker logs, I can see the following error message reported repeatedly:

Error syncing pod xxxxxxxxxxx (), skipping: Failed to start container worker log

I have tried everything I could, but this error is quite stubborn. My pipeline looks like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import DebugOptions

options = PipelineOptions()

options.view_as(GoogleCloudOptions).project = PROJECT
options.view_as(GoogleCloudOptions).job_name = job_name
options.view_as(GoogleCloudOptions).region = region
options.view_as(GoogleCloudOptions).staging_location = staging_location
options.view_as(GoogleCloudOptions).temp_location = temp_location

options.view_as(WorkerOptions).zone = zone
options.view_as(WorkerOptions).network = network
options.view_as(WorkerOptions).subnetwork = sub_network
options.view_as(WorkerOptions).use_public_ips = False

options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(StandardOptions).streaming = True

options.view_as(SetupOptions).sdk_location = ''
options.view_as(SetupOptions).save_main_session = True

options.view_as(DebugOptions).experiments = []

print('running pipeline...')

# The with-block runs the pipeline on exit, so no separate pipeline.run() is needed.
with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(topic=topic_name).with_output_types(bytes)
        | 'ProcessMessage' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(table=bq_table_name,
                                                       schema=bq_schema,
                                                       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

I have tried supplying a Beam SDK 2.20.0 tar.gz from the compute instance using the sdk_location parameter, but that doesn't work either. I can't use the default sdk_location, as that triggers a download from pypi.org; I am working in an offline environment and connectivity to the internet is not an option.
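For reference, this is roughly how I pointed sdk_location at the local archive (the path shown is only illustrative, not the actual one):

# illustrative path on the compute instance, not the actual one
options.view_as(SetupOptions).sdk_location = '/path/to/apache-beam-2.20.0.tar.gz'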

The pipeline itself is deployed in a container, and all libraries that go with Apache Beam 2.20.0 are listed in a requirements.txt file; the Docker image installs all of them. Any help would be highly appreciated.

Hello! 1. Could you please add the full code with all imports? 2. Please take a look at the official documentation about specifying a subnetwork (have you specified it correctly?), configuring pipeline options, and another SO thread. – aga
Yes, I have specified both the network and sub-network correctly; the same network and sub-network work fine for default Dataflow jobs. – user12837160
So, to clarify, jobs work without the sdk_location flag but not with it? What do you mean by "default dataflow jobs"? The error probably means that the Dataflow runtime environment does not have access to download the containers required to execute the job, or that the containers are not available (if you are specifying a custom container). – chamikara
By "default dataflow jobs" I mean the default templates available in Dataflow. I am using the same network and sub-network values for my own template as well, so there should be no problem with the network settings. But yes, there is no connectivity to the internet: jobs created with the default templates work fine, but custom ones written in Python fail. – user12837160
Sounds like you are trying to specify a custom container to Dataflow. I don't think this is fully supported/documented yet. There's some information here: stackoverflow.com/questions/44465818/…. But I suggest you contact Google Cloud Support if you need more specific information. – chamikara

1 Answer

0
votes

TL;DR: Copy the Apache Beam SDK archive to a path accessible by your job and pass that path as the sdk_location option.

I was also struggling with this setup. I finally found a solution; even though your question was asked quite some time ago, this answer might help someone else.

There are probably multiple ways to do that, but the following two are quite simple.

As a precondition, you'll need to create the apache-beam SDK source archive as follows:

  1. Clone the Apache Beam GitHub repository

  2. Switch to the required tag, e.g. v2.28.0

  3. cd to beam/sdks/python

  4. Create the tar.gz source archive of your required Beam SDK version:

    python setup.py sdist
    
  5. Now you should have the source archive apache-beam-2.28.0.tar.gz under beam/sdks/python/dist/

Option 1 - Use a Flex Template and copy the Apache Beam SDK in the Dockerfile
Documentation: Google Dataflow Documentation

  1. Create a Dockerfile --> you have to include COPY path_to_beam_archive/apache-beam-2.28.0.tar.gz /tmp, because this is going to be the path you can set in your SetupOptions. (The Dockerfile also copies a setup.py; a minimal sketch of it is shown after these steps.)
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}

WORKDIR ${WORKDIR}

# Due to a change in the Apache Beam base image in version 2.24, you must install
# libffi-dev manually as a dependency. For more information:
# https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4891

# update used packages
RUN apt-get update && apt-get install -y \
    libffi-dev \
 && rm -rf /var/lib/apt/lists/*


COPY setup.py .
COPY main.py .

COPY path_to_beam_archive/apache-beam-2.28.0.tar.gz /tmp

ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"

RUN python -m pip install --user --upgrade pip setuptools wheel
  2. Set sdk_location to the path you've copied the apache-beam-2.28.0.tar.gz to:
    options.view_as(SetupOptions).sdk_location = '/tmp/apache-beam-2.28.0.tar.gz'
  3. Build the Docker image with Cloud Build
    gcloud builds submit --tag $TEMPLATE_IMAGE .
  4. Create a Flex Template
gcloud dataflow flex-template build "gs://define-path-to-your-templates/your-flex-template-name.json" \
 --image=gcr.io/your-project-id/image-name:tag \
 --sdk-language=PYTHON \
 --metadata-file=metadata.json
  5. Run the generated Flex Template in your subnetwork (if required)
gcloud dataflow flex-template run "your-dataflow-job-name" \
--template-file-gcs-location="gs://define-path-to-your-templates/your-flex-template-name.json" \
--parameters staging_location="gs://your-bucket-path/staging/" \
--parameters temp_location="gs://your-bucket-path/temp/" \
--service-account-email="your-restricted-sa-dataflow@your-project-id.iam.gserviceaccount.com" \
--region="yourRegion" \
--max-workers=6 \
--subnetwork="https://www.googleapis.com/compute/v1/projects/your-project-id/regions/your-region/subnetworks/your-subnetwork" \
--disable-public-ips
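
The Dockerfile above copies a setup.py for the Flex Template launcher. A minimal sketch of it could look like the following; the package name, version, and dependency list are placeholders for your own:

# minimal setup.py sketch (name, version and dependencies are placeholders)
import setuptools

setuptools.setup(
    name="my-dataflow-flex-template",
    version="0.0.1",
    packages=setuptools.find_packages(),
    install_requires=[
        # list your pipeline's runtime dependencies here
    ],
)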

Option 2 - Copy the SDK archive from GCS and set sdk_location
According to the Beam documentation, you should even be able to provide a GCS (gs://) path directly for the sdk_location option, but that didn't work for me (a sketch of that attempt follows); the steps below, however, do work:
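
For reference, this is the kind of direct gs:// setting that did not work in my case (the bucket path is only illustrative):

# setting sdk_location directly to a gs:// path (illustrative bucket) did not work for me
options.view_as(SetupOptions).sdk_location = 'gs://your-bucket-name/beam_sdks/apache-beam-2.28.0.tar.gz'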

  1. Upload the previously generated archive to a bucket that the Dataflow job you'd like to execute can access, e.g. gs://yourbucketname/beam_sdks/apache-beam-2.28.0.tar.gz
  2. In your source code, download the apache-beam SDK archive to e.g. /tmp/apache-beam-2.28.0.tar.gz:
# see: https://cloud.google.com/storage/docs/samples/storage-download-file
from google.cloud import storage

def download_blob(bucket_name, source_blob_name, destination_file_name):
    """Downloads a blob from the bucket."""
    # bucket_name = "your-bucket-name"
    # source_blob_name = "storage-object-name", e.g. "beam_sdks/apache-beam-2.28.0.tar.gz"
    # destination_file_name = "local/path/to/file", e.g. "/tmp/apache-beam-2.28.0.tar.gz"

    storage_client = storage.Client()
    # Note: the bucket name must not include the "gs://" prefix.
    bucket = storage_client.bucket(bucket_name)

    # Construct a client-side representation of a blob.
    # Note `Bucket.blob` differs from `Bucket.get_blob` as it doesn't retrieve
    # any content from Google Cloud Storage. As we don't need additional data,
    # using `Bucket.blob` is preferred here.
    # The blob name is the object path within the bucket, without the bucket name.
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

  3. Now you can set sdk_location to the path you've downloaded the SDK archive to:
options.view_as(SetupOptions).sdk_location = '/tmp/apache-beam-2.28.0.tar.gz'
  4. Now your pipeline should be able to run without an internet breakout.
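
Putting Option 2 together, a minimal sketch of the submission code could look like the following. The bucket name, object path, and the pipeline itself are placeholders, and the remaining pipeline options (project, region, runner, network, etc.) are set as in the question; the important part is that the archive is downloaded before the pipeline is constructed and submitted.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

# 1. Fetch the SDK archive from GCS first (bucket and object names are placeholders).
download_blob(
    bucket_name="your-bucket-name",
    source_blob_name="beam_sdks/apache-beam-2.28.0.tar.gz",
    destination_file_name="/tmp/apache-beam-2.28.0.tar.gz",
)

# 2. Point sdk_location at the local copy so nothing is fetched from pypi.org.
options = PipelineOptions()
options.view_as(SetupOptions).sdk_location = '/tmp/apache-beam-2.28.0.tar.gz'
# ... set your GoogleCloudOptions / WorkerOptions / StandardOptions here as in the question

# 3. Build and run the pipeline as usual (placeholder pipeline shown here).
with beam.Pipeline(options=options) as pipeline:
    pipeline | 'Create' >> beam.Create(['hello'])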