3 votes

Is there any way to use a Python user-defined function within a Java Flink job, or any way to communicate, for example, the result of a transformation done by Flink in Java to a Python user-defined function, in order to apply some machine learning?

I know that from PyFlink you can do something like this:

table_env.register_java_function("hash_code", "my.java.function.HashCode")
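
where my.java.function.HashCode would be a regular Table API scalar function on the Java side, something like:

package my.java.function;

import org.apache.flink.table.functions.ScalarFunction;

// a plain Java scalar function, shown here just for context
public class HashCode extends ScalarFunction {
    public int eval(String s) {
        return s.hashCode();
    }
}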

But I need to do something like that the other way around: register the Python function from Java. Or, how can I pass the result of a Java transformation directly to a Python UDF Flink job?

I hope these questions are not too crazy, but I need to know if there is some way to connect the Flink DataStream API with the Python Table API, with Java as the main language. This means that from Java I need to do: Source -> Transformations -> Sink, but some of these transformations could trigger a Python function, or a Python function would be waiting for some Java transformation to finish in order to do something with the stream result.

I hope someone understands what I'm trying to do here.

Kind regards!


2 Answers

1 vote

Support for Python UDFs (user-defined functions) was added in Flink 1.10 -- see PyFlink: Introducing Python Support for UDFs in Flink's Table API. For example, you can do this:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
table_env.register_function("add", add)
my_table.select("add(a, b)")

For more examples, see the blog post linked above, or the stable documentation.

In Flink 1.11 (release expected next week), support has been added for vectorized Python UDFs, bringing interoperability with Pandas, NumPy, etc. This release also includes support for Python UDFs in SQL DDL and in the SQL Client. For documentation, see the master docs.

It sounds like you want to call out to Python from Java. The Stateful Functions API supports this more completely -- see remote functions. But to call out to Python from the Java DataStream API, I think your only option is to use the SQL DDL support added in Flink 1.11. See FLIP-106 and the docs.

FLIP-106 has this example:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");

tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();

which you should be able to convert to use the DataStream API instead.
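
As a rough, untested sketch of that conversion (using the Flink 1.11 streaming bridge API, with the paths and function name taken from the FLIP-106 example above):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// ship the Python script and point Flink at a Python interpreter
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");

// register the Python UDF via SQL DDL, then apply it to a stream instead of a data set
tEnv.executeSql("create temporary system function func1 as 'test1.func1' language python");

Table table = tEnv.fromDataStream(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
DataStream<Row> result = tEnv.toAppendStream(table, Row.class);
result.print();

env.execute();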

1 vote

An example of this integration follows. This dependency is needed in your pom.xml, assuming that Flink 1.11 is the current version:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.11.2</version>
  <scope>provided</scope>
</dependency>
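
Besides the planner, the Python bridge presumably needs to be on the classpath as well (artifact coordinates assuming Flink 1.11):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-python_2.11</artifactId>
  <version>1.11.2</version>
</dependency>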

Create the Environments:

private StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

private StreamTableEnvironment tableEnv = getTableAPIEnv(env);

/* this SingleOutputStreamOperator will contain the result of the consumption from the defined source */
private SingleOutputStreamOperator<Event> stream;

public static StreamTableEnvironment getTableAPIEnv(StreamExecutionEnvironment env) {
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    /* the paths are placeholders: point them at your Python script and interpreter */
    tableEnv.getConfig().getConfiguration().setString("python.files", "path/to/function.py");
    tableEnv.getConfig().getConfiguration().setString("python.client.executable", "path/to/python");
    tableEnv.getConfig().getConfiguration().setString("python.executable", "path/to/python");
    /* the Python worker needs task off-heap memory, so reserve some */
    tableEnv.getConfig().getConfiguration().setString("taskmanager.memory.task.off-heap.size", "79mb");
    /* register here the function.py script and the name of the function inside it */
    tableEnv.executeSql("CREATE TEMPORARY SYSTEM FUNCTION FunctionName AS 'function.FunctionName' LANGUAGE PYTHON");
    return tableEnv;
}

Start with the transformations that you want to do, for example:

SingleOutputStreamOperator<EventProfile> profiles = createUserProfile(stream.keyBy(k -> k.id));

/* the result of the `createUserProfile()` ProcessFunction is sent to the Python function,
   which updates some values of the profile and returns them back to Java through a
   Flink operator: a map function, for example */
profiles = turnIntoTable(profiles).map((MapFunction<Row, EventProfile>) row -> {
    /* your custom mapping code here; it must rebuild an EventProfile from the Row */
    return EventProfile.fromRow(row); // hypothetical helper
});
profiles.addSink(new YourCustomSinkFunction());

/* this function processes the Event and creates the EventProfile class used in this example;
   you can also use other operators (map, flatMap, etc). A sketch of
   UserProfileProcessFunction follows below. */
private SingleOutputStreamOperator<EventProfile> createUserProfile(KeyedStream<Event, String> stream) {
    return stream.process(new UserProfileProcessFunction());
}
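
UserProfileProcessFunction could be a minimal sketch along these lines (assuming Event and EventProfile are POJOs with the fields used in this example):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/* sketch only: a real implementation would typically keep keyed state, timers, etc. */
public class UserProfileProcessFunction
        extends KeyedProcessFunction<String, Event, EventProfile> {

    @Override
    public void processElement(Event event, Context ctx, Collector<EventProfile> out) {
        // the Event/EventProfile fields below are assumptions for illustration
        out.collect(new EventProfile(event.id, event.noOfHits, event.timestamp));
    }
}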


/* This function receives a SingleOutputStreamOperator, sends each record to the Python
   function through the Table API, and returns a Row of String (you can change the Row type)
   that will be mapped back into the EventProfile class.
   Requires: import static org.apache.flink.table.api.Expressions.$; */
@FunctionHint(output = @DataTypeHint("ROW<a STRING>"))
private DataStream<Row> turnIntoTable(SingleOutputStreamOperator<EventProfile> rowInput) {
    Table events = tableEnv.fromDataStream(rowInput,
            $("id"), $("noOfHits"), $("timestamp"))
            .select("FunctionName(id, noOfHits, timestamp)");
    return tableEnv.toAppendStream(events, Row.class);
}

And finally:

env.execute("Job Name");

An example of the Python function, called FunctionName, inside the function.py script:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(
    input_types=[
        DataTypes.STRING(), DataTypes.INT(), DataTypes.TIMESTAMP(precision=3)
    ],
    result_type=DataTypes.STRING()
)
def FunctionName(id, noOfHits, timestamp):
    # function code here
    return f"{id}|{noOfHits}|{timestamp}"