
I use a DataLoader to read data from Kafka, and it doesn't work.

Here is my code:

import json

from kafka import KafkaConsumer
from torch.utils.data import Dataset, DataLoader


class kfkdataset(Dataset):
    def __init__(self, consumer, image_size):
        super().__init__()
        self.image_size = image_size
        self.consumer = consumer  # one KafkaConsumer, created in the main process

    def __getitem__(self, index):
        # index is ignored: every call just reads the next Kafka message
        info = json.loads(next(self.consumer).value)
        image_osspath = info['path']
        image = prep_image_batch(image_osspath, self.image_size)
        return image, image_osspath

    def __len__(self):
        # Placeholder length: a Kafka topic has no fixed size.
        return 9000000


consumer = KafkaConsumer('my-topic', bootstrap_servers=[])

prodataset = kfkdataset(consumer, image_size=608)
k = DataLoader(prodataset,
               batch_size=batch_size,
               num_workers=16)
for inputimage, osspath in k:
    inputimage = inputimage.to(device)
    detections, _ = model(inputimage)
    detections = non_max_suppression(detections, 0.98, 0.4)

It works when num_workers is 1.

When num_workers > 1, I get this error:

  File "batch_upload.py", line 80, in <module>
    for inputimage,osspath in k:
  File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/dataloader.py", line 801, in __next__
    return self._process_data(data)
  File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/dataloader.py", line 846, in _process_data
    data.reraise()
  File "/usr/local/lib/python3.6/dist-packages/torch/_utils.py", line 369, in reraise
    raise self.exc_type(msg)
FileExistsError: Caught FileExistsError in DataLoader worker process 1.
Original Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/_utils/worker.py", line 178, in _worker_loop
    data = fetcher.fetch(index)
  File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/_utils/fetch.py", line 44, in fetch
    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/usr/local/lib/python3.6/dist-packages/torch/utils/data/_utils/fetch.py", line 44, in <listcomp>
    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/appbatch/utils/utils.py", line 49, in __getitem__
    info = json.loads(next(self.consumer).value)
  File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 1192, in __next__
    return self.next_v2()
  File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 1200, in next_v2
    return next(self._iterator)
  File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 1115, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 654, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/usr/local/lib/python3.6/dist-packages/kafka/consumer/group.py", line 701, in _poll_once
    self._client.poll(timeout_ms=timeout_ms)
  File "/usr/local/lib/python3.6/dist-packages/kafka/client_async.py", line 600, in poll
    self._poll(timeout / 1000)
  File "/usr/local/lib/python3.6/dist-packages/kafka/client_async.py", line 629, in _poll
    self._register_send_sockets()
  File "/usr/local/lib/python3.6/dist-packages/kafka/client_async.py", line 619, in _register_send_sockets
    self._selector.modify(key.fileobj, events, key.data)
  File "/usr/lib/python3.6/selectors.py", line 261, in modify
    key = self.register(fileobj, events, data)
  File "/usr/lib/python3.6/selectors.py", line 412, in register
    self._epoll.register(key.fd, epoll_events)
FileExistsError: [Errno 17] File exists

How can I make it work?

How many partitions do you have in this Kafka topic? - Dave Canton
It seems to have 12 partitions. - yenan wu
I see. I think it is a connection socket problem. - Dave Canton

1 Answer


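Basically, setting num_workers > 1 in PyTorch's DataLoader creates several worker processes, and each of them receives a copy of the single KafkaConsumer that was created in the main process. Those copies still share the same underlying socket connections and the same epoll instance the consumer uses to watch them, so when one worker registers a socket that another worker has already registered, epoll fails with "FileExistsError: [Errno 17] File exists". With num_workers=1 only one process ever uses the consumer, which is why that case works.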
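The shared-selector failure can be reproduced without Kafka or PyTorch. The sketch below is Linux-only and uses just the standard library; select.epoll stands in for the consumer's selector and a socketpair stands in for its broker connection (both are illustrative stand-ins, not what kafka-python literally does step by step). A forked child registers the inherited socket, and the parent's own attempt to register the same socket then fails with the same [Errno 17], because parent and child share one kernel epoll instance.

```python
import errno
import os
import select
import socket


def reproduce_shared_epoll_error():
    """Linux-only: a forked child and its parent share one epoll instance."""
    ep = select.epoll()                 # stand-in for the consumer's selector
    a, b = socket.socketpair()          # stand-in for a broker connection
    try:
        pid = os.fork()                 # like a DataLoader worker being forked
        if pid == 0:
            # Child ("worker 1"): register the inherited socket, then exit at once.
            try:
                ep.register(a.fileno(), select.EPOLLIN)
            finally:
                os._exit(0)             # never fall back into the caller's code
        os.waitpid(pid, 0)
        # Parent ("worker 2"): it never registered this fd itself, but the epoll
        # instance is shared with the child, and the parent still holds the socket
        # open, so the kernel keeps the child's registration and rejects this one.
        try:
            ep.register(a.fileno(), select.EPOLLIN)
        except FileExistsError as exc:
            return exc.errno            # errno.EEXIST, i.e. "[Errno 17] File exists"
        return None
    finally:
        ep.close()
        a.close()
        b.close()


if __name__ == "__main__":
    code = reproduce_shared_epoll_error()
    print(code == errno.EEXIST, os.strerror(code) if code else code)
```

This mirrors the last frames of the traceback, where selectors.py calls self._epoll.register(...) and gets FileExistsError.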

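One approach to parallelizing and speeding up reading from Kafka is to give every worker its own consumer, with all of them in the same consumer group for that topic. Kafka then divides the topic's partitions among the group's consumers, so each partition is read by only one worker and no sockets are shared between processes.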
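A minimal sketch of that idea, assuming kafka-python and PyTorch 1.2 or later (for IterableDataset): instead of passing a ready-made consumer into the dataset, pass a factory and build the consumer inside __iter__, which runs in each worker process. The names make_kafka_consumer, KafkaImageStream and make_loader, and the GROUP_ID value, are hypothetical; the topic name comes from the question and the broker list is left for you to fill in.

```python
import json

from torch.utils.data import DataLoader, IterableDataset

# Placeholders (assumptions): replace with your own cluster settings.
BOOTSTRAP_SERVERS = []          # your broker list, left empty as in the question
GROUP_ID = "image-loader"       # hypothetical consumer-group name
TOPIC = "my-topic"


def make_kafka_consumer():
    """Build a brand-new KafkaConsumer in whichever process calls this."""
    from kafka import KafkaConsumer  # kafka-python, imported inside the worker
    return KafkaConsumer(TOPIC,
                         bootstrap_servers=BOOTSTRAP_SERVERS,
                         group_id=GROUP_ID)  # same group, so partitions are split


class KafkaImageStream(IterableDataset):
    """Streams (image, path) pairs; each DataLoader worker opens its own consumer."""

    def __init__(self, consumer_factory, load_fn):
        super().__init__()
        # Keep only a factory: no sockets exist yet, so nothing is shared on fork.
        self.consumer_factory = consumer_factory
        self.load_fn = load_fn

    def __iter__(self):
        # __iter__ runs inside each worker process, so every worker gets
        # its own consumer with its own connections and selector.
        consumer = self.consumer_factory()
        try:
            for message in consumer:
                path = json.loads(message.value)["path"]
                yield self.load_fn(path), path
        finally:
            consumer.close()


def make_loader(load_fn, batch_size, num_workers=12,
                consumer_factory=make_kafka_consumer):
    # 12 matches the partition count from the comments; extra workers would idle.
    return DataLoader(KafkaImageStream(consumer_factory, load_fn),
                      batch_size=batch_size, num_workers=num_workers)
```

For the question's setup, load_fn would be a module-level function that calls prep_image_batch(path, 608). An IterableDataset has no indices, so the fake __len__ is no longer needed. With 12 partitions, at most 12 consumers in the group receive messages, which is why more than 12 workers would not help here.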