8 votes

I am trying to fetch the metadata of around 10k+ torrents per day using Python libtorrent.

This is the current flow of the code:

  1. Start a libtorrent session.
  2. Get the total count of torrents we need metadata for, uploaded within the last day.
  3. Get the torrent hashes from the DB in chunks.
  4. Create a magnet link from each hash and add those magnet URIs to the session, creating a handle for each magnet URI.
  5. Sleep for a second while metadata is fetched, and keep checking whether the metadata has been found or not.
  6. If metadata is received, add it to the DB; otherwise check whether we have been looking for the metadata for around 10 minutes, and if so remove the handle, i.e. stop looking for that metadata for now.
  7. Do the above indefinitely, and save the session state for the future.

So far I have tried this:

#!/usr/bin/env python
# this file will run as a client or daemon and fetch torrent metadata (i.e. torrent files) from magnet URIs

import libtorrent as lt # libtorrent library
import tempfile # temporary directories used as save_path while fetching metadata
import sys # getting arguments from the shell or exiting the script
from time import sleep
import shutil # removing directory trees from the temp directory
import os.path # checking whether the saved session state file exists
from pprint import pprint # for debugging, showing object data
import MySQLdb # DB connectivity
import os
from datetime import date, timedelta

session = lt.session(lt.fingerprint("UT", 3, 4, 5, 0), flags=0)
session.listen_on(6881, 6891)
session.add_extension('ut_metadata')
session.add_extension('ut_pex')
session.add_extension('smart_ban')
session.add_extension('metadata_transfer')

session_save_filename = "/magnet2torrent/magnet_to_torrent_daemon.save_state"

if os.path.isfile(session_save_filename):
    fileread = open(session_save_filename, 'rb')
    session.load_state(lt.bdecode(fileread.read()))
    fileread.close()
    print('session loaded from file')
else:
    print('new session started')

session.add_dht_router("router.utorrent.com", 6881)
session.add_dht_router("router.bittorrent.com", 6881)
session.add_dht_router("dht.transmissionbt.com", 6881)
session.add_dht_router("dht.aelitis.com", 6881)

session.start_dht()
session.start_lsd()
session.start_upnp()
session.start_natpmp()

alive = True
while alive:

    db_conn = MySQLdb.connect(host='', user='', passwd='', db='', unix_socket='/mysql/mysql.sock') # open database connection
    # get all records where enabled = 0 and uploaded within the last day
    subset_count = 100

    yesterday = date.today() - timedelta(1)
    yesterday = yesterday.strftime('%Y-%m-%d %H:%M:%S')

    total_count_query = "SELECT COUNT(*) AS total_count FROM content WHERE upload_date > '" + yesterday + "' AND enabled = '0'"
    total_count = 0 # default so a failed query below doesn't leave it undefined
    try:
        total_count_cursor = db_conn.cursor() # prepare a cursor object using the cursor() method
        total_count_cursor.execute(total_count_query) # execute the SQL command
        total_count_results = total_count_cursor.fetchone() # fetch the single count row
        total_count = total_count_results[0]
        print(total_count)
    except MySQLdb.Error:
        print("Error: unable to select data")

    # round up so the final partial chunk of hashes is not skipped
    total_pages = -(-total_count // subset_count)

    current_page = 1
    while(current_page <= total_pages):
        from_count = (current_page * subset_count) - subset_count

        hashes = []

        get_mysql_data_query = "SELECT hash FROM content WHERE upload_date > '" + yesterday + "' AND enabled = '0' ORDER BY record_num DESC LIMIT " + str(from_count) + ", " + str(subset_count)
        try:
            get_mysql_data_cursor = db_conn.cursor() # prepare a cursor object using the cursor() method
            get_mysql_data_cursor.execute(get_mysql_data_query) # execute the SQL command
            get_mysql_data_results = get_mysql_data_cursor.fetchall() # fetch all the rows as a list of tuples
            for row in get_mysql_data_results:
                hashes.append(row[0].upper())
        except MySQLdb.Error:
            print("Error: unable to select data")

        handles = []

        for hash in hashes:
            tempdir = tempfile.mkdtemp()
            add_magnet_uri_params = {
                'save_path': tempdir,
                'duplicate_is_error': True,
                'storage_mode': lt.storage_mode_t(2),
                'paused': False,
                'auto_managed': True
            }
            magnet_uri = "magnet:?xt=urn:btih:" + hash + "&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Ftracker.publicbt.com%3A80&tr=udp%3A%2F%2Ftracker.ccc.de%3A80"
            handle = lt.add_magnet_uri(session, magnet_uri, add_magnet_uri_params)
            handles.append(handle) # push the handle onto the handles list

        while len(handles) != 0:
            # iterate over a copy of the list: removing an item from the list
            # being looped over would skip the element that follows it
            for h in list(handles):
                if h.has_metadata():
                    torinfo = h.get_torrent_info()
                    final_info_hash = str(torinfo.info_hash()).upper()
                    torfile = lt.create_torrent(torinfo)
                    torcontent = lt.bencode(torfile.generate())
                    try:
                        insert_cursor = db_conn.cursor() # prepare a cursor object using the cursor() method
                        insert_cursor.execute("""INSERT INTO dht_tfiles (hash, tdata) VALUES (%s, %s)""", [final_info_hash, torcontent])
                        db_conn.commit()
                    except MySQLdb.Error as e:
                        try:
                            print("MySQL Error [%d]: %s" % (e.args[0], e.args[1]))
                        except IndexError:
                            print("MySQL Error: %s" % str(e))

                    shutil.rmtree(h.save_path()) # remove the temp data directory
                    session.remove_torrent(h) # remove the torrent handle from the session
                    handles.remove(h) # remove the handle from the list

                elif h.status().active_time > 600: # handle is more than 10 minutes (600 seconds) old
                    shutil.rmtree(h.save_path()) # remove the temp data directory
                    session.remove_torrent(h) # remove the torrent handle from the session
                    handles.remove(h) # remove the handle from the list

            sleep(1) # one pass over all handles per second, not one second per handle

        current_page = current_page + 1

        #save session state
        filewrite = open(session_save_filename, "wb")
        filewrite.write(lt.bencode(session.save_state()))
        filewrite.close()


    print('sleep60')
    sleep(60)

    #save session state
    filewrite = open(session_save_filename, "wb")
    filewrite.write(lt.bencode(session.save_state()))
    filewrite.close()

I kept the above script running overnight and found that only around 1200 torrents' metadata was fetched in the overnight session, so I am looking to improve the performance of the script.

I have even tried decoding the save_state file and noticed there are 700+ DHT nodes I am connected to, so it's not like the DHT is not running.
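
For reference, that check can be reproduced with a short snippet like the one below (not part of the script above). The key layout of the bdecoded state ('dht state' containing a 'nodes' list) is an assumption on my part and may vary between libtorrent versions:

import libtorrent as lt

with open('/magnet2torrent/magnet_to_torrent_daemon.save_state', 'rb') as f:
    state = lt.bdecode(f.read())

# 'dht state' -> 'nodes' is an assumed layout; inspect the dict if it differs
dht_state = state.get('dht state', {})
print('DHT nodes in saved state: %d' % len(dht_state.get('nodes', [])))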

What I am planning to do is keep the handles active in the session indefinitely while the metadata is not fetched, instead of removing the handles after 10 minutes with no metadata, as I am currently doing; a minimal sketch of that change follows.
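
This sketch assumes the same session, handles list, and DB schema as the script above, and is untested:

from time import sleep
import libtorrent as lt

def poll_until_done(session, handles, db_conn):
    # keep every handle alive until its metadata arrives: no 10-minute cutoff
    while handles:
        for h in list(handles): # iterate over a copy so removing is safe
            if not h.has_metadata():
                continue # not found yet; keep the handle and keep waiting
            torinfo = h.get_torrent_info()
            torcontent = lt.bencode(lt.create_torrent(torinfo).generate())
            cursor = db_conn.cursor()
            cursor.execute("INSERT INTO dht_tfiles (hash, tdata) VALUES (%s, %s)",
                           [str(torinfo.info_hash()).upper(), torcontent])
            db_conn.commit()
            session.remove_torrent(h) # drop the handle only on success
            handles.remove(h)
        sleep(1) # one pass per second over all handles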

I have a few questions regarding the libtorrent Python bindings.

  1. How many handles can I keep running? Is there any limit on the number of running handles?
  2. Will running 10k+ or 100k handles slow down my system or eat up resources? If yes, then which resources? I mean RAM, network?
  3. I am behind a firewall; can a blocked incoming port be causing the slow speed of metadata fetching?
  4. Can a DHT server like router.bittorrent.com or any other ban my IP address for sending too many requests?
  5. Can other peers ban my IP address if they find out I am making too many requests only for fetching metadata?
  6. Can I run multiple instances of this script, or maybe use multi-threading? Will it give better performance?
  7. If using multiple instances of the same script, each script will get a unique node ID depending on the IP and port I am using; is this a viable solution?

Is there any better approach for achieving what I am trying to do?


1 Answer

3 votes

I can't answer questions specific to libtorrent's APIs, but some of your questions apply to BitTorrent in general.

Will running 10k+ or 100k handles slow down my system or eat up resources? If yes, then which resources? I mean RAM, network?

Metadata downloads shouldn't use many resources, since they are not full torrent downloads yet, i.e. they can't allocate the actual files or anything like that. But they will need some RAM/disk space for the metadata itself once they grab the first chunk of it.

I am behind a firewall; can a blocked incoming port be causing the slow speed of metadata fetching?

Yes. By reducing the number of peers that can establish connections, it becomes more difficult to fetch metadata (or establish any connection at all) on swarms with a low peer count.

NATs can cause the same issue.

Can a DHT server like router.bittorrent.com or any other ban my IP address for sending too many requests?

router.bittorrent.com is a bootstrap node, not a server per se. Lookups don't query a single node; they query many different ones (among millions). But yes, individual nodes can ban you, or more likely rate-limit you.

This can be mitigated by looking up randomly distributed IDs, which spreads the load across the DHT keyspace.
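
As a rough illustration (my own sketch, not from any library): info-hashes are SHA-1 values and thus already spread nearly uniformly over the 160-bit keyspace, so shuffling a batch merely removes whatever ordering the DB query imposed and keeps consecutive lookups from clustering around the same nodes.

import random

# 'hashes' is the batch fetched from the DB in the question's script
random.shuffle(hashes)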

Can I run multiple instances of this script, or maybe use multi-threading? Will it give better performance?

AIUI libtorrent is sufficiently non-blocking or multi-threaded that you can schedule many torrents at once.

I don't know if libtorrent has a rate-limit for outgoing DHT requests.

If using multiple instances of the same script, each script will get a unique node ID depending on the IP and port I am using; is this a viable solution?

If you mean the DHT node ID, then it is derived from the IP (as per BEP 42), not the port. Some random element is included, though, so only a limited number of IDs can be obtained per IP.
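
As an illustration, here is a sketch of that derivation as I read BEP 42 (this is not libtorrent's API; the crc32c package and the helper name are my assumptions, and any CRC32-C implementation will do). Only 3 random bits enter the 21-bit ID prefix, so a single IPv4 address yields at most 8 distinct prefixes:

import os
import binascii
import crc32c  # assumed third-party package: pip install crc32c

def bep42_node_id(ip_bytes):
    # derive a DHT node ID from a 4-byte IPv4 address, per my reading of BEP 42
    v4_mask = [0x03, 0x0f, 0x3f, 0xff]
    masked = bytearray(b & m for b, m in zip(bytearray(ip_bytes), v4_mask))
    rand = bytearray(os.urandom(1))[0]
    masked[0] |= (rand & 0x07) << 5          # only 3 random bits reach the prefix
    crc = crc32c.crc32c(bytes(masked))
    node_id = bytearray(os.urandom(20))      # bytes 3..18 remain random
    node_id[0] = (crc >> 24) & 0xff          # top 21 bits of the ID come from the CRC
    node_id[1] = (crc >> 16) & 0xff
    node_id[2] = ((crc >> 8) & 0xf8) | (node_id[2] & 0x07)
    node_id[19] = rand                       # last byte stores the random octet
    return bytes(node_id)

print(binascii.hexlify(bep42_node_id(b'\xcb\x00\x71\x01')))  # example IP 203.0.113.1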

And some of this might also be applicable to your scenario: http://blog.libtorrent.org/2012/01/seeding-a-million-torrents/

And another option is my own DHT implementation which includes a CLI to bulk-fetch torrents.