2
votes

I'm trying to make thousands of GET requests in the smallest amount of time possible. I need to do so in a scalable way: doubling the number of servers I use to make the requests should halve the time to complete for a fixed number of URLs.

I'm using Celery with the eventlet pool and RabbitMQ as the broker. I'm spawning one worker process on each worker server with --concurrency 100 and have a dedicated master server issuing tasks (the code below). I'm not getting the results I expect: the time to complete is not reduced at all when doubling the number of worker servers used.

It appears as though as I add more worker servers, the utilization of each worker goes down (as reported by Flower). For example, with 2 workers, throughout execution the number of active threads per worker hovers in the 80 to 90 range (as expected, since concurrency is 100). However, with 6 workers, the number of active threads per worker hovers in the 10 to 20 range.

It's almost like the queue size is too small, or worker servers can't pull tasks off of the queue fast enough to be fully utilized and as you add more workers they have a harder time pulling tasks off the queue quickly.

urls = ["https://...", ..., "https://..."]
tasks = []
num = 0
for url in urls:
    num = num + 1
    tasks.append(fetch_url.s(num, url))

job = group(tasks)
start = time.time()
res = job.apply_async()
res.join()
print time.time() - start

Update: I have attached a graph of the succeeded tasks vs. time when using 1 worker server, 2 worker servers, etc. up to 5 worker servers. As you can see, the rate of task completion doubles going from 1 worker server to 2 worker servers, but as I add on more servers, the rate of task completion begins to level off.enter image description here

1
How did you ensure remote server can sustain increasing load? - temoto
Are you referring to the servers I'm hitting with my GET requests? - monstermac77
The GET requests are actually hitting hundreds of different servers, each of which is definitely able to handle this load (they're designed to). I think there might be a bottleneck in adding tasks to the queue; essentially, I think adding more workers beyond 3 doesn't get a speed up because tasks are not added to the queue fast enough for all the workers to be fully utilized. Any ideas on how to speed up adding tasks, ideally with python 2.7 (maybe multithreading adding the tasks so I can just add more CPUs)? - monstermac77
First, try to replace http request with eventlet.sleep(0.2). Second, try to access target service via insecure http, a relevant bug was recently fixed in eventlet. Third, getting rid of rabbitmq, I hate to say it, is always a good idea, redis broker works better. And finally, I suggest getting rid of celery if you have to process each request individually. Otherwise, group requests and send to queue in small batches, this definitely will help against queue performance problem (if there is one). - temoto
@temoto, very good suggestions. 1. I've been looking at the task completion time in Flower and pretty much all of them come back in about 0.1 seconds, but that's a good idea for those who could be hitting their target servers too hard. 2. Unfortunately, all of these target servers redirect to https. 3. Since making the post I did switch to redis and you're absolutely right: it is faster. 4. Have looked into dropping down to kombu, but your suggestion to group requests was brilliant. It does seem like the bottleneck was adding tasks to the queue, because using chunking in Celery fixed this. - monstermac77

1 Answers

1
votes

For future readers. Actions that helped, most significant benefit first:

  • Group several small work units into one celery task
  • Switch Celery broker from RabbitMQ to Redis

More useful hints not mentioned in original comment discussion, therefore unknown benefit significance for this question.

  • Use httplib2 or urllib3 or better HTTP library. requests burns CPU for no good reason
  • Use HTTP connection pool. Check and make sure you reuse permanent connections to target servers.

Chunking explained.

Before chunking

urls = [...]

function task(url)
  response = http_fetch(url)
  return process(response.body)

celery.apply_async url1
celery.apply_async url2
...

So task queue contains N=len(urls) tasks, each task is to fetch single url, perform some calculations on response.

With chunking

function chunk(xs, n)
  loop:
  g, rest = xs[:n], xs[n:]
  yield g

chunks = [ [url1, url2, url3], [4, 5, 6], ... ]

function task(chunk)
  pool = eventlet.GreenPool()
  result = {
    response.url: process(response)
    for response in pool.imap(http_fetch, chunk)
  }
  return result

celery.apply_async chunk1
celery.apply_async chunk2
...

Now task queue contains M=len(urls)/chunksize tasks, each task is to fetch chunksize urls and process all responses. Now you have to multiplex concurrent url fetches inside single chunk. Here it's done with Eventlet GreenPool.

Note, because Python, it is likely beneficial to first perform all network IO then perform all CPU calculations on all responses in chunk, amortizing CPU load via multiple celery workers.

All code in this answer is showing general direction only. You must implement better version with less copying and allocations.