0 votes

I'm trying to run an Apache Beam job based on TensorFlow Transform on Dataflow, but it gets killed. Has anyone experienced this behaviour? This is a simple example with DirectRunner that runs fine locally but fails on Dataflow (I change the runner accordingly; a rough sketch of that change is shown after the example):

import os
import csv
import datetime
import numpy as np

import tensorflow as tf
import tensorflow_transform as tft

from apache_beam.io import textio
from apache_beam.io import tfrecordio

from tensorflow_transform.beam import impl as beam_impl
from tensorflow_transform.beam import tft_beam_io 
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema

import apache_beam as beam


NUMERIC_FEATURE_KEYS = ['feature_'+str(i) for i in range(2000)]


def _create_raw_metadata():
    column_schemas = {}
    for key in NUMERIC_FEATURE_KEYS:
        column_schemas[key] = dataset_schema.ColumnSchema(tf.float32, [], dataset_schema.FixedColumnRepresentation())

    raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(column_schemas))

    return raw_data_metadata


def preprocessing_fn(inputs):
    outputs={}

    for key in NUMERIC_FEATURE_KEYS:
        outputs[key] = tft.scale_to_0_1(inputs[key])

    return outputs


def main():

    output_dir = '/tmp/tmp-folder-{}'.format(datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

    RUNNER = 'DirectRunner'

    with beam.Pipeline(RUNNER) as p:
        with beam_impl.Context(temp_dir=output_dir):

            raw_data_metadata = _create_raw_metadata()
            _ = (raw_data_metadata | 'WriteInputMetadata' >> tft_beam_io.WriteMetadata(os.path.join(output_dir, 'rawdata_metadata'), pipeline=p))

            m = np.random.rand(100, 2000) * 100  # 100 random rows, one value per feature
            raw_data = (p
                    | 'CreateTestDataset' >> beam.Create([dict(zip(NUMERIC_FEATURE_KEYS, m[i,:])) for i in range(m.shape[0])]))

            raw_dataset = (raw_data, raw_data_metadata)

            transform_fn = (raw_dataset | 'Analyze' >> beam_impl.AnalyzeDataset(preprocessing_fn))
            _ = (transform_fn | 'WriteTransformFn' >> tft_beam_io.WriteTransformFn(output_dir))

            (transformed_data, transformed_metadata) = ((raw_dataset, transform_fn) | 'Transform' >> beam_impl.TransformDataset())

            transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)
            _ = transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(os.path.join(output_dir, 'train'), file_name_suffix='.gz', coder=transformed_data_coder)

if __name__ == '__main__':
  main()
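
For reference, switching to Dataflow on my side looks roughly like this (a sketch rather than my exact configuration; the project and bucket names are placeholders):

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; tensorflow_transform also needs to be available on the
# workers (e.g. via a --setup_file or --requirements_file).
options = PipelineOptions(
    runner='DataflowRunner',
    project='my-gcp-project',
    temp_location='gs://my-bucket/tmp',
    job_name='tft-scale-example',
)
with beam.Pipeline(options=options) as p:
    ...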

Also, my production code (not shown) fails with the message: The job graph is too large. Please try again with a smaller job graph, or split your job into two or more smaller jobs.

Any hint?

Dataflow only supports pipeline graphs below a certain size (I believe 10MB, or possibly 100MB). This error means that your graph is too large. Let me see if we can find a workaround... - Pablo
I ran across the same "too large" error using the Template approach, where the limit is 10MB. Changing to the Traditional approach got past that particular error, but then another limit was hit, this time 20MB, due to some other .json file getting too big. - ptsw

1 Answer

0 votes

The restriction on the pipeline description size is documented here: https://cloud.google.com/dataflow/quotas#limits

There is a way around that: instead of creating a separate set of stages for each tensor that goes into tft.scale_to_0_1, we can fuse them by first stacking the tensors together and then passing the result to tft.scale_to_0_1 with elementwise=True. Each call to tft.scale_to_0_1 adds its own min and max analyzers, and each analyzer contributes its own stages to the pipeline, so 2000 separate calls blow up the job graph while the stacked version needs only one pair.

The result will be the same, because with elementwise=True the min and max are computed per column of the stacked tensor rather than across the whole tensor, exactly as they would be for each feature on its own.

The fused preprocessing_fn would look something like this:

def preprocessing_fn(inputs):
    outputs = {}
    # One min/max pair over the stacked [batch, 2000] tensor instead of one per feature.
    stacked = tf.stack([inputs[key] for key in NUMERIC_FEATURE_KEYS], axis=1)
    scaled_stacked = tft.scale_to_0_1(stacked, elementwise=True)
    for key, tensor in zip(NUMERIC_FEATURE_KEYS, tf.unstack(scaled_stacked, axis=1)):
        outputs[key] = tensor
    return outputs
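
Note that only preprocessing_fn changes; the AnalyzeDataset, TransformDataset and write steps from the question stay exactly as they are.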