
This is kind of a complicated question, so I'll do my best to be plain in my explanation and not give too many unnecessary details.

I developed a Python script for work last year. It grabs basic system data and sends it to an HTTP/S server, which can send commands back if a user so chooses. It's been a big experiment for the last year, seeing what works and what doesn't, testing different needs within the company, etc. But now I have a pretty solid understanding of what we need, so I am starting my journey on version 2.

The aim of this new version is to maintain functionality while reducing system/CPU load and bandwidth. Once this Python script is developed, the rest of the work will be done on the HTTP/S server. My question is specifically about the client side, the Python script. I am using Python 2.7.x, most often on Debian-based systems.

The v1 script grabs system data, reads a config file listing the servers to send the data to, and uses a thread per server to send it. Within those threads, each server can return one or more commands, which are then also processed in their own threads. The script is run once a minute via crontab. You can have 5 or more servers send 10 commands each, and the script still executes everything smoothly and effectively, without taking a long time to finish the commands issued by the servers.

In the v2 script, I am seeking to make the following required changes:

  • Will be run as a system service. So instead of the code being run by cron every minute, the script will loop every few seconds.

  • The loop needs to gather data once each time through the loop, then send it to each web server (as defined in the configuration file).

  • I want persistent HTTP/S connections for performance and bandwidth optimization.

  • I don't want to gather data each time through the loop for each HTTP/S server. I want to only gather data once per iteration through the main loop which drives the service, and then send that data to the threads that are managing the established HTTP/S persistent connections.

Herein lies my problem: how do I keep the persistent connections alive inside their respective threads AND get data to those threads while only collecting the data once?

From "does httplib reuse TCP connections?" I see that persistent connections can be done in this manner (thank you, Corey Goldberg):

con = httplib.HTTPConnection("myweb.com")
while True:
    # Reusing the same connection object keeps the underlying TCP socket open
    con.request("GET", "/x.css", headers={"Connection": "keep-alive"})
    result = con.getresponse()
    result.read() # the response must be fully read before the next request
    print result.reason, result.getheaders()

Data gathering needs to happen inside this loop. But I need this to happen in multiple threads talking to various servers at the same time, and I don't want to waste the resources to fetch the data more than once. I just don't see how it is possible, given my relatively limited knowledge of Python.

Basically, as I see it right now, there needs to be a loop that drives the HTTP/S connections inside their threads. Then I need some kind of loop to gather my data and prepare it to go out to those HTTP/S connections. But how do I nest one loop inside the other? It's like I need the HTTP/S persistent-connection loop inside the data-gathering loop, but I also need the data-gathering loop inside the HTTP/S loop.
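
To illustrate the circular dependency, here is a naive single-threaded sketch (gather_system_data() and the servers list are hypothetical placeholders) that gathers the data once per pass but tears down and rebuilds every connection, which is exactly what I'm trying to avoid:

import httplib, urllib
from time import sleep

def gather_system_data(): # hypothetical stand-in for the real collector
    return {"data": "values"}

servers = ["myweb.com", "otherserver.net"] # hypothetical server list

while True: # the data-gathering loop
    body = urllib.urlencode(gather_system_data()) # gathered once per pass
    for s in servers:
        con = httplib.HTTPConnection(s) # reconnects on EVERY pass, so
        con.request("POST", "/x.css", body) # the connection is never persistent
        con.getresponse().read()
    sleep(2)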

I would like to explore any pure 2.7.x Pythonic ways this might be accomplished. Depending on outside utilities could be problematic for various reasons. This script, when finished, will be deployed to 150+ Linux systems, and the less that can go wrong, the better.

Thank you for your help and consideration!


1 Answer


I am going to leave this here for others who, like me, are looking to expand their Python understanding. It took me a while to figure out how to approach this problem, but the solution became clear after talking to a coworker who understood this kind of issue.

So, in short, the answer that worked for me used Python 2.7.x's built-in threading and Queue modules.

My main program (main.py) manages the various threads and queues I set up. The NetworkWorker class extends threading.Thread, and on init each instance spins up its own Queue. The Queue references / handlers are stored in a global list variable. In my main thread, I simply loop through that list and send data to each thread's Queue. Each thread then gets its data and does what it's supposed to. Data received back from each HTTP connection is loaded into another queue, which is processed by a single command-execution thread back in main.py.
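
Condensed to its bare bones, the pattern looks like this (a minimal runnable sketch with made-up placeholder data; the full working files follow below):

import threading, Queue

def worker(inbox, cmd_q):
    # Each worker would own one persistent HTTP/S connection;
    # here it just echoes, to show the queue wiring.
    while True:
        data = inbox.get() # blocks until the main thread sends data
        cmd_q.put("response to: " + data) # stand-in for the HTTP round trip

inboxes = [Queue.Queue() for _ in range(2)] # one inbox Queue per server
cmd_q = Queue.Queue() # one shared response/command queue
for inbox in inboxes:
    t = threading.Thread(target=worker, args=(inbox, cmd_q))
    t.daemon = True
    t.start()

data = "system data, gathered once" # gather ONCE...
for inbox in inboxes:
    inbox.put(data) # ...then fan it out to every worker
print cmd_q.get()
print cmd_q.get()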

The following code has been modified / extracted from its original context. I have tested it, and it works perfectly, so long as you correctly configure the servers in the self.conf dict (located in main.py > my_service > __init__) and the servers respond with valid JSON (anything parseable will do, since the command thread just prints what it receives). Honestly, it could use some cleanup. To ensure that the code remains public and accessible, I added a Creative Commons license. Anyone who feels this code resembles their own may contact me for proper attribution.

Except for main.py, the names of the other two files are important: the shared_globals.py and workerThread.py filenames are case-sensitive, and both must be in the same folder as main.py.

Main executable: main.py

#!/usr/bin/python
# encoding=utf8

from time import sleep, time
import subprocess, sys, os # used to get IP, system calls, etc
import json

# For web support
import httplib
import urllib
import zlib
import base64

# workerThread dependency
import shared_globals
from workerThread import NetworkWorker

import Queue
import threading

'''
This work, Python NetworkWorker Queue / Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''

class my_service:

    # * * * * 
    def __init__(self):

        # Manually list off the servers I want to talk to

        self.conf = {}
        self.conf['servers'] = {}

        self.conf['servers']['ServerName1'] = {}
        self.conf['servers']['ServerName1']['protocol'] = "http"
        self.conf['servers']['ServerName1']['url'] = "server.com"
        self.conf['servers']['ServerName1']['port'] = "80"
        self.conf['servers']['ServerName1']['path'] = "/somefile.php"
        self.conf['servers']['ServerName1']['timeout'] = "10" # Seconds. Make sure this is long enough for your largest or most mission-critical HTTP/S transactions to finish, plus the time it takes for your data to reach the persistent HTTP/S thread. Data comes in every 2 seconds, so 5-10 seconds should be fine. Anything that takes too long will cause the queue to back up.

        self.conf['servers']['ServerName2'] = {}
        self.conf['servers']['ServerName2']['protocol'] = "http"
        self.conf['servers']['ServerName2']['url'] = "otherserver.net"
        self.conf['servers']['ServerName2']['port'] = "80"
        self.conf['servers']['ServerName2']['path'] = "/dataio.php"
        self.conf['servers']['ServerName2']['timeout'] = "5"

        # Start the Threading Manager, which will manage the various threads and their components
        # All cross thread communication needs to be managed with Queues
        self.threadManager()


    def threadManager(self):

        # A place to reference all threads
        self.threads = []

        print "Loading Shared Globals"
        # This is the 3rd file in this project. I would not need this if
        # the NetworkWorker Thread was inside of this same file. But since it
        # is in another file, we use this shared_globals file to make the Queues
        # list and other shared resources available between the main thread and the NetworkWorker Threads
        shared_globals.init()

        # Keep track of all the threads / classes we are initializing
        self.workers = {} # Keep track of all the worker threads

        print "Initalizing Network Worker Threads from Config"
        # For each server we want to talk to, we start a worker thread
        # Read servers from self.conf and init threads / workers
        for t in self.conf['servers']: # Loop through servers in config
            # T = server name
            #print "T: ", self.conf['servers'][t]
            self.workers[t] = NetworkWorker()      # Save worker handlers to workers dict

            # Set the server data for each NetworkWorker Thread
            self.workers[t].set_server(self.conf['servers'][t]['url'], self.conf['servers'][t]['port'], self.conf['servers'][t]['timeout'], self.conf['servers'][t]['path'])

        print "Initalizing Command Processing Queue"
        cmd_q = Queue.Queue()
        cmd_q.daemon = True
        shared_globals.cmd_active_queue = cmd_q

        print "Starting Command Processing thread"
        # Start the data gathering thread
        t_cmd = threading.Thread(target=self.command_que_thread_manager)
        t_cmd.daemon = True
        self.threads.append(t_cmd)
        t_cmd.start()

        print "Start Data Gathering thread"
        # Start the data gathering thread
        t = threading.Thread(target=self.data_collector_thread)
        t.daemon = True
        self.threads.append(t)
        t.start()

        print "Starting Worker threads"
        for w in self.workers:      # Loop through all worker handlers
            self.workers[w].start() # Start the jobs

        # We have our NetworkWorker Threads running, and they init their own queues which we 
        # send data to using the def below titled self.send_data_to_networkWorkers

        print "Service Started\n\n\n"

        # This keeps the main thread listening so you can perform actions like killing the application with CTRL+C
        while threading.active_count() > 0:
            try:
                sleep(0.1)
            except (KeyboardInterrupt, SystemExit): # Exits the main thread without complaint!
                print "\n"
                os._exit(0)
        os._exit(0)

    def data_collector_thread(self):
        '''
        Gather all the data we want to send to each server
        Send data to the queues for each NetworkWorker thread we init'd above
        '''
        # Loop indefinitely
        while True:

            # Gather your data and load into data Dict
            data = {"data":"values"}
            print "\n\nData to be sent to all NetworkWorker threads: ", data, "\n\n"

            # Prep the data for HTTP/S
            # If you need to do something else with the data besides sending it to the threads, do it here
            data = self.prep_data_for_HTTP(data) # Do any pre-HTTP/S processing here
            self.send_data_to_networkWorkers(data) # Send the data out to all the Threads Queue's
            sleep(2) # wait for a little bit and then iterate through the loop again. This is your main loop timer.

    def prep_data_for_HTTP(self, data):
        '''
        Convert the data from a Python dict to a JSON string
        Compress the JSON string
        Load the compressed string into another dict, as the HTTP/S object (in the NetworkWorker thread) expects a dict
        URL-encode the data for HTTP/S POST transit
        Return the manipulated data object, now ready for HTTP/S
        '''
        data = json.dumps(data, encoding='utf8') # Serialize the dict to a JSON string
        data = zlib.compress(data, 8)
        # In PHP, get the data from the $_POST['data'] key
        data = {"data":data}
        data = urllib.urlencode(data)
        return data
    # END DEF

    def command_que_thread_manager(self):
        '''
        Run as a thread
        Send data to this thread via its queue, init'd above in threadManager
        Grabs data, and then does something to process it
        '''
        while True:
            data = shared_globals.cmd_active_queue.get()
            print "Processing Command: ", data
    # END DEF

    def send_data_to_networkWorkers(self,data):
        '''
        Send data to all the NetworkWorker threads
        '''
        for q in shared_globals.network_active_queues:
            q.put(data)

    def clean_exit(self):
        '''
        Run when exiting the program for a clean exit
        I don't think I actually call this in my example, 
        but upon main thread exit it would be a good idea to do so
        '''
        for w in self.workers:      # Loop through all worker handlers
            self.workers[w].stop()  # Stop the jobs

    # END DEF   

# END CLASS

if __name__ == "__main__":
    service = my_service() # Instantiating the class starts the service (threadManager runs from __init__)

Shared Globals file: shared_globals.py

#!/usr/bin/python
# encoding=utf8

'''
This work, Python NetworkWorker Queue / Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''

def init():

    global network_active_queues
    global cmd_active_queue

    # Keep track of the data going to the NetworkWorker threads
    print "Initializing Network Active Queues"
    network_active_queues = []

    # Placeholder for the command queue; main.py assigns the real Queue to this name
    print "Initializing Command Active Queue"
    cmd_active_queue = None

NetworkWorker Class: workerThread.py

#!/usr/bin/python
# encoding=utf8
'''
This work, Python NetworkWorker Queue / Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''
import Queue
import threading

import httplib
import urllib
import json 

# workerThread dependency
# Add another queue list for HTTP/S Responses
import shared_globals

class NetworkWorker(threading.Thread):

    def __init__(self):
        '''
        Extend the Threading module
        Start a new Queue for this instance of this class
        Run the thread as a daemon
        shared_globals is an external file for my globals between the main script and this class
        Append this Queue to the list of Queues in shared_globals.network_active_queues
        The main thread loops through shared_globals.network_active_queues to send data to all Queues that were started with this class
        '''
        threading.Thread.__init__(self)
        self.daemon = True # The daemon flag belongs on the Thread; Queue objects have no such attribute
        self.q = Queue.Queue()
        shared_globals.network_active_queues.append(self.q)
        # Init the queue for processing commands

    def run(self):
        '''
        Establish a persistent HTTP connection
        Pull data from the Queue
        When data comes in, send it to the server
        The response from the HTTP server is sent to another queue / thread
        You can do what you want with the responses from the HTTP server
        '''
        # Set your headers
        headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain", "Connection": "keep-alive"} # "Connection": "keep-alive" for persistence
        # Init the persistent HTTP connection
        http_request = httplib.HTTPConnection( self.url, int(self.port), timeout=int(self.timeout) )
        # Start the loop
        while True:
            # Reset response_data each pass so a failed request cannot re-queue stale data
            response_data = str()
            # The code waits here for the queue to have data. If there is no data, it just sleeps until you send it data via its Queue.
            data = self.q.get()
            # A "shutdown" sentinel (sent by self.stop below) breaks the loop so the thread can exit cleanly
            if data == "shutdown":
                break
            # .... When it gets data, we proceed with the data variable.
            try:
                http_request.request( "POST", self.path, data, headers )
                response = http_request.getresponse()
                response_data = response.read()
                # This is the response from the HTTP/S Server
                print "Response: ", response_data
            except Exception, e:
                # In the event something goes wrong, close the dead socket and re-establish the HTTP connection
                print e, "Re-establishing HTTP/S Connection"
                http_request.close()
                http_request = httplib.HTTPConnection( self.url, int(self.port), timeout=int(self.timeout) )

            # If the HTTP transaction was successful, we will have our HTTP response data in response_data variable
            if response_data:
                # Try Except will fail on bad JSON object           
                try:
                    # Validate JSON & Convert from JSON to native Python Dict   
                    json_data = json.loads(response_data)

                    # Send response from server to the command thread manager
                    shared_globals.cmd_active_queue.put(json_data)

                except ValueError, e:
                    print "Bad Server Response: Discarding Invalid JSON"
                    # Repackage the invalid JSON, or some identifier thereof, and send to command processing thread
                    # Load into THIS NetworkWorker's thread queue a new data object to tell the server that there was malformed JSON and to resend the data.
                    #http_request.request( "POST", self.path, data, headers )
                    #response = http_request.getresponse()
                    #response_data = response.read()


        # If the loop ever exits (via the "shutdown" sentinel), close the HTTP/S connection
        http_request.close()

    # END DEF


    def set_server(self, url, port, timeout, path):
        '''
        Use this to set the server for this class / thread instance
        Variables that are passed in are translated to class instance variables (self)
        '''
        self.url = url
        self.port = port
        self.timeout = timeout
        self.path = path
    # END DEF


    def stop(self):
        '''
        Stop this queue
        Stop this thread
        Clean up anything else as needed - tell other threads / queues to shut down
        '''
        shared_globals.network_active_queues.remove(self.q)
        self.q.put("shutdown") # Tell the run() loop to break so the join() below can return
        self.join()

    # END DEF

# END CLASS
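
If you want to exercise the client without standing up the real web server, here is a minimal sketch of a compatible test server in plain Python 2 (the handler name, port, and reply payload are my own inventions for illustration). It assumes the client POSTs urlencoded form data with the zlib-compressed JSON under the "data" key, exactly as prep_data_for_HTTP produces, and it replies with a small JSON object so the command-processing thread has something to print:

#!/usr/bin/python
# encoding=utf8
# Minimal test server sketch - NOT the production endpoint
import json, zlib, urlparse
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler

class TestHandler(BaseHTTPRequestHandler):
    protocol_version = "HTTP/1.1" # honor the client's keep-alive connections

    def do_POST(self):
        length = int(self.headers.getheader('content-length', 0))
        form = urlparse.parse_qs(self.rfile.read(length)) # {"data": [zlib blob]}
        payload = json.loads(zlib.decompress(form['data'][0]))
        print "Received:", payload
        reply = json.dumps({"cmd": "noop"}) # any valid JSON will do
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(reply)))
        self.end_headers()
        self.wfile.write(reply)

if __name__ == "__main__":
    HTTPServer(("0.0.0.0", 8080), TestHandler).serve_forever()

Point one of the self.conf['servers'] entries at it (url "localhost", port "8080", path "/"), and you should see the gathered data arrive every 2 seconds over a single connection, with {u'cmd': u'noop'} echoing out of the command-processing thread.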