
I have a StreamExecutionEnvironment job that consumes simple CQL select queries from Kafka. I am trying to handle these queries asynchronously using the following code:

public class GenericCassandraReader extends RichAsyncFunction<UserDefinedType, ResultSet> {

private static final Logger logger = LoggerFactory.getLogger(GenericCassandraReader.class);
private ExecutorService executorService;

private final Properties props;
private Cluster cluster;
private Session client;

public ExecutorService getExecutorService() {
    return executorService;
}

public GenericCassandraReader(Properties props, ExecutorService executorService) {
    super();
    this.props = props;
    this.executorService = executorService;
}

@Override
public void open(Configuration parameters) throws Exception {
    cluster = Cluster.builder().addContactPoint(props.getProperty("cqlHost"))
            .withPort(Integer.parseInt(props.getProperty("cqlPort"))).build();
    client = cluster.connect(props.getProperty("keyspace"));

}

@Override
public void close() throws Exception {
    client.close();
    cluster.close();
    synchronized (GenericCassandraReader.class) {
        // Stop accepting new tasks before waiting for in-flight ones to finish.
        getExecutorService().shutdown();
        try {
            if (!getExecutorService().awaitTermination(1000, TimeUnit.MILLISECONDS)) {
                getExecutorService().shutdownNow();
            }
        } catch (InterruptedException e) {
            getExecutorService().shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

@Override
public void asyncInvoke(final UserDefinedType input, final AsyncCollector<ResultSet> asyncCollector) throws Exception {
    getExecutorService().submit(new Runnable() {
        @Override
        public void run() {
            ListenableFuture<ResultSet> resultSetFuture = client.executeAsync(input.query);

            Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {

                public void onSuccess(ResultSet resultSet) {
                    asyncCollector.collect(Collections.singleton(resultSet));
                }

                public void onFailure(Throwable t) {
                    asyncCollector.collect(t);
                }
            });
        }
    });
}

}

Each response produced by this code is a Cassandra ResultSet, and each one can contain a different number of fields.

Any ideas for handling a Cassandra ResultSet in Flink, or should I use a different technique to reach my goal?

Thanks for any help in advance!


1 Answer


A Cassandra ResultSet is not thread-safe. It is better to use the Flink Cassandra connector, or at least to write your own implementation in a similar way.
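
If you go with your own implementation, one way to keep the ResultSet confined to a single thread is to drain it inside the driver's callback and emit already-extracted values instead of the ResultSet itself. Below is a minimal sketch of the asyncInvoke from the question, assuming the same AsyncCollector API; the Row-to-String mapping via row.toString() is only a placeholder (in a real job you would map each Row to your own POJO), and it additionally needs java.util.List, java.util.ArrayList and com.datastax.driver.core.Row imports:

// The function would then be declared as
// RichAsyncFunction<UserDefinedType, List<String>>.
@Override
public void asyncInvoke(final UserDefinedType input, final AsyncCollector<List<String>> asyncCollector) throws Exception {
    ListenableFuture<ResultSet> resultSetFuture = client.executeAsync(input.query);

    Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {

        @Override
        public void onSuccess(ResultSet resultSet) {
            // Drain the ResultSet on the driver's callback thread so it is
            // never accessed from any other thread.
            List<String> rows = new ArrayList<>();
            for (Row row : resultSet) {
                rows.add(row.toString()); // placeholder mapping
            }
            asyncCollector.collect(Collections.singleton(rows));
        }

        @Override
        public void onFailure(Throwable t) {
            asyncCollector.collect(t);
        }
    });
}

Note that executeAsync is already non-blocking, so the extra ExecutorService from the question is not strictly needed; registering the callback directly in asyncInvoke keeps the threading simpler. Also, for large result sets, iterating eagerly inside the callback can block the driver's I/O thread while it fetches further pages, so you may want to limit the query or handle paging explicitly.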