I am opening a file which has 100,000 URL's. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far looked at the many confusing ways Python implements threading/concurrency. I have even looked at the python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.
17 Answers
Twistedless solution:
from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue
concurrent = 200
def doWork():
while True:
url = q.get()
status, url = getStatus(url)
doSomethingWithResult(status, url)
q.task_done()
def getStatus(ourl):
try:
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status, ourl
except:
return "error", ourl
def doSomethingWithResult(status, url):
print status, url
q = Queue(concurrent * 2)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
try:
for url in open('urllist.txt'):
q.put(url.strip())
q.join()
except KeyboardInterrupt:
sys.exit(1)
This one is slighty faster than the twisted solution and uses less CPU.
Things have changed quite a bit since 2010 when this was posted and I haven't tried all the other answers but I have tried a few, and I found this to work the best for me using python3.6.
I was able to fetch about ~150 unique domains per second running on AWS.
import concurrent.futures
import requests
import time
out = []
CONNECTIONS = 100
TIMEOUT = 5
tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]
def load_url(url, timeout):
ans = requests.head(url, timeout=timeout)
return ans.status_code
with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
time1 = time.time()
for future in concurrent.futures.as_completed(future_to_url):
try:
data = future.result()
except Exception as exc:
data = str(type(exc))
finally:
out.append(data)
print(str(len(out)),end="\r")
time2 = time.time()
print(f'Took {time2-time1:.2f} s')
A solution using tornado asynchronous networking library
from tornado import ioloop, httpclient
i = 0
def handle_request(response):
print(response.code)
global i
i -= 1
if i == 0:
ioloop.IOLoop.instance().stop()
http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
i += 1
http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()
This code is using non-blocking network I/O and doesn't have any restriction. It can scale to tens of thousands of open connections. It will run in a single thread but will be a way faster then any threading solution. Checkout non-blocking I/O
I know this is an old question, but in Python 3.7 you can do this using asyncio
and aiohttp
.
import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError
async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
try:
resp = await session.request(method="GET", url=url, **kwargs)
except ClientConnectorError:
return (url, 404)
return (url, resp.status)
async def make_requests(urls: set, **kwargs) -> None:
async with ClientSession() as session:
tasks = []
for url in urls:
tasks.append(
fetch_html(url=url, session=session, **kwargs)
)
results = await asyncio.gather(*tasks)
for result in results:
print(f'{result[1]} - {str(result[0])}')
if __name__ == "__main__":
import pathlib
import sys
assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
here = pathlib.Path(__file__).parent
with open(here.joinpath("urls.txt")) as infile:
urls = set(map(str.strip, infile))
asyncio.run(make_requests(urls=urls))
You can read more about it and see an example here.
Use grequests , it's a combination of requests + Gevent module .
GRequests allows you to use Requests with Gevent to make asyncronous HTTP Requests easily.
Usage is simple:
import grequests
urls = [
'http://www.heroku.com',
'http://tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://kennethreitz.com'
]
Create a set of unsent Requests:
>>> rs = (grequests.get(u) for u in urls)
Send them all at the same time:
>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
A good approach to solving this problem is to first write the code required to get one result, then incorporate threading code to parallelize the application.
In a perfect world this would simply mean simultaneously starting 100,000 threads which output their results into a dictionary or list for later processing, but in practice you are limited in how many parallel HTTP requests you can issue in this fashion. Locally, you have limits in how many sockets you can open concurrently, how many threads of execution your Python interpreter will allow. Remotely, you may be limited in the number of simultaneous connections if all the requests are against one server, or many. These limitations will probably necessitate that you write the script in such a way as to only poll a small fraction of the URLs at any one time (100, as another poster mentioned, is probably a decent thread pool size, although you may find that you can successfully deploy many more).
You can follow this design pattern to resolve the above issue:
- Start a thread which launches new request threads until the number of currently running threads (you can track them via threading.active_count() or by pushing the thread objects into a data structure) is >= your maximum number of simultaneous requests (say 100), then sleeps for a short timeout. This thread should terminate when there is are no more URLs to process. Thus, the thread will keep waking up, launching new threads, and sleeping until your are finished.
- Have the request threads store their results in some data structure for later retrieval and output. If the structure you are storing the results in is a
list
ordict
in CPython, you can safely append or insert unique items from your threads without locks, but if you write to a file or require in more complex cross-thread data interaction you should use a mutual exclusion lock to protect this state from corruption.
I would suggest you use the threading module. You can use it to launch and track running threads. Python's threading support is bare, but the description of your problem suggests that it is completely sufficient for your needs.
Finally, if you'd like to see a pretty straightforward application of a parallel network application written in Python, check out ssh.py. It's a small library which uses Python threading to parallelize many SSH connections. The design is close enough to your requirements that you may find it to be a good resource.
If you're looking to get the best performance possible, you might want to consider using Asynchronous I/O rather than threads. The overhead associated with thousands of OS threads is non-trivial and the context switching within the Python interpreter adds even more on top of it. Threading will certainly get the job done but I suspect that an asynchronous route will provide better overall performance.
Specifically, I'd suggest the async web client in the Twisted library (http://www.twistedmatrix.com). It has an admittedly steep learning curve but it quite easy to use once you get a good handle on Twisted's style of asynchronous programming.
A HowTo on Twisted's asynchronous web client API is available at:
http://twistedmatrix.com/documents/current/web/howto/client.html
A solution:
from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools
concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)
def getStatus(ourl):
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status
def processResponse(response,url):
print response, url
processedOne()
def processError(error,url):
print "error", url#, error
processedOne()
def processedOne():
if finished.next()==added:
reactor.stop()
def addTask(url):
req = threads.deferToThread(getStatus, url)
req.addCallback(processResponse, url)
req.addErrback(processError, url)
added=0
for url in open('urllist.txt'):
added+=1
addTask(url.strip())
try:
reactor.run()
except KeyboardInterrupt:
reactor.stop()
Testtime:
[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null
real 1m10.682s
user 0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
Pingtime:
bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
Using a thread pool is a good option, and will make this fairly easy. Unfortunately, python doesn't have a standard library that makes thread pools ultra easy. But here is a decent library that should get you started: http://www.chrisarndt.de/projects/threadpool/
Code example from their site:
pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()
Hope this helps.
This twisted async web client goes pretty fast.
#!/usr/bin/python2.7
from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}
def getLock(url, simultaneous = 1):
return locks[urlparse(url).netloc, randrange(simultaneous)]
@inlineCallbacks
def getMapping(url):
# Limit ourselves to 4 simultaneous connections per host
# Tweak this number, but it should be no larger than pool.maxPersistentPerHost
lock = getLock(url,4)
yield lock.acquire()
try:
resp = yield agent.request('HEAD', url)
codes[url] = resp.code
except Exception as e:
codes[url] = str(e)
finally:
lock.release()
dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())
reactor.run()
pprint(codes)
Create epoll
object,
open many client TCP sockets,
adjust their send buffers to be a bit more than request header,
send a request header — it should be immediate, just placing into a buffer,
register socket in epoll
object,
do .poll
on epoll
obect,
read first 3 bytes from each socket from .poll
,
write them to sys.stdout
followed by \n
(don't flush),
close the client socket.
Limit number of sockets opened simultaneously — handle errors when sockets are created. Create a new socket only if another is closed.
Adjust OS limits.
Try forking into a few (not many) processes: this may help to use CPU a bit more effectively.
[Tool]
Apache Bench is all you need. - A command line computer program (CLI) for measuring the performance of HTTP web servers
A nice blog post for you: https://www.petefreitag.com/item/689.cfm (from Pete Freitag)
For your case, threading will probably do the trick as you'll probably be spending most time waiting for a response. There are helpful modules like Queue in the standard library that might help.
I did a similar thing with parallel downloading of files before and it was good enough for me, but it wasn't on the scale you are talking about.
If your task was more CPU-bound, you might want to look at the multiprocessing module, which will allow you to utilize more CPUs/cores/threads (more processes that won't block each other since the locking is per process)
Consider using Windmill , although Windmill probably cant do that many threads.
You could do it with a hand rolled Python script on 5 machines, each one connecting outbound using ports 40000-60000, opening 100,000 port connections.
Also, it might help to do a sample test with a nicely threaded QA app such as OpenSTA in order to get an idea of how much each server can handle.
Also, try looking into just using simple Perl with the LWP::ConnCache class. You'll probably get more performance (more connections) that way.
I found that using the tornado
package to be the fastest and simplest way to achieve this:
from tornado import ioloop, httpclient, gen
def main(urls):
"""
Asynchronously download the HTML contents of a list of URLs.
:param urls: A list of URLs to download.
:return: List of response objects, one for each URL.
"""
@gen.coroutine
def fetch_and_handle():
httpclient.AsyncHTTPClient.configure(None, defaults=dict(user_agent='MyUserAgent'))
http_client = httpclient.AsyncHTTPClient()
waiter = gen.WaitIterator(*[http_client.fetch(url, raise_error=False, method='HEAD')
for url in urls])
results = []
# Wait for the jobs to complete
while not waiter.done():
try:
response = yield waiter.next()
except httpclient.HTTPError as e:
print(f'Non-200 HTTP response returned: {e}')
continue
except Exception as e:
print(f'An unexpected error occurred querying: {e}')
continue
else:
print(f'URL \'{response.request.url}\' has status code <{response.code}>')
results.append(response)
return results
loop = ioloop.IOLoop.current()
web_pages = loop.run_sync(fetch_and_handle)
return web_pages
my_urls = ['url1.com', 'url2.com', 'url100000.com']
responses = main(my_urls)
print(responses[0])
The easiest way would be to use Python's built-in threading library. They're not "real" / kernel threads They have issues (like serialization), but are good enough. You'd want a queue & thread pool. One option is here, but it's trivial to write your own. You can't parallelize all 100,000 calls, but you can fire off 100 (or so) of them at the same time.
requests.get
andrequests.head
(i.e. a page request vs a head request) to return different status codes, so this is not the best advice – AlexG