I am using RestHighLevelClient 7.2 to connect to an Elasticsearch 7.2 cluster. The cluster has 3 master nodes and 2 data nodes; each data node has 2 cores and 8 GB of memory. I use the code below in my Spring Boot project to create the RestHighLevelClient instance.

    @Bean(destroyMethod = "close")
    @Qualifier("readClient")
    public RestHighLevelClient readClient() {

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(elasticUser, elasticPass));

        RestClientBuilder builder = RestClient.builder(new HttpHost(elasticHost, elasticPort))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setDefaultCredentialsProvider(credentialsProvider)
                        .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(5).build()));

        builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                .setConnectTimeout(30000)
                .setSocketTimeout(60000));

        return new RestHighLevelClient(builder);
    }

RestHighLevelClient is a singleton bean. Intermittently I get a SocketTimeoutException on both GET and PUT requests. The index size is around 50 MB. I have tried increasing the socket timeout, but I still receive the same error. Am I missing some configuration? Any help would be appreciated.

2 Answers

I found the cause and wanted to share it in case it helps others. I was connecting to the Elasticsearch cluster through a load balancer. As you can see from my RestClientBuilder code, I passed only the load balancer's host and port. Although the cluster has multiple master nodes, RestClient was not retrying my request when a connection timed out.

RestClientBuilder builder = RestClient.builder(new HttpHost(elasticHost, elasticPort))
                .setHttpClientConfigCallback(httpClientBuilder ->httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(5).build()));

According to the RestClient code, a request is not retried after a connection failure when only a single host is configured. So I changed my code as shown below, and it started working.

RestClientBuilder builder = RestClient.builder(new HttpHost(elasticHost, 9200), new HttpHost(elasticHost, 9201))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
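
If the node addresses come from configuration instead of being hard-coded, a small helper can turn one comma-separated property into the list of hosts to hand to the builder. The following is only a sketch using the JDK: the class name `HostListParser`, the `host:port` property format, and the default port 9200 are assumptions made for illustration, not something the client library prescribes, and IPv6 literals are not handled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

public final class HostListParser {

    // Assumed default when an entry carries no explicit port.
    private static final int DEFAULT_PORT = 9200;

    private HostListParser() {
    }

    /** Parses a value such as "es1:9200,es2:9201,es3" into unresolved host/port pairs. */
    public static List<InetSocketAddress> parse(String hosts) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            int port = colon < 0 ? DEFAULT_PORT : Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup at parse time.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        if (result.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("es1:9200, es2:9201, es3")) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
    }
}
```

Each parsed address could then be mapped to `new HttpHost(address.getHostString(), address.getPort())` and the resulting array passed to `RestClient.builder(...)`.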

For the complete RestClient code, see https://github.com/elastic/elasticsearch/blob/master/client/rest/src/main/java/org/elasticsearch/client/RestClient.java

Retry code block in RestClient:

private Response performRequest(final NodeTuple<Iterator<Node>> nodeTuple,
                                    final InternalRequest request,
                                    Exception previousException) throws IOException {
        RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
        HttpResponse httpResponse;
        try {
            httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get();
        } catch(Exception e) {
            RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e);
            onFailure(context.node);
            Exception cause = extractAndWrapCause(e);
            addSuppressedException(previousException, cause);
            if (nodeTuple.nodes.hasNext()) {
                return performRequest(nodeTuple, request, cause);
            }
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException("unexpected exception type: must be either RuntimeException or IOException", cause);
        }
        ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse);
        if (responseOrResponseException.responseException == null) {
            return responseOrResponseException.response;
        }
        addSuppressedException(previousException, responseOrResponseException.responseException);
        if (nodeTuple.nodes.hasNext()) {
            return performRequest(nodeTuple, request, responseOrResponseException.responseException);
        }
        throw responseOrResponseException.responseException;
    }
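
To make the effect of that retry logic easier to see, here is a simplified, library-free model of it. It is a sketch, not the actual RestClient implementation: `NodeRetrySketch`, `NodeCall`, and the node names are hypothetical, the loop is iterative rather than recursive, and node blacklisting and HTTP status handling are left out. It only shows that a failed attempt falls through to the next node while one exists, and that a single configured host therefore gets exactly one attempt.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public final class NodeRetrySketch {

    /** Hypothetical stand-in for sending one HTTP request to one node. */
    @FunctionalInterface
    public interface NodeCall {
        String send(String node) throws IOException;
    }

    private NodeRetrySketch() {
    }

    /**
     * Tries each node in turn and returns the first successful response.
     * Every node that was contacted is appended to {@code attempted}.
     */
    public static String performRequest(Iterator<String> nodes, NodeCall call, List<String> attempted)
            throws IOException {
        IOException last = null;
        while (nodes.hasNext()) {
            String node = nodes.next();
            attempted.add(node);
            try {
                return call.send(node);
            } catch (IOException e) {
                if (last != null) {
                    e.addSuppressed(last); // keep the earlier failure for diagnostics
                }
                last = e;
            }
        }
        if (last == null) {
            throw new IllegalArgumentException("at least one node is required");
        }
        throw last; // no node left to retry on
    }

    public static void main(String[] args) throws IOException {
        // Simulated cluster: the first node always times out, the second answers.
        NodeCall call = node -> {
            if (node.equals("node-1")) {
                throw new IOException("timeout on " + node);
            }
            return "200 OK from " + node;
        };

        List<String> attempted = new ArrayList<>();
        System.out.println(performRequest(List.of("node-1", "node-2").iterator(), call, attempted));
        System.out.println("attempted: " + attempted);

        attempted.clear();
        try {
            performRequest(List.of("node-1").iterator(), call, attempted);
        } catch (IOException e) {
            // With a single configured host there is nothing to fall back to.
            System.out.println("failed after " + attempted.size() + " attempt: " + e.getMessage());
        }
    }
}
```
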
0
votes

I'm facing the same issue, and after seeing this I realized that the retry is happening on my side too, on each host (I have 3 hosts, and the exception occurs in 3 threads). I wanted to post this since you might face the same issue, or someone else might land on this post because of the same SocketTimeoutException.

According to the official docs, RestHighLevelClient uses RestClient under the hood, and RestClient uses CloseableHttpAsyncClient, which has a connection pool. The Elasticsearch docs say that you should close the client once you are done with it (the definition of "done" is ambiguous in an application), but in general what I have found online is that you should close it when the application is shutting down, rather than when you have finished a query.

The official Apache documentation has an example of handling the connection pool, which I'm trying to follow. I'll try to replicate the scenario and will post back if that fixes my issue. The code can be found here:

https://hc.apache.org/httpcomponents-asyncclient-dev/httpasyncclient/examples/org/apache/http/examples/nio/client/AsyncClientEvictExpiredConnections.java

This is what I have so far:

@Bean(name = "RestHighLevelClientWithCredentials", destroyMethod = "close")
public RestHighLevelClient elasticsearchClient(ElasticSearchClientConfiguration elasticSearchClientConfiguration,
                                               RestClientBuilder.HttpClientConfigCallback httpClientConfigCallback) {
    return new RestHighLevelClient(
            RestClient
                    .builder(getElasticSearchHosts(elasticSearchClientConfiguration))
                    .setHttpClientConfigCallback(httpClientConfigCallback)
    );
}

@Bean
@RefreshScope
public RestClientBuilder.HttpClientConfigCallback getHttpClientConfigCallback(
        PoolingNHttpClientConnectionManager poolingNHttpClientConnectionManager,
        CredentialsProvider credentialsProvider
) {
    return httpAsyncClientBuilder -> {
        // Disables TLS hostname verification; only safe in trusted or test environments
        httpAsyncClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
        httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        httpAsyncClientBuilder.setConnectionManager(poolingNHttpClientConnectionManager);
        return httpAsyncClientBuilder;
    };
}

public class ElasticSearchClientManager {

private ElasticSearchClientManager.IdleConnectionEvictor idleConnectionEvictor;

/**
 * Custom client connection manager to create a connection watcher
 *
 * @param elasticSearchClientConfiguration elasticSearchClientConfiguration
 * @return PoolingNHttpClientConnectionManager
 */
@Bean
@RefreshScope
public PoolingNHttpClientConnectionManager getPoolingNHttpClientConnectionManager(
        ElasticSearchClientConfiguration elasticSearchClientConfiguration
) {
    try {
        SSLIOSessionStrategy sslSessionStrategy = new SSLIOSessionStrategy(getTrustAllSSLContext());
        Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = RegistryBuilder.<SchemeIOSessionStrategy>create()
                .register("http", NoopIOSessionStrategy.INSTANCE)
                .register("https", sslSessionStrategy)
                .build();
        ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
        PoolingNHttpClientConnectionManager poolingNHttpClientConnectionManager =
                new PoolingNHttpClientConnectionManager(ioReactor, sessionStrategyRegistry);
        idleConnectionEvictor = new ElasticSearchClientManager.IdleConnectionEvictor(poolingNHttpClientConnectionManager,
                elasticSearchClientConfiguration);
        idleConnectionEvictor.start();
        return poolingNHttpClientConnectionManager;
    } catch (IOReactorException e) {
        // Pass the cause along so the original stack trace is not lost
        throw new RuntimeException("Failed to create a watcher for the connection pool", e);
    }
}

private SSLContext getTrustAllSSLContext() {
    try {
        return new SSLContextBuilder()
                .loadTrustMaterial(null, (x509Certificates, string) -> true)
                .build();
    } catch (Exception e) {
        throw new RuntimeException("Failed to create SSL Context with open certificate", e);
    }
}

public IdleConnectionEvictor.State state() {
    return idleConnectionEvictor.evictorState;
}

@PreDestroy
private void finishManager() {
    idleConnectionEvictor.shutdown();
}


public static class IdleConnectionEvictor extends Thread {

    private final NHttpClientConnectionManager nhttpClientConnectionManager;
    private final ElasticSearchClientConfiguration elasticSearchClientConfiguration;

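    // volatile: written by the evictor thread, read by other threads via state()
    @Getter
    private volatile State evictorState = State.NOT_RUNNING;
    private volatile boolean shutdown;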

    public IdleConnectionEvictor(NHttpClientConnectionManager nhttpClientConnectionManager,
                                 ElasticSearchClientConfiguration elasticSearchClientConfiguration) {
        super();
        this.nhttpClientConnectionManager = nhttpClientConnectionManager;
        this.elasticSearchClientConfiguration = elasticSearchClientConfiguration;
    }

    @Override
    public void run() {
        try {
            while (!shutdown) {
                synchronized (this) {
                    this.evictorState = State.RUNNING;
                    wait(elasticSearchClientConfiguration.getExpiredConnectionsCheckTime());
                    // Close expired connections
                    nhttpClientConnectionManager.closeExpiredConnections();
                    // Optionally, close connections that have been idle
                    // longer than the configured number of seconds
                    nhttpClientConnectionManager.closeIdleConnections(elasticSearchClientConfiguration.getMaxTimeIdleConnections(),
                            TimeUnit.SECONDS);
                }
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so the interruption stays observable
            Thread.currentThread().interrupt();
        } finally {
            // Covers both interruption and a regular shutdown()
            this.evictorState = State.NOT_RUNNING;
        }
    }

    private void shutdown() {
        shutdown = true;
        synchronized (this) {
            notifyAll();
        }
    }

    public enum State {
        RUNNING,
        NOT_RUNNING
    }
}

}
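
As a possible alternative to the hand-written evictor thread above, the same periodic cleanup could be driven by a `ScheduledExecutorService`. This is a rough sketch under stated assumptions, not a tested fix for the timeout: `ScheduledEvictorSketch` and `EvictablePool` are hypothetical names, and `EvictablePool` only mirrors the two connection-manager methods used above (`closeExpiredConnections()` and `closeIdleConnections(long, TimeUnit)`) so that the example compiles without the Apache libraries.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public final class ScheduledEvictorSketch implements AutoCloseable {

    /** Hypothetical stand-in for the two connection-manager methods the evictor calls. */
    public interface EvictablePool {
        void closeExpiredConnections();

        void closeIdleConnections(long idleTime, TimeUnit unit);
    }

    private final EvictablePool pool;
    private final long maxIdleSeconds;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "idle-connection-evictor");
                thread.setDaemon(true); // never keep the JVM alive just for eviction
                return thread;
            });

    public ScheduledEvictorSketch(EvictablePool pool, long maxIdleSeconds) {
        this.pool = pool;
        this.maxIdleSeconds = maxIdleSeconds;
    }

    /** Runs a single eviction pass: expired connections first, then idle ones. */
    public void evictOnce() {
        pool.closeExpiredConnections();
        pool.closeIdleConnections(maxIdleSeconds, TimeUnit.SECONDS);
    }

    /** Schedules an eviction pass every {@code checkIntervalMillis} milliseconds. */
    public void start(long checkIntervalMillis) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                evictOnce();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs, so swallow it here.
            }
        }, checkIntervalMillis, checkIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops the scheduler; intended to be called when the application shuts down. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

In a Spring setup, an adapter around the real connection manager would implement `EvictablePool`, `start(...)` would be called once after the pool is created, and `close()` would take the place of the `@PreDestroy` hook.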