
After I submit a Python Table API job that involves user-defined functions (UDFs) to a local cluster, it crashes with a py4j.protocol.Py4JJavaError caused by

java.util.ServiceConfigurationError: org.apache.beam.sdk.options.PipelineOptionsRegistrar: org.apache.beam.sdk.options.DefaultPipelineOptionsRegistrar not a subtype.

I am aware that this is an issue concerning the dependencies on the lib path and classloading. I have already tried to follow all of the instructions at the following link: https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html

I have extensively tried different configurations of the classloader.parent-first-patterns.additional config option; one of the variants is shown below. Different entries with org.apache.beam.sdk.[...] have only led to different, additional error messages.
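
For illustration, one of the variants I experimented with in flink-conf.yaml looked like the following (the pattern value is only an example of what I tried; it did not resolve the error):

classloader.parent-first-patterns.additional: org.apache.beam.sdk.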

The following Apache Beam dependencies are on the lib path:

  • beam-model-fn-execution-2.20.jar
  • beam-model-job-management-2.20.jar
  • beam-model-pipeline-2.20.jar
  • beam-runners-core-construction-java-2.20.jar
  • beam-runners-java-fn-execution-2.20.jar
  • beam-sdks-java-core-2.20.jar
  • beam-sdks-java-fn-execution-2.20.jar
  • beam-vendor-grpc-1_21_0-0.1.jar
  • beam-vendor-grpc-1_26_0-0.3.jar
  • beam-vendor-guava-26_0-jre-0.1.jar
  • beam-vendor-sdks-java-extensions-protobuf-2.20.jar

I can also rule out my own code as the cause, since I have tested the following sample code from the project website: https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

t_env.register_function("add", add)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource')\
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")

When executing this code, the same error message appears.

Does anyone have a working configuration for a Flink cluster that can run Python Table API jobs with UDFs? Many thanks in advance for any tips!


1 Answer


The problem is solved by the new Apache Flink release 1.10.1. With the 1.10.1 binaries, the sample script shown in the question now runs without any problems when submitted with run -py path/to/script.
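
For reference, the submission command I use (assuming a local cluster started from the 1.10.1 binaries, e.g. via ./bin/start-cluster.sh, and with path/to/script pointing to the sample from the question) is:

./bin/flink run -py path/to/script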

As for the dependencies, they are already included in the flink-table_x.xx-1.10.1.jar that ships with the distribution, so no further dependencies need to be added to the lib path, as was attempted in the question during the debugging/configuration attempts.