
My question is: behind the scenes, how does Cloud Dataflow parallelize the workload for an element-wise Beam DoFn (ParDo)? For example, in my ParDo I send one HTTP request to an external server per element, and I use 30 workers, each with 4 vCPUs.

  1. Does that mean that each worker runs at most 4 threads?
  2. Does that mean that each worker needs, or can establish, only 4 HTTP connections if I keep them alive for best performance?
  3. How can I adjust the level of parallelism other than by using more cores or more workers?
  4. With my current setup (30 workers with 4 vCPUs each), I can establish around 120 HTTP connections to the HTTP server, but both the server and the workers show very low resource usage. Basically, I want to make them work much harder by sending more requests per second. What should I do?

Code Snippet to illustrate my work:

public class NewCallServerDoFn extends DoFn<PreparedRequest,KV<PreparedRequest,String>> {


private static final Logger Logger = LoggerFactory.getLogger(NewCallServerDoFn.class);

private static PoolingHttpClientConnectionManager _ConnManager = null;
private static CloseableHttpClient _HttpClient = null;
private static HttpRequestRetryHandler _RetryHandler = null;
private static  String[] _MapServers = MapServerBatchBeamApplication.CONFIG.getString("mapserver.client.config.server_host").split(",");

@Setup
public void setupHttpClient(){

    Logger.info("Setting up HttpClient");

   // Question: the value of maxConnection below is actually 10, but with 30 worker machines I only see about 115 TCP connections established on the server side, so this setting doesn't seem to take effect as I expected.

    int maxConnection = MapServerBatchBeamApplication.CONFIG.getInt("mapserver.client.config.max_connection");
    int timeout = MapServerBatchBeamApplication.CONFIG.getInt("mapserver.client.config.timeout");

    _ConnManager = new PoolingHttpClientConnectionManager();

    for (String mapServer : _MapServers) {
        HttpHost serverHost = new HttpHost(mapServer,80);
        _ConnManager.setMaxPerRoute(new HttpRoute(serverHost),maxConnection);
    }

    // config timeout
    RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(timeout)
            .setConnectionRequestTimeout(timeout)
            .setSocketTimeout(timeout).build();

    // config retry
    _RetryHandler = new HttpRequestRetryHandler() {

        public boolean retryRequest(
                IOException exception,
                int executionCount,
                HttpContext context) {

            Logger.info(exception.toString());
            Logger.info("try request: " + executionCount);

            if (executionCount >= 5) {
                // Do not retry if over max retry count
                return false;
            }
            if (exception instanceof InterruptedIOException) {
                // Timeout
                return false;
            }
            if (exception instanceof UnknownHostException) {
                // Unknown host
                return false;
            }
            if (exception instanceof ConnectTimeoutException) {
                // Connection timed out
                return false;
            }
            if (exception instanceof SSLException) {
                // SSL handshake exception
                return false;
            }
            return true;
        }

    };

    _HttpClient = HttpClients.custom()
                            .setConnectionManager(_ConnManager)
                            .setDefaultRequestConfig(requestConfig)
                            .setRetryHandler(_RetryHandler)
                            .build();

    Logger.info("Setting up HttpClient is done.");

}

@Teardown
public void tearDown(){
    Logger.info("Tearing down HttpClient and Connection Manager.");
    try {
        _HttpClient.close();
        _ConnManager.close();
    }catch (Exception e){
        Logger.warn(e.toString());
    }
    Logger.info("HttpClient and Connection Manager have been torn down.");
}




@ProcessElement
public void processElement(ProcessContext c) {

    PreparedRequest request = c.element();

    if(request == null)
        return;

    String response="{\"my_error\":\"failed to get response from map server with retries\"}";


    String chosenServer = _MapServers[request.getHardwareId() % _MapServers.length];

    String parameter;
    try {
        parameter = URLEncoder.encode(request.getRequest(),"UTF-8");
    } catch (UnsupportedEncodingException e) {
        Logger.error(e.toString());

        return;
    }

    StringBuilder sb = new StringBuilder().append(MapServerBatchBeamApplication.CONFIG.getString("mapserver.client.config.api_path"))
            .append("?coordinates=")
            .append(parameter);

    HttpGet getRequest = new HttpGet(sb.toString());
    HttpHost host = new HttpHost(chosenServer,80,"http");

    // try-with-resources closes the response (releasing the pooled connection)
    // even when the response has no entity or reading it throws
    try (CloseableHttpResponse httpRes = _HttpClient.execute(host,getRequest)) {
        HttpEntity entity = httpRes.getEntity();
        if(entity != null){
            try
            {
                response = EntityUtils.toString(entity);
            }finally{
                EntityUtils.consume(entity);
            }
        }
    }catch(Exception e){
        Logger.warn("failed to get response from map server with retries for " + request.getRequest());
    }

    c.output(KV.of(request, response));

}
}

1 Answer

  1. Yes, based on this answer.
  2. No, you can establish more connections. Based on my answer, you can use an async HTTP client to have more concurrent requests. As that answer also describes, you need to collect the results of these asynchronous calls and output them synchronously in a @ProcessElement or @FinishBundle method.
  3. See 2.
  4. Since your resource usage is low, the workers are probably spending most of their time waiting for responses. With the approach described above, you should be able to utilize your resources far better and achieve the same performance with far fewer workers.
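
To make point 2 more concrete, here is a minimal plain-Java sketch of the "start requests asynchronously, collect the results later" pattern. It is not the code from the linked answer and deliberately leaves Beam out so it can run on its own. The class name `AsyncRequestBuffer`, the `asyncCall` function, the `maxInFlight` cap and the `fallback` value are all assumptions made for illustration; in a real pipeline `asyncCall` would wrap whichever async HTTP client you choose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
import java.util.function.Function;

/**
 * Hypothetical helper: starts one async call per input without waiting,
 * and emits all results later on the calling thread.
 * In a DoFn, add() would be called from @ProcessElement and flush() from @FinishBundle.
 */
class AsyncRequestBuffer<I, O> {
    private final Function<I, CompletableFuture<O>> asyncCall; // assumed async client call
    private final int maxInFlight;                             // cap on pending requests
    private final O fallback;                                  // emitted when a call fails
    private final List<I> inputs = new ArrayList<>();
    private final List<CompletableFuture<O>> pending = new ArrayList<>();

    AsyncRequestBuffer(Function<I, CompletableFuture<O>> asyncCall, int maxInFlight, O fallback) {
        this.asyncCall = asyncCall;
        this.maxInFlight = maxInFlight;
        this.fallback = fallback;
    }

    /** Starts the request and returns immediately; drains once the cap is reached. */
    void add(I input, BiConsumer<I, O> emit) {
        inputs.add(input);
        pending.add(asyncCall.apply(input));
        if (pending.size() >= maxInFlight) {
            flush(emit);
        }
    }

    /** Waits for every pending request, then emits the results in input order. */
    void flush(BiConsumer<I, O> emit) {
        for (int i = 0; i < pending.size(); i++) {
            O result;
            try {
                result = pending.get(i).join(); // blocks until this call has completed
            } catch (CompletionException | CancellationException e) {
                result = fallback;              // failed call: emit the fallback value
            }
            emit.accept(inputs.get(i), result);
        }
        inputs.clear();
        pending.clear();
    }
}
```

With a cap of, say, 50, each DoFn instance can have up to 50 requests in flight instead of one, so the number of open connections is no longer tied to the number of worker threads. Two caveats if you port this into a DoFn: the connection pool of the HTTP client must allow at least that many connections, and emitting from @FinishBundle goes through `FinishBundleContext.output`, which also needs a timestamp and a window, so you would have to keep those alongside each pending element.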