I am trying to build a server to sample a streaming price feed and update a postgres DB using SQLAlchemy. I am using threaded instances of a mapped class, which seems to work but is not stable.
There are no issues with 1 or 2 instances of the Stream class, but with say 10, the thread fails randomly and silently. Each time before it fails, SQLAlchemy gives an error message, so it seems this is what is killing the thread. There is nothing wrong with the stream, it is always stable.
Have I missed something with the SQLAlchemy setup? Is there a better way of feeding multiple realtime subscriptions into SQL?
The code:
import time
import json
from threading import Thread, Lock
import sqlalchemy as db
from sqlalchemy.orm import scoped_session, sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
# Setup SQLAlchemy
engine = db.create_engine('postgresql://localhost:5432/Project', echo=False)
metadata = db.MetaData(bind=engine)
Session = scoped_session(sessionmaker(bind=engine))
Base = declarative_base()
Base.metadata.create_all(engine)
session = Session()
#DB classes
#static data table
class StockMaster(Base):
__tablename__ = 'stock_master'
id = db.Column(db.Integer, primary_key=True)
ticker = db.Column(db.String)
stock_name = db.Column(db.String)
@classmethod
def find_by_ticker(cls,ticker):
return session.query(StockMaster).filter(StockMaster.ticker==ticker).first()
#live data table
class StockLive(Base):
__tablename__ = 'stock_live'
id = db.Column(db.Integer, primary_key=True)
quote = db.Column(db.Numeric)
timestamp = db.Column(db.Numeric)
ticker_id = db.Column(db.Integer, db.ForeignKey('stock_master.id'))
ticker = relationship("StockMaster", foreign_keys=[ticker_id])
def __init__(self, quote, ticker_id, timestamp):
self.quote=quote
self.ticker_id=ticker_id
self.timestamp=timestamp
def save_to_db(self):
session.add(self)
session.commit()
@classmethod
def find_by_ticker_id(cls,ticker_id):
return session.query(StockLive).filter(StockLive.ticker_id==ticker_id).first()
@classmethod
def find_by_ticker(cls,ticker):
ticker_id = StockMaster.find_by_ticker(ticker).id
return session.query(StockLive).filter(StockLive.ticker_id==ticker_id).first()
class Stream(Thread):
def __init__(self,ticker):
Thread.__init__(self)
self.ticker=ticker
self.quote=1
self.data_set = StockLive.find_by_ticker(self.ticker)
self.count=0
def run(self):
con.subscribe(self.ticker)
current_mid=1
while True:
new_data = json.loads(con.get_price(self.ticker).to_json())
new_mid = new_data['Mid']
if new_mid == current_mid:
pass
else:
current_mid = new_mid
self.data_set.quote = current_mid
self.data_set.timestamp = time.time()
try:
self.data_set.save_to_db()
self.count+=1
except:
self.data_set = StockLive.find_by_ticker(self.ticker)
print('error saving to db for '+self.ticker)
time.sleep(.1)
if __name__ == '__main__':
threads={}
for ticker in tickerlist:
try:
threads[ticker]=Stream(ticker)
threads[ticker].setName('Thread ' + ticker)
threads[ticker].start()
except:
print('Error setting up '+ticker)
while True:
for ticker in tickerlist:
if threads[ticker].isAlive()==False:
threads[ticker]=Stream(ticker)
SQLAlchemy error message:
/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py:2323: SAWarning: Usage of the 'Session.add()' operation is not currently supported within the execution stage of the flush process. Results may not be consistent. Consider using alternative event listeners or connection-level operations instead. % method) /anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py:2425: SAWarning: Attribute history events accumulated on 1 previously clean instances within inner-flush event handlers have been reset, and will not result in database updates. Consider using set_committed_value() within inner-flush event handlers to avoid this warning. % len_) Exception in thread Thread MSFT: Traceback (most recent call last):
File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2436, in _flush transaction.commit() File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 465, in commit self._assert_active(prepared_ok=True) File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 285, in _assert_active raise sa_exc.ResourceClosedError(closed_msg) sqlalchemy.exc.ResourceClosedError: This transaction is closedDuring handling of the above exception, another exception occurred:
Traceback (most recent call last): File "", line 48, in run self.data_set.save_to_db() File "", line 44, in save_to_db session.commit() File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 954, in commit self.transaction.commit() File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 467, in commit self._prepare_impl() File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 447, in _prepare_impl self.session.flush() File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2313, in flush self._flush(objects) File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2440, in _flush transaction.rollback(_capture_exception=True) File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 76, in exit compat.reraise(type_, value, traceback) File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 249, in reraise raise value File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2440, in _flush transaction.rollback(_capture_exception=True) File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 483, in rollback self._assert_active(prepared_ok=True, rollback_ok=True) File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 285, in _assert_active raise sa_exc.ResourceClosedError(closed_msg) sqlalchemy.exc.ResourceClosedError: This transaction is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "/anaconda3/lib/python3.7/threading.py", line 917, in _bootstrap_inner self.run() File "", line 53, in run self.data_set = StockLive.find_by_ticker(self.ccy) File "", line 52, in find_by_ticker ticker_id = StockMaster.find_by_ticker(ticker).id File "", line 23, in find_by_ticker return session.query(StockMaster).filter(StockMaster.ticker==ticker).first() File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 2895, in first ret = list(self[0:1]) File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 2687, in getitem return list(res) File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 2994, in iter self.session._autoflush() File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1493, in _autoflush self.flush() File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2313, in flush self._flush(objects) File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2400, in _flush subtransactions=True) File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 865, in begin nested=nested) File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 297, in _begin self._assert_active() File "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 264, in _assert_active "This session is in 'prepared' state; no further " sqlalchemy.exc.InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction.
pool
setting in create engine. by default it is 5 ( ur code is creaking for more then 5 threaad) so change it to ur needs. – sahasrara62