I'm trying to send some data from a DataSet into elasticsearch using the new elasticsearch connector but I can't find any resources besides the ones for datastream structure here :
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html
My Dataset is a dataset of row (from a sql query), this is the content :
199947,6
199958,3
199964,2
199985,2
I've created a static nested class which implements ElasticsearchSinkFunction :
public static class NumberOfTransactionsByBlocks implements ElasticsearchSinkFunction<Row> {
public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
public IndexRequest createIndexRequest(Row element) {
Map<String, String> json = new HashMap<>();
json.put("block_number", element.getField(0).toString());
json.put("numberOfTransactions", element.getField(1).toString());
return Requests.indexRequest()
.index("nbOfTransactionsByBlocks")
.type("count-transactions")
.source(json);
}
}
And then my problem is that I don't know how to send an instance of my inner class...
DataSet<Row> data = tableEnv.toDataSet(sqlResult, Row.class);
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
Map<String, String> config = new HashMap<>();
config.put("bulk.flush.max.actions", "1"); // flush inserts after every event
config.put("cluster.name", "elasticsearch"); // default cluster name
data.output(new ElasticsearchSink<>(config, httpHosts, new NumberOfTransactionsByBlocks()));
I have an error when I instantiate ElasticsearchSink it says :
cannot infer arguments
But when I do specify the type (Row) it says :
ElasticsearchSink(java.util.Map, java.util.List, org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction, org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler, org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory)' has private access in 'org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink'
Am I doing something wrong ?