
I have a streaming job using Apache Flink (version 1.8.1), written in Scala. The required job flow is: Kafka -> write to HBase -> send to Kafka again on a different topic.

During the write to HBase, the job needs to retrieve data from another table. To ensure that the data is not empty (NULL), the job must check repeatedly (within a certain time) whether the data is empty.

Is this possible with Flink? If so, could you provide an example for a case similar to mine?

Edit: Given the problem I described above, I thought I would have to create some kind of batch job inside the streaming job, but I couldn't find a suitable example for my case. So, is it possible to create a batch Flink job inside a streaming Flink job? If so, could you provide an example for a case similar to mine?

I think that the broadcast pattern applies to your use case (ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/…) although you have to change the ruleStateDescriptor to renew the state every X minutes. - Felipe
It's not clear how the title of your question ("Is it possible to create a batch flink job in streaming flink job?") relates to the rest of the content. - David Anderson

2 Answers

0 votes

With more recent versions of Flink you can do lookup queries (with a configurable cache) against HBase from the SQL/Table API. Your use case sounds like it could be implemented easily this way. See the docs for more info.
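A sketch of what that lookup join could look like in Flink SQL. The table names, columns, and connector options here are assumptions for illustration, not taken from the question; the lookup cache TTL is what lets the job re-read HBase periodically rather than caching stale (or empty) rows forever:

```sql
-- Kafka-backed input stream (hypothetical names/columns)
CREATE TABLE input_events (
  id STRING,
  payload STRING,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'input-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- HBase dimension table with a lookup cache
CREATE TABLE hbase_dim (
  rowkey STRING,
  cf ROW<v STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'dim_table',
  'zookeeper.quorum' = 'localhost:2181',
  'lookup.cache.max-rows' = '1000',
  'lookup.cache.ttl' = '10min'
);

-- Processing-time lookup join: each event looks up its row in HBase
SELECT e.id, e.payload, d.cf.v
FROM input_events AS e
JOIN hbase_dim FOR SYSTEM_TIME AS OF e.proc_time AS d
  ON e.id = d.rowkey;
```

Rows that are missing from HBase at lookup time are simply not joined; with the cache TTL above, a later event with the same key would trigger a fresh lookup.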

0 votes

Just to clarify my comment, I will post a sketch of what I was trying to suggest, based on The Broadcast State Pattern. The link provides an example in Java, so I will follow it; in Scala it should not be too different. You will likely have to implement the code below as explained in the link I mentioned:

DataStream<String> output = colorPartitionedStream
                 .connect(ruleBroadcastStream)
                 .process(
                     // type arguments in our KeyedBroadcastProcessFunction represent: 
                     //   1. the key of the keyed stream
                     //   2. the type of elements in the non-broadcast side
                     //   3. the type of elements in the broadcast side
                     //   4. the type of the result, here a string
                     new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                         // my matching logic
                     }
                 );

I was suggesting that you collect the ruleBroadcastStream at fixed intervals from the database, or whatever your store is. So instead of getting:

// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
                        .broadcast(ruleStateDescriptor);

as the web page shows, you will need to add a source that you can schedule to run every X minutes:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
BroadcastStream<Rule> ruleBroadcastStream = env
             .addSource(new YourStreamSource())
             .broadcast(ruleStateDescriptor);
 
public class YourStreamSource extends RichSourceFunction<YourType> {

    // X minutes between fetches; Thread.sleep expects milliseconds, not a String
    private static final long INTERVAL_MILLIS = 5 * 60 * 1000L;

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<YourType> ctx) throws Exception {
        while (running) {
            // TODO: yourData = FETCH DATA;
            ctx.collect(yourData);

            Thread.sleep(INTERVAL_MILLIS);
        }
    }

    @Override
    public void cancel() {
        this.running = false;
    }
}
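Stripped of the Flink API, the fetch-and-sleep loop that this source runs can be exercised as plain Java. Here fetchRules, the short interval, and the three-round cutoff are hypothetical stand-ins for your actual store, the X-minute schedule, and cancel():

```java
import java.util.ArrayList;
import java.util.List;

public class PeriodicFetchDemo {

    // Hypothetical stand-in for "FETCH DATA" against your store
    static String fetchRules(int round) {
        return "rules-v" + round;
    }

    public static void main(String[] args) throws Exception {
        List<String> emitted = new ArrayList<>();
        long intervalMillis = 10L; // stand-in for the X-minute interval
        boolean running = true;
        int round = 0;

        while (running) {
            round++;
            emitted.add(fetchRules(round)); // plays the role of ctx.collect(yourData)
            Thread.sleep(intervalMillis);   // Thread.sleep takes milliseconds
            running = round < 3;            // the real source loops until cancel()
        }

        System.out.println(emitted); // prints [rules-v1, rules-v2, rules-v3]
    }
}
```

Each pass emits a fresh snapshot of the rules, which is exactly what the broadcast side of the KeyedBroadcastProcessFunction then distributes to all parallel instances.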