0 votes

I am using Apache Flink 1.1.3 and Elasticsearch 5.1.1.

The Flink documentation only covers the Elasticsearch 2.x API (flink-connector-elasticsearch2_2.1.1): https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/elasticsearch2.html

Is there no Flink connector API for Elasticsearch 5.x?

I tried to use this connector with Elasticsearch 5.x, but I get errors like the ones below.
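
My sink is wired up roughly like the example in the Elasticsearch 2.x connector documentation; the cluster name, address, and index/type names below are simplified placeholders for my actual values:

    // Roughly my setup, following the flink-connector-elasticsearch2 docs example.
    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
    import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
    import org.elasticsearch.client.Requests;

    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Inside the streaming job, after creating a DataStream<String> named input:
    Map<String, String> config = new HashMap<>();
    config.put("cluster.name", "elasticsearch");   // must match the ES cluster name
    config.put("bulk.flush.max.actions", "1");     // flush after every element

    List<InetSocketAddress> transportAddresses = new ArrayList<>();
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));

    input.addSink(new ElasticsearchSink<>(config, transportAddresses,
            new ElasticsearchSinkFunction<String>() {
                @Override
                public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                    Map<String, String> json = new HashMap<>();
                    json.put("data", element);
                    indexer.add(Requests.indexRequest()
                            .index("my-index")
                            .type("my-type")
                            .source(json));
                }
            }));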

Flink exception:

01/03/2017 20:01:21 Job execution switched to status FAILING.
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
    at java.lang.Thread.run(Thread.java:745)

Elasticsearch log:

[2017-01-03T20:01:21,642][WARN ][o.e.h.n.Netty4HttpServerTransport] [7CTs2-R] caught exception while handling client http traffic, closing connection [id: 0xbce51ef2, L:/127.0.0.1:9200 - R:/127.0.0.1:58429]
java.io.IOException: The current connection has been interrupted by a remote host
    at sun.nio.ch.SocketDispatcher.read0(Native Method) ~[?:?]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) ~[?:?]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:?]
    at sun.nio.ch.IOUtil.read(IOUtil.java:197) ~[?:?]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[?:?]
    at io.netty.buffer.PooledHeapByteBuf.setBytes(PooledHeapByteBuf.java:261) ~[netty-buffer-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100) ~[netty-buffer-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:366) ~[netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:118) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:536) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:490) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) [netty-common-4.1.6.Final.jar:4.1.6.Final]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]

2 Answers

0 votes

Apache Flink 1.1.3 does not include a connector for Elasticsearch 5.x.

There is work in progress (JIRA, GitHub PR) on such a connector, but it has not been merged into the Flink codebase yet.

You could try to build the connector from the pull request author's repository.

0 votes

You should be able to use the dependency below and get it to work.

Note that the released Flink version doesn't seem to work with the elasticsearch5 dependency, at least for me on my Mac, so if you switch to 1.3-SNAPSHOT it should work.

I was able to get it working and post events to Elasticsearch 5.4.

Change the following in the pom.xml. Set the Flink version to 1.3-SNAPSHOT:

    <flink.version>1.3-SNAPSHOT</flink.version>

and add the connector dependency:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>

In the Java code, change the elasticsearch2 import to elasticsearch5, as below:

import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;

In the class that implements ElasticsearchSinkFunction, add this method (as in the PopularPlaceInserter example) alongside the existing process method. Make sure you change the index name and mapping type to match your Elasticsearch setup, and you should be able to run the program.

    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("nyc-idx")
                .type("popular-locations")
                .source(json);
    }
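
For completeness, here is a minimal sketch of how such a sink function is wired into the job with the elasticsearch5 connector. MySinkFunction is a placeholder for your class implementing ElasticsearchSinkFunction (with the process and createIndexRequest methods above), and the cluster name and address are placeholders as well; the config-map/transport-address constructor mirrors the one from the 2.x documentation.

    // Minimal wiring sketch; cluster name, address and MySinkFunction are placeholders.
    Map<String, String> config = new HashMap<>();
    config.put("cluster.name", "elasticsearch");   // must match your ES 5.x cluster name
    config.put("bulk.flush.max.actions", "1");     // flush after every element (handy while testing)

    List<InetSocketAddress> transportAddresses = new ArrayList<>();
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));

    // input is your DataStream<String>; MySinkFunction implements ElasticsearchSinkFunction<String>.
    input.addSink(new ElasticsearchSink<>(config, transportAddresses, new MySinkFunction()));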