I'm trying to send some data from a DataSet into Elasticsearch using the new Elasticsearch connector, but I can't find any resources besides the ones for the DataStream API here:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html

My DataSet is a DataSet of Row (from a SQL query). This is its content:

199947,6
199958,3
199964,2
199985,2

I've created a static nested class that implements ElasticsearchSinkFunction:

public static class NumberOfTransactionsByBlocks implements ElasticsearchSinkFunction<Row> {

    public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));

    }

    public IndexRequest createIndexRequest(Row element) {
        Map<String, String> json = new HashMap<>();
        json.put("block_number", element.getField(0).toString());
        json.put("numberOfTransactions", element.getField(1).toString());

        return Requests.indexRequest()
                .index("nbOfTransactionsByBlocks")
                .type("count-transactions")
                .source(json);
    }
}

My problem is that I don't know how to pass an instance of this nested class to the sink:

DataSet<Row> data = tableEnv.toDataSet(sqlResult, Row.class);
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

Map<String, String> config = new HashMap<>();
config.put("bulk.flush.max.actions", "1");   // flush inserts after every event
config.put("cluster.name", "elasticsearch"); // default cluster name


data.output(new ElasticsearchSink<>(config, httpHosts, new NumberOfTransactionsByBlocks()));

I get an error when I instantiate ElasticsearchSink. It says:

cannot infer arguments

But when I specify the type (Row) explicitly, it says:

'ElasticsearchSink(java.util.Map, java.util.List, org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction, org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler, org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory)' has private access in 'org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink'

Am I doing something wrong?

1 Answer

As of Flink 1.6.0, there are four different Elasticsearch connectors, one per supported Elasticsearch major version:

  • v1.x: flink-connector-elasticsearch_2.11
  • v2.x: flink-connector-elasticsearch2_2.11
  • v5.x: flink-connector-elasticsearch5_2.11
  • v6.x: flink-connector-elasticsearch6_2.11

Make sure you include the correct Maven dependency in your project.
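For the v6.x connector, the dependency entry would look roughly like the following. This is a sketch that assumes Flink 1.6.0 and the Scala 2.11 build named in the list above; adjust the version to match the Flink release you actually run.

```xml
<!-- Assumed versions: Flink 1.6.0, Scala 2.11 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
  <version>1.6.0</version>
</dependency>
```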

...has private access in org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink

Judging from the error you shared, you are using the dependency for v6.x. The source shows that the constructor was made private and a Builder was added [commit].
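To illustrate why the compiler reports "has private access", here is a minimal, self-contained sketch of the private-constructor-plus-builder pattern. DemoSink and its fields are hypothetical stand-ins invented for this example; they are not Flink classes and do not reproduce the connector's real implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sink class whose constructor is private.
final class DemoSink<T> {
    private final List<String> hosts;
    private final int bulkFlushMaxActions;

    // Private: calling `new DemoSink<>(...)` from another class fails to
    // compile with "... has private access in DemoSink".
    private DemoSink(List<String> hosts, int bulkFlushMaxActions) {
        this.hosts = hosts;
        this.bulkFlushMaxActions = bulkFlushMaxActions;
    }

    List<String> getHosts() {
        return hosts;
    }

    int getBulkFlushMaxActions() {
        return bulkFlushMaxActions;
    }

    // The builder is the only supported way to create a DemoSink.
    static final class Builder<T> {
        private final List<String> hosts;
        private int bulkFlushMaxActions = -1; // -1 means "not configured"

        Builder(List<String> hosts) {
            this.hosts = new ArrayList<>(hosts);
        }

        void setBulkFlushMaxActions(int numMaxActions) {
            if (numMaxActions <= 0) {
                throw new IllegalArgumentException("numMaxActions must be positive");
            }
            this.bulkFlushMaxActions = numMaxActions;
        }

        // A nested class may call the enclosing class's private constructor.
        DemoSink<T> build() {
            return new DemoSink<>(hosts, bulkFlushMaxActions);
        }
    }
}
```

Client code then creates a Builder, calls its setters, and finishes with build() instead of invoking the constructor directly.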

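So, to create an ElasticsearchSink, go through the builder. Note that ElasticsearchSink is a streaming SinkFunction rather than an OutputFormat, so it is attached with DataStream#addSink, not DataSet#output. Assuming the query result is available as a DataStream<Row> named stream (for example via StreamTableEnvironment#toAppendStream), you need something like the following, which follows the pattern in the connector documentation:

ElasticsearchSink.Builder<Row> esSinkBuilder =
    new ElasticsearchSink.Builder<>(httpHosts, new NumberOfTransactionsByBlocks());
esSinkBuilder.setBulkFlushMaxActions(1); // flush after every element
stream.addSink(esSinkBuilder.build());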

The import would then be:

import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;