I am leaving this here for others who, like me, are looking to expand their understanding of Python. It took me a while to figure out how to approach this problem, but the solution became clear after talking to a coworker who had dealt with this kind of issue before.
In short, the answer that worked for me uses Python 2.7.x's built-in threading and Queue modules.
My main program (main.py) manages the threads and queues I set up. The NetworkWorker class, which extends threading.Thread, creates a new Queue for each instance on init. A reference to each Queue is stored in a global list. In main.py, I simply loop through that list and put the data on each thread's Queue. Each thread then takes the data from its own Queue and sends it over its HTTP connection. The data received back from each HTTP connection is put on another queue, which is processed by a single command execution thread back in main.py.
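The queue layout described above can be sketched in isolation, with the HTTP part left out. This is only a minimal illustration, not the code from my project: it is written for Python 3 (where the module is named queue rather than Queue), and the names worker and fan_out are hypothetical.

```python
import queue
import threading


def worker(name, inbox, results):
    # Block on this worker's own queue and report each item
    # on the shared results queue.
    while True:
        item = inbox.get()
        results.put((name, item))


def fan_out(worker_names, items):
    results = queue.Queue()   # shared queue for everything the workers send back
    inboxes = []              # one inbox queue per worker
    for name in worker_names:
        inbox = queue.Queue()
        inboxes.append(inbox)
        t = threading.Thread(target=worker, args=(name, inbox, results))
        t.daemon = True       # do not keep the process alive on exit
        t.start()
    for item in items:
        for inbox in inboxes:  # every worker receives every item
            inbox.put(item)
    expected = len(worker_names) * len(items)
    # Collect exactly one result per (worker, item) pair.
    return sorted(results.get(timeout=5) for _ in range(expected))


if __name__ == "__main__":
    print(fan_out(["ServerName1", "ServerName2"], ["a", "b"]))
```

Each worker only ever reads from its own inbox, so no locking is needed beyond what the queues already provide.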
The code below has been modified and extracted from its original context. I have tested it, and it works as long as you correctly configure the servers in the self.conf dict (located in main.py > my_service > __init__) and the servers respond with valid JSON. Honestly, it could use some cleanup. To ensure that the code remains public and accessible, I added a Creative Commons license. Anyone who feels this code resembles their own may contact me for proper attribution.
Except for main.py, the names of the other two files matter: shared_globals.py and workerThread.py are imported by name, so the filenames are case sensitive and both files must be in the same folder as main.py.
Main executable: main.py
#!/usr/bin/python
# encoding=utf8
from time import sleep, time
import subprocess, sys, os # used to get IP, system calls, etc
import json
# For web support
import httplib
import urllib
import zlib
import base64
# workerThread dependencies
import shared_globals
from workerThread import NetworkWorker
import Queue
import threading
'''
This work, Python NetworkWorker Queue / Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''
class my_service:
    # * * * *
    def __init__(self):
        # Manually list off the servers I want to talk to
        self.conf = {}
        self.conf['servers'] = {}
        self.conf['servers']['ServerName1'] = {}
        self.conf['servers']['ServerName1']['protocol'] = "http"
        self.conf['servers']['ServerName1']['url'] = "server.com"
        self.conf['servers']['ServerName1']['port'] = "80"
        self.conf['servers']['ServerName1']['path'] = "/somefile.php"
        # Timeout in seconds. Make sure this is long enough for your largest OR mission critical
        # HTTP/S transactions to finish, plus the time it takes for your data to come into your
        # persistent HTTP/S thread. Data comes in every 2 seconds, so 5-10 seconds should be fine.
        # Anything that takes too long will cause the queue to back up too much.
        self.conf['servers']['ServerName1']['timeout'] = "10"
        self.conf['servers']['ServerName2'] = {}
        self.conf['servers']['ServerName2']['protocol'] = "http"
        self.conf['servers']['ServerName2']['url'] = "otherserver.net"
        self.conf['servers']['ServerName2']['port'] = "80"
        self.conf['servers']['ServerName2']['path'] = "/dataio.php"
        self.conf['servers']['ServerName2']['timeout'] = "5"
        # Start the Threading Manager, which will manage the various threads and their components
        # All cross thread communication needs to be managed with Queues
        self.threadManager()
    def threadManager(self):
        # A place to reference all threads
        self.threads = []
        print "Loading Shared Globals"
        # This is the 3rd file in this project. I would not need this if
        # the NetworkWorker Thread was inside of this same file. But since it
        # is in another file, we use this shared_globals file to make the Queues
        # list and other shared resources available between the main thread and the NetworkWorker Threads
        shared_globals.init()
        # Keep track of all the threads / classes we are initializing
        self.workers = {} # Keep track of all the worker threads
        print "Initializing Network Worker Threads from Config"
        # For each server we want to talk to, we start a worker thread
        # Read servers from self.conf and init threads / workers
        for t in self.conf['servers']: # Loop through servers in config
            # t = server name
            #print "T: ", self.conf['servers'][t]
            self.workers[t] = NetworkWorker() # Save worker handlers to workers dict
            # Set the server data for each NetworkWorker Thread
            self.workers[t].set_server(self.conf['servers'][t]['url'], self.conf['servers'][t]['port'], self.conf['servers'][t]['timeout'], self.conf['servers'][t]['path'])
        print "Initializing Command Processing Queue"
        # A Queue has no daemon flag; only the threads that read from it do
        cmd_q = Queue.Queue()
        shared_globals.cmd_active_queue = cmd_q
        print "Starting Command Processing thread"
        # Start the command processing thread
        t_cmd = threading.Thread(target=self.command_que_thread_manager)
        t_cmd.daemon = True
        self.threads.append(t_cmd)
        t_cmd.start()
        print "Starting Data Gathering thread"
        # Start the data gathering thread
        t = threading.Thread(target=self.data_collector_thread)
        t.daemon = True
        self.threads.append(t)
        t.start()
        print "Starting Worker threads"
        for w in self.workers: # Loop through all worker handlers
            self.workers[w].start() # Start the jobs
        # We have our NetworkWorker Threads running, and they init their own queues, which we
        # send data to using the method below named self.send_data_to_networkWorkers
        print "Service Started\n\n\n"
        # This keeps the main thread listening so you can perform actions like killing the application with CTRL+C
        while threading.active_count() > 0:
            try:
                sleep(0.1)
            except (KeyboardInterrupt, SystemExit): # Exits the main thread without complaint!
                print "\n"
                os._exit(0)
        os._exit(0)
    def data_collector_thread(self):
        '''
        Gather all the data we want to send to each server
        Send data to the queues for each NetworkWorker thread we init'd above
        '''
        # Loop indefinitely
        while True:
            # Gather your data and load into data Dict
            data = {"data":"values"}
            print "\n\nData to be sent to all NetworkWorker threads: ", data, "\n\n"
            # Prep the data for HTTP/S
            # If you need to do something else with the data besides sending it to the threads, do it here
            data = self.prep_data_for_HTTP(data) # Do any pre-HTTP/S processing here
            self.send_data_to_networkWorkers(data) # Send the data out to all the threads' Queues
            sleep(2) # Wait for a little bit and then iterate through the loop again. This is your main loop timer.
    def prep_data_for_HTTP(self, data):
        '''
        I am converting my data from a Python dict to a JSON string
        I compress the JSON string
        I load the compressed string into another dict, as the HTTP/S object (in the NetworkWorker thread) expects a dict
        URL encode the data for HTTP/S POST transit
        Return the manipulated data object, now ready for HTTP/S
        '''
        data = json.dumps(data, encoding='utf8') # Now continue preparing for HTTP/S
        data = zlib.compress(data, 8)
        # In PHP, get the data from the $_POST['data'] key
        data = {"data":data}
        data = urllib.urlencode(data)
        return data
    # END DEF
    def command_que_thread_manager(self):
        '''
        Run as a thread
        Send data to this thread via its queue, init'd above in threadManager
        Grabs data, and then does something to process it
        '''
        while True:
            data = shared_globals.cmd_active_queue.get()
            print "Processing Command: ", data
    # END DEF
    def send_data_to_networkWorkers(self,data):
        '''
        Send data to all the NetworkWorker threads
        '''
        for q in shared_globals.network_active_queues:
            q.put(data)
    def clean_exit(self):
        '''
        Run when exiting the program for a clean exit
        I don't think I actually call this in my example,
        but upon main thread exit it would be a good idea to do so
        '''
        for w in self.workers: # Loop through all worker handlers
            self.workers[w].stop() # Stop the jobs
    # END DEF
# END CLASS
if __name__ == "__main__":
    service = my_service()
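To make the steps in prep_data_for_HTTP concrete, here is a minimal round-trip sketch. It is written for Python 3 (urllib.parse instead of Python 2's urllib), and both function names are hypothetical. decode_payload only stands in for what the receiving script would have to do; it is an assumption on my part and not part of the code above.

```python
import json
import zlib
from urllib.parse import urlencode, unquote_to_bytes


def encode_payload(payload):
    # dict -> JSON text -> UTF-8 bytes -> zlib-compressed bytes
    raw = json.dumps(payload).encode("utf-8")
    compressed = zlib.compress(raw, 8)
    # Wrap in a one-key form so the receiver can read it from the "data" field;
    # urlencode percent-encodes the binary bytes for POST transit.
    return urlencode({"data": compressed})


def decode_payload(body):
    # Reverse the steps: split off the key, undo the form encoding,
    # decompress, and parse the JSON.
    key, _, value = body.partition("=")
    if key != "data":
        raise ValueError("unexpected form field: %r" % key)
    compressed = unquote_to_bytes(value.replace("+", " "))
    return json.loads(zlib.decompress(compressed).decode("utf-8"))


if __name__ == "__main__":
    body = encode_payload({"data": "values"})
    print(decode_payload(body))
```

The compressed bytes are arbitrary binary, so the receiving side has to treat the field as raw bytes and decompress before interpreting it as text.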
Shared Globals file: shared_globals.py
#!/usr/bin/python
# encoding=utf8
'''
This work, Python NetworkWorker Queue / Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''
def init():
    global network_active_queues
    global cmd_active_queue
    global cmd_q
    # Keep track of the data going to the Network Worker Threads
    print "Initializing Network Active Queues"
    network_active_queues = []
    # Keep track of the commands
    print "Initializing Command Active Queue"
    # Placeholder; main.py replaces this with a Queue.Queue() instance
    cmd_active_queue = None
    # ?
    #cmd_q = []
NetworkWorker Class: workerThread.py
#!/usr/bin/python
# encoding=utf8
'''
This work, Python NetworkWorker Queue / Threading, is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Written by John Minton @ http://pythonjohn.com/
To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-sa/4.0/.
'''
import Queue
import threading
import httplib
import urllib
import json
# workerThread dependencies
# Add another queue list for HTTP/S Responses
import shared_globals
class NetworkWorker(threading.Thread):
    def __init__(self):
        '''
        Extend threading.Thread
        Start a new Queue for this instance of this class
        Run the thread as a daemon
        shared_globals is an external file for my globals between main script and this class.
        Append this Queue to the list of Queues in shared_globals.network_active_queues
        Loop through shared_globals.network_active_queues to send data to all Queues that were started with this class
        '''
        threading.Thread.__init__(self)
        # The daemon flag belongs on the thread; a Queue has no daemon setting
        self.daemon = True
        self.q = Queue.Queue()
        shared_globals.network_active_queues.append(self.q)
        # Init the queue for processing commands
    def run(self):
        '''
        Establish a persistent HTTP connection
        Pull data from the Queue
        When data comes in, send it to the server
        I send the response from the HTTP server to another queue / thread
        You can do what you want to do with responses from the HTTP server
        '''
        # Set your headers
        headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain", "Connection": "keep-alive"} # "Connection": "keep-alive" for persistence
        # Init the persistent HTTP connection
        http_request = httplib.HTTPConnection( self.url, int(self.port), timeout=int(self.timeout) )
        # Init response_data
        response_data = str()
        # Start the loop
        while True:
            # The code waits here for the queue to have data. If there is no data, it blocks until you send it data via its Queue.
            data = self.q.get()
            # .... When it gets data, we proceed with the data variable.
            try:
                http_request.request( "POST", self.path, data, headers )
                response = http_request.getresponse()
                response_data = response.read()
                # This is the response from the HTTP/S Server
                print "Response: ", response_data
            except Exception as e:
                # In the event something goes wrong, we can simply try to re-establish the HTTP connection
                print e, "Re-establishing HTTP/S Connection"
                http_request.close()
                http_request = httplib.HTTPConnection( self.url, int(self.port), timeout=int(self.timeout) )
                # Clear any response left over from an earlier request so it is not processed a second time
                response_data = str()
            # If the HTTP transaction was successful, we will have our HTTP response data in the response_data variable
            if response_data:
                # json.loads raises ValueError on a bad JSON object
                try:
                    # Validate JSON & convert from JSON to native Python dict
                    json_data = json.loads(response_data)
                    # Send response from server to the command thread manager
                    shared_globals.cmd_active_queue.put(json_data)
                except ValueError as e:
                    print "Bad Server Response: Discarding Invalid JSON"
                    # Repackage the invalid JSON, or some identifier thereof, and send to command processing thread
                    # Load into THIS NetworkWorker's thread queue a new data object to tell the server that there was malformed JSON and to resend the data.
                    #http_request.request( "POST", self.path, data, headers )
                    #response = http_request.getresponse()
                    #response_data = response.read()
        # Place this here for good measure; if we ever exit the while loop we will close the HTTP/S connection
        http_request.close()
    # END DEF
    def set_server(self, url, port, timeout, path):
        '''
        Use this to set the server for this class / thread instance
        Variables that are passed in are stored as instance variables (self)
        '''
        self.url = url
        self.port = port
        self.timeout = timeout
        self.path = path
    # END DEF
    def stop(self):
        '''
        Stop this queue
        Stop this thread
        Clean up anything else as needed - tell other threads / queues to shutdown
        '''
        shared_globals.network_active_queues.remove(self.q)
        #self.q.put("shutdown") # Do we need to tell the threads to shutdown? Perhaps if reloading the config
        # Note: join() blocks until run() returns. As written, run() loops forever,
        # so the loop needs a way to exit (such as the shutdown message above) before this returns.
        self.join()
    # END DEF
# END CLASS
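On the open question in stop() about telling the threads to shut down: one common approach is to put a sentinel object on the worker's queue and have the loop exit when it sees it, so that join() can actually return. The following is only a sketch of that idea under my own assumptions, written for Python 3 with the HTTP work left out. StoppableWorker, _SHUTDOWN and handled are hypothetical names and are not part of workerThread.py.

```python
import queue
import threading

# Hypothetical sentinel: a unique object that can never be confused with real data.
_SHUTDOWN = object()


class StoppableWorker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.q = queue.Queue()
        self.handled = []  # stands in for "send the item to the server"

    def run(self):
        while True:
            item = self.q.get()
            if item is _SHUTDOWN:
                break  # leave the loop so the thread can finish
            self.handled.append(item)

    def stop(self):
        # The queue is FIFO, so everything queued before the sentinel
        # is processed before the thread exits.
        self.q.put(_SHUTDOWN)
        self.join()


if __name__ == "__main__":
    w = StoppableWorker()
    w.start()
    w.q.put("first")
    w.q.put("second")
    w.stop()
    print(w.handled)
```

Using a dedicated object rather than a string such as "shutdown" avoids any clash with a payload that happens to contain the same text.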