I am using the Python `ibapi` package to receive real-time market data from Interactive Brokers. It returns the data I expect, but the run ends with "unhandled exception in EReader thread".
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract as IBcontract
from threading import Thread
import queue
import pandas as pd
from ibapi.ticktype import TickTypeEnum
# Request id used for market-data requests when the caller does not supply one.
DEFAULT_PRICE_DATA_ID = 1001

# Unique sentinel objects; they are compared by identity (``is``), never by value.
# FINISHED marks the end of a stream when found in a queue; STARTED and TIME_OUT
# are the other states a finishableQueue's ``status`` can take.
FINISHED = object()
STARTED = object()
TIME_OUT = object()
class finishableQueue(object):
    """Drain a queue until a FINISHED marker arrives or a read times out.

    ``status`` starts as STARTED and is set to FINISHED or TIME_OUT by
    ``get`` depending on why draining stopped.
    """

    def __init__(self, queue_to_finish):
        """Wrap *queue_to_finish*; nothing is read until ``get`` is called."""
        self._queue = queue_to_finish
        self.status = STARTED

    def get(self, timeout):
        """Return every element read before FINISHED or a timeout.

        Each individual read waits at most *timeout* seconds. The FINISHED
        marker itself is not included in the returned list.
        """
        collected = []
        while True:
            try:
                item = self._queue.get(timeout=timeout)
            except queue.Empty:
                # Nothing arrived within the window: stop and record why.
                self.status = TIME_OUT
                break
            if item is FINISHED:
                self.status = FINISHED
                break
            collected.append(item)
        return collected

    def timed_out(self):
        """Return True when the last ``get`` stopped because of a timeout."""
        return self.status is TIME_OUT
class TestWrapper(EWrapper):
    """EWrapper that buffers error messages and tick prices in queues.

    Errors go to a single queue; tick prices go to one queue per request id,
    stored in ``_my_price_data_dict``.
    """

    def __init__(self):
        self._my_price_data_dict = {}
        # Create the error queue immediately so error(), is_error() and
        # get_error() are usable even if init_error() has not been called yet.
        self._my_errors = queue.Queue()

    def get_error(self, timeout=5):
        """Return the next buffered error message, or None if there is none.

        Waits at most *timeout* seconds when the queue reports a pending item.
        """
        if self.is_error():
            try:
                return self._my_errors.get(timeout=timeout)
            except queue.Empty:
                return None
        return None

    def is_error(self):
        """Return True when at least one error message is buffered."""
        return not self._my_errors.empty()

    def init_error(self):
        """Replace the error queue with a fresh, empty one."""
        self._my_errors = queue.Queue()

    def error(self, id, errorCode, errorString, advancedOrderRejectJson=""):
        """Overridden callback: format the error and put it on the error queue.

        ``advancedOrderRejectJson`` is accepted (and ignored) because some
        newer TWS API releases pass it as a fourth argument; the default keeps
        the three-argument call shape working.
        """
        errormsg = "IB error id %d errorcode %d string %s" % (id, errorCode, errorString)
        self._my_errors.put(errormsg)

    def init_ibprices(self, tickerid):
        """Create, register and return a new price queue for *tickerid*.

        Any queue previously registered under the same id is replaced.
        """
        ibprice_data_queue = queue.Queue()
        self._my_price_data_dict[tickerid] = ibprice_data_queue
        return ibprice_data_queue

    def tickPrice(self, reqId, tickType, price, attrib):
        """Overridden callback: queue ``(tick type name, price)`` for *reqId*."""
        tickdata = (TickTypeEnum.to_str(tickType), price)
        try:
            price_queue = self._my_price_data_dict[reqId]
        except KeyError:
            # First tick for an id nobody registered: create its queue lazily.
            price_queue = self.init_ibprices(reqId)
        price_queue.put(tickdata)
class TestClient(EClient):
    """EClient with a helper that collects market-data ticks for a contract."""

    def __init__(self, wrapper):
        EClient.__init__(self, wrapper)

    def error(self, reqId, errorCode, errorString):
        """Print an error report to stdout."""
        print("Error: ", reqId, " ", errorCode, " ", errorString)

    def getIBrealtimedata(self, ibcontract, tickerid=DEFAULT_PRICE_DATA_ID,
                          max_wait_seconds=5):
        """Request market data for *ibcontract* and return the ticks received.

        Args:
            ibcontract: contract to request data for.
            tickerid: request id used for the subscription.
            max_wait_seconds: longest gap, in seconds, to wait for the next
                tick before giving up (previously a hard-coded 5).

        Returns:
            List of items read from the price queue for *tickerid*.
        """
        # Go through self.wrapper for all wrapper methods, so this also works
        # when the wrapper is a separate object from the client.
        ib_data_queue = finishableQueue(self.wrapper.init_ibprices(tickerid))
        self.reqMktData(
            tickerid,
            ibcontract,
            "",
            False,
            False,
            [],
        )
        print("Getting data from the server... could take %d seconds to complete " % max_wait_seconds)
        try:
            price_data = ib_data_queue.get(timeout=max_wait_seconds)
            while self.wrapper.is_error():
                print(self.wrapper.get_error())
            if ib_data_queue.timed_out():
                print("Exceeded maximum wait for wrapper to confirm finished - seems to be normal behaviour")
        finally:
            # Always cancel the subscription, even if draining the queue failed.
            self.cancelMktData(tickerid)
        return price_data
class TestApp(TestWrapper, TestClient):
    """Combined wrapper and client that connects and starts the message loop."""

    def __init__(self, ipaddress, portid, clientid):
        """Connect to *ipaddress*:*portid* as *clientid* and start ``run`` in a thread."""
        TestWrapper.__init__(self)
        TestClient.__init__(self, wrapper=self)
        # error() writes to the queue that init_error() creates, so create it
        # before connecting or starting the thread, either of which may lead
        # to callbacks being invoked.
        self.init_error()
        self.connect(ipaddress, portid, clientid)
        thread = Thread(target=self.run)
        thread.start()
        self._thread = thread
def main(slist):
    """Fetch and print market data for each symbol in *slist*.

    Each symbol is requested as a stock ("STK") on the "SEHK" exchange through
    a TestApp connected to 127.0.0.1:7497 with client id 1.
    """
    app = TestApp("127.0.0.1", 7497, 1)
    try:
        for symbol in slist:
            ibcontract = IBcontract()
            ibcontract.secType = "STK"
            ibcontract.symbol = symbol
            ibcontract.exchange = "SEHK"
            last_price = app.getIBrealtimedata(ibcontract)
            df = pd.DataFrame(last_price)
            print(ibcontract.symbol, df.head())
    finally:
        # Disconnect even when a request raises; otherwise the thread started
        # by TestApp keeps running and the connection stays open.
        app.disconnect()
if __name__ == "__main__":
    # Symbols passed to main(), which requests each one as an SEHK stock.
    main([700, 2318, 5, 12])
Here are the error messages:
unhandled exception in EReader thread
Traceback (most recent call last):
  File "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\reader.py", line 34, in run
    data = self.conn.recvMsg()
  File "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\connection.py", line 99, in recvMsg
    buf = self._recvAllMsg()
  File "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\connection.py", line 119, in _recvAllMsg
    buf = self.socket.recv(4096)
OSError: [WinError 10038] An operation was attempted on something that is not a socket