I am setting up a WebSocket that receives market data for 33 pairs, processes the data and inserts it into a local MySQL database.
What I've tried so far:
- Setting up the websocket works fine. I first processed the data in the on_message function and inserted it directly into the database. The problem was that with 33 pairs the websocket buffer filled up with market data, and after a few minutes the data in the database lagged by at least 10 seconds.
- Then I tried processing the data through a thread: the on_message function would start a thread that simply appends the market data to a list, like below
    from threading import Thread

    datas = []

    def add_queue(symbol, t, a, b, r_n):
        global datas
        datas.append([symbol, t, a, b, r_n])

    # inside on_message: start one thread per currency ("C") event
    if json_msg['ev'] == "C":
        symbol = json_msg['p'].replace("/", "-")
        round_number = pairs_dict_new[symbol]
        t = Thread(target=add_queue, args=(symbol, json_msg['t'], json_msg['a'], json_msg['b'], round_number,))
        t.start()
and then another function, running in its own looping thread, would pick the items up and insert them into the database
    def add_db():
        global datas
        try:
            # db = mysql.connector.connect(
            #     host="<HOST>",
            #     user="<USER>",
            #     password="<PASSWORD>",
            #     database="bvnwurux_tick_values"
            # )
            while True:
                for x in datas:
                    database.add_db(x[0], x[1], x[2], x[3], x[4])
                    if x in datas:
                        datas.remove(x)
        except KeyboardInterrupt:
            print("program ending..")

    t2 = Thread(target=add_db)
    t2.start()
This still gave a delay. The threaded version wasn't using much CPU but it used more RAM, and the result was even worse.
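For comparison, the hand-off between the receiving side and the database thread could be sketched with `queue.Queue` instead of a shared list and one thread per message. This is only a sketch under assumptions: `insert_batch` is a hypothetical stand-in for a function that writes many rows in one call (for example through a cursor's `executemany`), and `max_batch=500` is an arbitrary value, not a tuned one.

```python
import queue
import threading

ticks = queue.Queue()   # thread-safe; stands in for the shared `datas` list
_STOP = object()        # sentinel that tells the writer thread to exit


def enqueue_tick(symbol, t, a, b, r_n):
    """Called from on_message: hand the tick over without starting a thread."""
    ticks.put((symbol, t, a, b, r_n))


def writer(insert_batch, max_batch=500):
    """Block until data arrives, then drain what is queued into one batch.

    insert_batch is a hypothetical callable that takes a list of row tuples.
    """
    while True:
        item = ticks.get()              # blocks, so no busy loop while idle
        if item is _STOP:
            return
        batch = [item]
        stop_seen = False
        while len(batch) < max_batch:
            try:
                nxt = ticks.get_nowait()
            except queue.Empty:
                break
            if nxt is _STOP:
                stop_seen = True
                break
            batch.append(nxt)
        insert_batch(batch)             # one database round trip per batch
        if stop_seen:
            return


if __name__ == "__main__":
    written = []                        # list.extend plays the role of the DB
    t = threading.Thread(target=writer, args=(written.extend,))
    t.start()
    for i in range(3):
        enqueue_tick("EUR-USD", 1000 + i, 1.1, 1.0, 5)
    ticks.put(_STOP)
    t.join()
    print(len(written), "rows written")
```

The blocking `ticks.get()` means the writer sleeps while the queue is empty, and batching means the number of database round trips no longer grows one-for-one with the number of ticks.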
- Instead of using a websocket with a thread, I tried plain HTTP requests to the API: one thread per symbol, each looping over a request and sending the result to the database. My issues here were that MySQL connections don't like threads (sometimes two threads would make a request on the same connection at the same time and crash), and that the data was still delayed by the time needed to run the code, even without a buffer. Processing each response took so long that I couldn't keep the delay under 10 seconds.
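The crash from two threads sharing one connection can be avoided by giving every thread its own connection. A minimal sketch using `threading.local`, where `connect` is a hypothetical zero-argument factory (for example a lambda wrapping `mysql.connector.connect(...)` with the real credentials):

```python
import threading

_local = threading.local()   # each thread sees its own attributes on this


def get_connection(connect):
    """Return the calling thread's own connection, creating it on first use.

    connect is a hypothetical zero-argument factory that opens a new
    database connection.
    """
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = connect()
        _local.conn = conn
    return conn
```

Each worker would call `get_connection(...)` before issuing a query, so no two threads ever send statements over the same connection at the same time.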
Here is a little example of the basic code I used to get the data.
    import json

    import rel
    import websocket

    pairs={'AUDCAD':5,'AUDCHF':5,'AUDJPY':3,'AUDNZD':5,'AUDSGD':2,'AUDUSD':5,'CADCHF':5,'CADJPY':3,'CHFJPY':3,'EURAUD':5,'EURCAD':5,'EURCHF':5,'EURGBP':5,'EURJPY':3,'EURNZD':5,'EURSGD':5,'EURUSD':5,'GBPAUD':5,'GBPCAD':5,'GBPCHF':5,'GBPJPY':3,'GBPNZD':5,'GBPSGD':5,'GBPUSD':5,'NZDCAD':5,'NZDCHF':5,'NZDJPY':3,'NZDUSD':5,'USDCAD':5,'USDCHF':5,'USDJPY':3,'USDSGD':5,'SGDJPY':3}

    def on_open(ws):
        print("Opened connection")
        ws.send('{"action":"auth","params":"<API KEY>"}')  # connecting with secret api key

    def on_message(ws, message):
        print("msg", message)
        json_msg = json.loads(message)[0]
        # .get() because data messages carry no 'status' key
        if json_msg.get('status') == "auth_success":  # successfully authenticated
            ws.send('{"action":"subscribe","params":"C.*"}')  # subscribing to currencies
            print("should subscribe to", list(pairs))
        # once the websocket is subscribed to all the pairs, process the data
        # --> process json_msg

    if __name__ == "__main__":
        # websocket.enableTrace(True)  # just to show all the requests made (debug mode)
        ws = websocket.WebSocketApp("wss://socket.polygon.io/forex",
                                    on_open=on_open,
                                    on_message=on_message)
        ws.run_forever(dispatcher=rel)  # Set dispatcher to automatic reconnection
        rel.signal(2, rel.abort)  # Keyboard Interrupt
        rel.dispatch()
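The "process json_msg" step could look roughly like the sketch below. It assumes that one frame is a JSON array that may hold several events (so it loops over all of them rather than taking only element `[0]`), and that the rounding dict is keyed like `'EURUSD'`, as in the `pairs` dict above. The function name `parse_quotes` is hypothetical; the field names `ev`, `p`, `t`, `a`, `b` are the ones used earlier in this post.

```python
import json


def parse_quotes(message, pairs):
    """Return one (symbol, t, a, b, round_number) tuple per quote event.

    Assumes the frame is a JSON array of events and that pairs is keyed
    without a separator, e.g. 'EURUSD'.
    """
    rows = []
    for event in json.loads(message):
        if event.get("ev") != "C":
            continue                      # status messages, other event types
        key = event["p"].replace("/", "")
        if key not in pairs:
            continue                      # pair is not one of the tracked ones
        rows.append((event["p"].replace("/", "-"), event["t"],
                     event["a"], event["b"], pairs[key]))
    return rows
```

Keeping this step a pure function makes it easy to time on its own, separately from the socket and the database.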
- I also tried multiprocessing, but this, on the other hand, crashed my server: it used 100% CPU, and requests to the Apache server then failed to get through or took a long time to load. It's really a balancing problem.
I'm using an Ubuntu server with 32 CPUs, based in London, and the Polygon API is based in NYC. I also tried with 4 CPUs from Seattle to NYC, but still no luck.
Even with 4 pairs and 32 CPUs, it eventually reaches a 10-second delay. I think this is more of a code structure problem.
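To see where the delay builds up, the lag can be computed at each stage (on receipt, and again after the insert). A minimal sketch, assuming the event's `t` field is a Unix timestamp in milliseconds and the server clock is reasonably in sync (for example via NTP); `lag_seconds` is a hypothetical helper name:

```python
import time


def lag_seconds(event_ts_ms, now=None):
    """Seconds between an event's timestamp and the local clock.

    Assumes event_ts_ms is a Unix timestamp in milliseconds.
    """
    if now is None:
        now = time.time()
    return now - event_ts_ms / 1000.0
```

If the lag is already large when on_message fires, the backlog is in the socket buffer. If it only grows after the insert, the database step is the bottleneck.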