
I have started learning stream processing very recently and am trying my hand at Apache Flink. I'm trying to write a job that reads events from a Kafka topic, possibly performs some stateless chained transformations and makes a REST call to another application to POST each transformed event. For example, my main method could look like this -

public class KafkaSourceToRestSinkJob {
    public static void main(String[] args) throws Exception {
        String configPath = args[0];
        //Read configuration for the job (like kafka properties, rest uri for sink, possibly operators to invoke)
        ...
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), kafkaProps));
        //Chain some operators depending on some parameters in the config file
        ...
        //Attach the sink last, so the chained operators feed into it
        dataStream.addSink(new RestSinkFunction<>()); //Custom sink function implementing org.apache.flink.streaming.api.functions.sink.SinkFunction
        env.execute("Confused Job");
    }
}

My aim is to have a common jar artifact for multiple jobs with the same type of source and sink. If I need a job to perform transformations A, B & C (implementations will be present in the jar), I can specify them in the config file and pass the path to the file in the program args.

Now here are my questions -

  1. Is it possible to dynamically invoke operators?
  2. I know that making a REST call in the sink may cause some unwanted latency, but in my application, it is tolerable. I also do not care about the response. Keeping this in mind, is there a reason why I should avoid a REST sink?
  3. Overall, am I going horribly wrong?

Thank you!

Are you sure this is the best solution? It seems a little hard to maintain in the long run. – Dominik Wosiński

2 Answers

0
votes

You can't dynamically modify the topology of the job graph, but you can, for example, implement a flatmap operator that dynamically loads a class (specified in the config) and then uses it to transform the event stream.
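For example, here is a minimal sketch of that idea, assuming a hypothetical Transformation interface that your classes A, B and C implement and whose fully-qualified name comes from the config file (the class and field names below are illustrative, not part of any Flink API):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class DynamicFlatMap extends RichFlatMapFunction<String, String> {

    //Hypothetical interface that the configurable transformations implement
    public interface Transformation extends java.io.Serializable {
        String apply(String event);
    }

    private final String transformClassName; //Fully-qualified class name read from the config
    private transient Transformation transformation;

    public DynamicFlatMap(String transformClassName) {
        this.transformClassName = transformClassName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        //Instantiate the configured transformation once per task, not once per event
        transformation = (Transformation) Class.forName(transformClassName)
                .getDeclaredConstructor()
                .newInstance();
    }

    @Override
    public void flatMap(String event, Collector<String> out) {
        out.collect(transformation.apply(event));
    }
}

You could then chain dataStream.flatMap(new DynamicFlatMap(className)) once per entry in the config, in the order the transformations are listed.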

As for the REST sink, if you need to guarantee exactly once semantics, end-to-end, then you'll need to carefully fit the sink in with Flink's checkpointing. The FlinkKafkaConsumer handles recovery by rewinding and replaying events since the last checkpoint. If you aren't careful, this will result in duplicate results being pushed to the REST sink during recovery. If the REST sink is only performing idempotent updates on the external system, then this is fine, but otherwise you'll need to make the REST sink stateful and transactional.
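If at-least-once delivery to an idempotent endpoint is good enough for you, a sink along these lines would do (a sketch only; the class name and URI are placeholders, and it uses the HttpClient that ships with Java 11+ purely for illustration):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class RestSinkFunction extends RichSinkFunction<String> {

    private final String restUri;            //Sink endpoint taken from the job config
    private transient HttpClient httpClient; //Not serializable, so create it in open()

    public RestSinkFunction(String restUri) {
        this.restUri = restUri;
    }

    @Override
    public void open(Configuration parameters) {
        httpClient = HttpClient.newHttpClient();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(restUri))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(value))
                .build();
        //Synchronous POST; the response is ignored. Duplicates are possible after a restore from a checkpoint.
        httpClient.send(request, HttpResponse.BodyHandlers.discarding());
    }
}

For the transactional case, Flink provides a TwoPhaseCommitSinkFunction base class that coordinates commits to the external system with checkpoints, which is the usual starting point for an exactly-once sink.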

0
votes

I'd probably have a look at Flink SQL. You can define common source/sinks and then just pass an SQL query to Flink.

I had a similar setup in the past with Spark SQL and it worked fairly well. You don't need to invent your own specification language and it's easier to understand.
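A rough sketch of that setup (the table and topic names are illustrative, and since Flink SQL has no built-in REST connector, a print sink stands in for it here; the connector options depend on your Flink version):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlDrivenJob {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        //Common source definition, shared by every job built from this jar
        tableEnv.executeSql(
                "CREATE TABLE events (payload STRING) WITH (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'input-topic'," +
                " 'properties.bootstrap.servers' = 'localhost:9092'," +
                " 'format' = 'raw')");

        //Common sink definition (print stands in for the REST endpoint)
        tableEnv.executeSql(
                "CREATE TABLE output (payload STRING) WITH ('connector' = 'print')");

        //The per-job transformation is just a query read from the config or program args
        String query = args.length > 0 ? args[0] : "SELECT UPPER(payload) AS payload FROM events";
        tableEnv.executeSql("INSERT INTO output " + query);
    }
}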