Is there no way to do async I/O in a sink's invoke method, e.g. by returning a Future?

For example, the Redis connector uses the Jedis library to execute Redis commands synchronously:

https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java

So it blocks the Flink task thread while waiting for the network response from the Redis server on every command?! Can other operators run in the same thread as the sink? If so, would it block them too?

I know Flink has an async I/O API, but it doesn't seem to be meant for sink implementations?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html

Jedis has no async interface. – Dexter
You can write your own connector implementation that uses "RichAsyncFunction". – Dexter
@Dexter could you give a simple example? – kingluo

1 Answer

As @Dexter mentioned, you can use RichAsyncFunction. Here is some sample code (it may need further tweaks to work):

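    import java.util.Collections;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

    // Assumes the Lettuce client; Lettuce 4.x uses the com.lambdaworks.redis packages instead.
    import io.lettuce.core.RedisClient;
    import io.lettuce.core.RedisFuture;
    import io.lettuce.core.api.async.RedisAsyncCommands;

    AsyncDataStream.orderedWait(ds, new RichAsyncFunction<Tuple2<String,MyEvent>, String>() {
        private transient RedisClient client;
        private transient RedisAsyncCommands<String, String> commands;
        private transient ExecutorService executor;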

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            client = RedisClient.create("redis://localhost");
            commands = client.connect().async();
            executor = Executors.newFixedThreadPool(10);
        }

        @Override
        public void close() throws Exception {
            // shut down the connection and thread pool.
            client.shutdown();
            executor.shutdown();

            super.close();
        }

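        @Override
        public void asyncInvoke(Tuple2<String, MyEvent> input, final AsyncCollector<String> collector) throws Exception {
            // e.g. fetch a value from Redis asynchronously ("key" is a placeholder)
            final RedisFuture<String> future = commands.get("key");
            future.thenAccept(new Consumer<String>() {
                @Override
                public void accept(String value) {
                    // Use the delivered value; future.get() throws checked exceptions and would not compile here.
                    collector.collect(Collections.singletonList(value));
                }
            });
            // Failures are not forwarded here; they could be passed on with collector.collect(Throwable).
        }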
    }, 1000, TimeUnit.MILLISECONDS);
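
The callback wiring in asyncInvoke can be tried out without Flink or Redis. Below is a minimal sketch that uses only the JDK. `Collector`, `lookup` and `Recording` are hypothetical stand-ins (for Flink 1.3's `AsyncCollector` and for an async Redis call such as `commands.get(key)`), not real library APIs. It shows the method returning immediately and the collector being completed later from the future's callback, on both the success and the failure path.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class AsyncCallbackSketch {

    /** Hypothetical stand-in for Flink 1.3's AsyncCollector. */
    interface Collector<T> {
        void collect(Collection<T> result);
        void collect(Throwable error);
    }

    /** Hypothetical stand-in for an async client call such as commands.get(key). */
    static CompletableFuture<String> lookup(String key) {
        return CompletableFuture.supplyAsync(() -> {
            if (key.isEmpty()) {
                throw new IllegalArgumentException("empty key");
            }
            return "value-of-" + key;
        });
    }

    /** Returns without blocking; the collector is completed from the callback. */
    static CompletableFuture<Void> asyncInvoke(String key, Collector<String> collector) {
        return lookup(key).handle((value, error) -> {
            if (error != null) {
                collector.collect(error);                             // failure path
            } else {
                collector.collect(Collections.singletonList(value));  // success path
            }
            return null;
        });
    }

    /** Collector that only records what it receives, for the demo. */
    static final class Recording implements Collector<String> {
        final List<String> results = new CopyOnWriteArrayList<>();
        final List<Throwable> errors = new CopyOnWriteArrayList<>();

        @Override
        public void collect(Collection<String> result) {
            results.addAll(result);
        }

        @Override
        public void collect(Throwable error) {
            errors.add(error);
        }
    }

    public static void main(String[] args) {
        Recording ok = new Recording();
        // join() is used only so the demo can print the outcome before exiting.
        asyncInvoke("k1", ok).join();
        System.out.println(ok.results);            // prints [value-of-k1]

        Recording failed = new Recording();
        asyncInvoke("", failed).join();
        System.out.println(failed.errors.size());  // prints 1
    }
}
```

In the real function there would be no join(): Flink's async operator waits for the collector to be completed, or for the timeout passed to orderedWait.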