I wrote an example based on the following code: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/sql_taxi.py

I am getting the following error message:

/usr/local/lib/python3.6/dist-packages/apache_beam/typehints/schemas.py in typing_to_runner_api(type_)
    177         array_type=schema_pb2.ArrayType(element_type=element_type))
    178
--> 179   raise ValueError("Unsupported type: %s" % type_)
    180
    181

ValueError: Unsupported type: Any

The corresponding part of the code is:

    mean_open_close = (
        csv_lines
        | beam.Map(
            lambda x: beam.Row(
                element_date=x['Date'],
                element_open=x['Open'],
                element_close=x['Close']))
        | SqlTransform(
            """
            SELECT
              element_date,
              AVERAGE(element_open) AS average_open,
              AVERAGE(element_close) AS average_close
            FROM PCOLLECTION"""))

1 Answer

Without your actual code, it's unclear what the issue is (as the posted example works just fine), but going on a hunch I might guess that you're writing

beam.Map(lambda x: beam.Row(attr=expr, ...))

for some expr for which Beam is not able to infer the type. You could explicitly write this as

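beam.Map(lambda x: beam.Row(attr=type(expr), ...))

where type stands for a concrete constructor such as str, int, or float (for example, element_open=float(x['Open'])), not the built-in type() function. With the cast in place, Beam can infer a concrete type for the field instead of falling back to Any, which is the type the error message complains about.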
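As a rough illustration of what those casts do for the question's columns, here is a minimal plain-Python sketch that runs without Beam. It assumes the records are dicts of strings keyed by 'Date', 'Open', and 'Close', as csv.DictReader would produce. The sample rows and the helper name cast_record are hypothetical, since the question does not show how csv_lines is built.

```python
import csv
import io

# Hypothetical sample data; the question's real CSV is not shown.
SAMPLE_CSV = "Date,Open,Close\n2020-01-02,10.5,11.0\n2020-01-03,11.0,10.0\n"


def cast_record(x):
    """Apply the same explicit casts one would write inside beam.Row(...)."""
    return {
        'element_date': str(x['Date']),
        'element_open': float(x['Open']),
        'element_close': float(x['Close']),
    }


# csv.DictReader yields every value as a string, so nothing downstream
# can know that Open and Close are numeric until they are cast.
records = [cast_record(r) for r in csv.DictReader(io.StringIO(SAMPLE_CSV))]

# Each field now has one concrete Python type.
field_types = {name: type(value).__name__ for name, value in records[0].items()}
print(field_types)
```

In the pipeline itself, the same casts would be written directly in the beam.Row(...) call, as the linked sql_taxi.py example does, e.g. element_date=str(x['Date']), element_open=float(x['Open']), element_close=float(x['Close']).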