After all the suggestion & help by Matt I have following code which is running successfully:
#!usr/bin/python
# Using the "dbmodule" from the previous example, create a ConnectionPool
from twisted.internet import reactor
from twisted.enterprise import adbapi
from twisted.internet import reactor, defer,threads
from twisted.python.threadpool import ThreadPool
import itertools
from twisted.internet.threads import deferToThread
from twisted.internet import reactor, defer, task
from tools.printTime import *
from tools.getVersion import *
from sh_log import *
concurrent = 30
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)
#Creating Global Instance variables
path="tools"
lo=Log()
class ToolsBuilder:
def build(self,txn,tool,asset_id):
if tool:
print "\n"
try:
sql="select tool_filename from tool_master where tool_id = %s" %(tool,)
sql_asset="select asset_url from asset_master where asset_id = %s" %(asset_id,)
txn.execute(sql_asset)
asset_url = txn.fetchall()
log_date=lo.log_date()
txn.execute(sql)
result = txn.fetchall()
log='\n'+log_date+"::"+str(result[0][0])+ " tool object is created......\n"
lo.wfile(log)
temp=(path +'/' + str(result[0][0]))
if result:
if temp:
f=open(temp).read()
obj_tool=compile(f, 'a_filename', 'exec')
return obj_tool
except:
lo.wfile("Error in creating executable tool object......")
tb=ToolsBuilder()
class ToolsVectorGenerator:
def generate(self,txn,res_set={}):
v1=[]
for asset_id in res_set.iterkeys():
try:
obj_tools=[]
if asset_id:
print "asset_id..............................",asset_id
log_date=lo.log_date()
log=log_date+"::"+" \nVector generation for the asset number...:"+str(asset_id)
lo.wfile(log)
vector=[]
tools_arr=[]
obj_tools=[]
for tool in res_set[asset_id]:
if tool:
print "tool..............",tool
temp_tool=tb.build(txn,tool,asset_id)
print "temp_tool..........",temp_tool
#fetch data of tool setting.....
sql_tool_setting="select * from tool_asset_settings where tool_id =%s" %(tool,)
txn.execute(sql_tool_setting)
result_tool_setting = txn.fetchall()
tool_id=result_tool_setting[0][1]
t_id=int(tool_id)
tool_id_arr=[]
tool_id_arr.append(t_id)
tool_id_arr.append(result_tool_setting)
tool_id_arr.append(temp_tool)
tools_arr.append(tool_id_arr)
#fetch data from asset master
sql_asset="select asset_name from asset_master where asset_id=%s" %(asset_id,)
txn.execute(sql_asset)
result_asset = txn.fetchall()
vector.append(result_asset)
vector.append(tools_arr)
except:
lo.wfile("\nError in getting asset,please check your database or network connection......")
tvm.executeVector(vector)
tvg=ToolsVectorGenerator()
class Tool:
def exectool(self,tool):
exec tool
return
def getResult(self,tool):
return deferToThread(self.exectool, tool)
to=Tool()
class StateMachine:
def setPriority(self,txn,tup):
temp=[]
arr=[]
for li in tup:
sql2="select tool_dependency from tool_asset_settings where tool_id =%s" %(li[1],)
txn.execute(sql2)
result12 = txn.fetchall()
arr=[]
if result12[0][0]!=None:
tup12=result12[0][0]
arr=(li[0],tup12)
# print "arr.........",arr
if arr in tup:
print "This element is already exist......."
else:
temp.append(arr)
temp.extend(tup)
return tuple(temp)
st=StateMachine()
class ToolsVectorExecutionManager(object):
def executeVector(self,toolsvector):
print "toolsvector================>",toolsvector
if toolsvector:
for tools in toolsvector[1]:
if tools[2] != None:
to.getResult(tools[2])
tvm=ToolsVectorExecutionManager()
class ToolsToExecuteAnalyzer:
def __init__(self,dbpool=None):
self.dbpool = dbpool
self.loopCall = task.LoopingCall(self.myQuery)
def start(self):
print "Started scanner"
self.loopCall.start(3)
def stop(self):
print "Stopping scanner"
self.loopCall.stop()
def myQuery(self):
def interact(txn):
sql="SELECT tool_asset_id,tool_execute_id FROM tool_to_execute where status='0'"
txn.execute(sql)
result=txn.fetchall()
if result:
tool_asset_id=tuple([int(e[0]) for e in result])
tool_execute_id=tuple([int(e[1]) for e in result])
if len(tool_asset_id)>1:
sql1="SELECT asset_id,tool_id FROM tool_in_assets WHERE tool_asset_id IN %s"%(tool_asset_id,)
else:
sql1="SELECT asset_id,tool_id FROM tool_in_assets WHERE tool_asset_id = (%s)"%(tool_asset_id)
txn.execute(sql1)
tup = txn.fetchall()
#dependency check for the selected tool
asset_tool=st.setPriority(txn,tup)
log_date=lo.log_date()
log=log_date+"::priority have been set for the tools......\n"
lo.wfile(log)
#creating group of asset with their tools
res={}
for element in asset_tool:
if element[0] in res:
res[element[0]].append(int(element[1]))
else:
res[int(element[0])] = [int(element[1])]
#Recored deletion from tool_to_execute table
if res!=None and res.keys()!=[]:
for asset_id in res.iterkeys():
if len(tool_execute_id)>1:
sql_del="delete from tool_to_execute where tool_execute_id in %s " %(tool_execute_id,)
else:
sql_del="delete from tool_to_execute where tool_execute_id = %s" %(tool_execute_id)
txn.execute(sql_del)
#New Addition of vector
tvg.generate(txn,res)
# return res
d = self.dbpool.runInteraction(interact)
d.addCallbacks(self.printResult,self.printError)
def printResult(self,res):
print "In printResult after generate...."
def printError(self,error):
print "Got Error: %r" % error
error.printTraceback()
ToolsToExecuteAnalyzer()
if __name__ == '__main__':
from twisted.internet import reactor
dbpool = adbapi.ConnectionPool("MySQLdb", 'localhost', 'test', 'test','test')
s = ToolsToExecuteAnalyzer(dbpool)
reactor.callWhenRunning(s.start)
reactor.addSystemEventTrigger('before','shutdown',s.stop)
reactor.run()
This is my whole code, I just wanted to know how many threads running, means for each tool new thread?
Anyway, thanks Matt for your help..:)