0
votes
I am using the concurrentAppend method of the Core class in the Azure Data Lake Store Java SDK to store data in Azure Data Lake. Below are the code and the exception I got. The exception occurs only rarely, not on every write. Could anyone guide me?





public void invoke(String value)  {
        BitfinexSingletonClass obj;
        try {
            obj = BitfinexSingletonClass.getInstance();
        } catch (IOException e1) {
            // Without the singleton there is no client to write with, so stop here
            // instead of hitting a NullPointerException on the next line.
            slf4jLogger.error("Could not get BitfinexSingletonClass instance", e1);
            return;
        }
        ADLStoreClient client = obj.getADLStoreClient();
        byte[] myBuffer = (value + "\n").getBytes();

        RequestOptions opts = new RequestOptions();

        // Retry transient failures inside the SDK with exponential backoff
        opts.retryPolicy = new ExponentialBackoffPolicy();

        OperationResponse resp = new OperationResponse();
        slf4jLogger.info("Writing BITFINEX_DSHBTC_ORDER data to Azure Data Lake: {}", value);
        Core.concurrentAppend(BITFINEX_DSHBTC_ORDER, myBuffer, 0, myBuffer.length, true, client, opts, resp);
        if (!resp.successful) {
            // Build the exception from the response and log it; do not report success
            IOException e = client.getExceptionFromResponse(resp, "BITFINEX_DSHBTC_ORDER data is not written to ADL");
            slf4jLogger.error("BITFINEX_DSHBTC_ORDER write failed", e);
            return;
        }
        slf4jLogger.info("BITFINEX_DSHBTC_ORDER data successfully written to Azure Data Lake");
    }

com.microsoft.azure.datalake.store.ADLException: Operation CONCURRENTAPPEND failed with exception java.net.SocketTimeoutException : Read timed out
Last encountered exception thrown after 5 tries [java.net.UnknownHostException,java.net.UnknownHostException,java.net.UnknownHostException,java.net.SocketTimeoutException,java.net.SocketTimeoutException]
    at com.microsoft.azure.datalake.store.ADLStoreClient.getExceptionFromResponse(ADLStoreClient.java:1124)
    at co.biz.yobit.sink.YobitLtcbtcTickerADLSink.invoke(YobitLtcbtcTickerADLSink.java:41)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735)
    at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1569)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1474)
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
    at com.microsoft.azure.datalake.store.HttpTransport.makeSingleCall(HttpTransport.java:292)
    at com.microsoft.azure.datalake.store.HttpTransport.makeCall(HttpTransport.java:91)
    at com.microsoft.azure.datalake.store.Core.concurrentAppend(Core.java:210)
    at co.biz.yobit.sink.YobitLtcbtcTickerADLSink.invoke(YobitLtcbtcTickerADLSink.java:37)
    ... 6 more

1
Maybe you can use a debugger to figure out which host is unknown. To me it looks like a wrong configuration or a network address/port issue. - twalthr
The above errors are typically a result of unreliable network conditions between the host that is running the code and Azure Data Lake Store. Where are you running the code? Is it running on an Azure VM or outside of Azure? - Amit Kulkarni
@Amit Kulkarni I am running this code on a Flink standalone cluster on a local VM at my company. - Dhinesh
@Dhinesh where is the "local VM" running? Is it in the same Azure region as your ADLS account or in your company's datacenter? - Amit Kulkarni
@Amit Kulkarni My "local VM" is a Linux machine running on my company's server in Bangalore. My ADLS account region is East US 2. - Dhinesh

1 Answer

2
votes

The above errors are typically a result of unreliable network conditions between the host running the code and Azure Data Lake Store. As confirmed in the comments, the host is in a different geography from the ADLS account (Bangalore vs. East US 2) and connects over the WAN. These errors are therefore to be expected, and you should retry the operation when you see them.

It is recommended that the Flink cluster run on VMs in the same region as Azure Data Lake Store. In that configuration, you will not see these network errors.
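
If the cluster has to stay where it is, the retry advice can be sketched as a small application-level wrapper around the write. This is only an illustration, not part of the ADLS SDK: the class `RetryingAppend`, the method `withRetries`, and the attempt count and delay are hypothetical names and values chosen for the example. It relies on the fact that `SocketTimeoutException` and `UnknownHostException` are both `IOException`s, as is the exception that `getExceptionFromResponse` returns in the question's code.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;

// Hypothetical helper: not part of the ADLS SDK.
final class RetryingAppend {

    /**
     * Runs the operation and retries it when it throws an IOException,
     * doubling the wait between attempts. Any other exception is not retried.
     */
    static <T> T withRetries(Callable<T> operation, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (IOException e) {
                last = e;
                if (attempt == maxAttempts) {
                    break; // out of attempts, rethrow below
                }
                Thread.sleep(delay);
                delay *= 2; // exponential backoff
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the real write: fails twice with a timeout, then succeeds.
        // In the sink, the operation would call Core.concurrentAppend(...) and
        // throw client.getExceptionFromResponse(resp, ...) when !resp.successful.
        int[] calls = {0};
        String result = withRetries(() -> {
            if (++calls[0] < 3) {
                throw new SocketTimeoutException("Read timed out");
            }
            return "written";
        }, 5, 100L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

This wrapper runs on top of the `ExponentialBackoffPolicy` already set in `opts.retryPolicy`, so each outer attempt may itself involve several SDK-level tries. Whether repeating an append that timed out can write the same record twice is worth checking for your data before relying on it.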