
I'm trying to capture streams from multiple webcams using their RTSP links in a Flask app and show them in the browser. To achieve this I created two individual producers, two topics, and two consumers. After starting the Kafka server and running both consumer.py and producer.py, both streams run for only two seconds. This is my producer.py:

import time
import sys
import cv2
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer2 = KafkaProducer(bootstrap_servers='localhost:9092')
topic = 'my-topic'
topic2 = 'my-topic2'

def emit_video():
    print('start emitting')
    camera = cv2.VideoCapture('rtsp://webcam1')
    camera2 = cv2.VideoCapture('rtsp://webcam2')

    while True:
        success, frame = camera.read()
        success2, frame2 = camera2.read()
        if not success2:
            print("camera issue")
        # png might be too large to emit
        data = cv2.imencode('.jpeg', frame)[1].tobytes()
        data2 = cv2.imencode('.jpeg', frame2)[1].tobytes()

        future = producer.send(topic, data)
        future2 = producer2.send(topic2, data2)
        try:
            # block until the broker acknowledges each send
            future.get()
            future2.get()
        except KafkaError as e:
            print(e)
            break

        print('.', end='', flush=True)
        # to reduce CPU usage
        # time.sleep(0.2)
    # print()
    # video.release()



And this is my consumer.py:

from flask import Flask, Response, render_template
from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092')
consumer2 = KafkaConsumer('my-topic2', bootstrap_servers='localhost:9092')

app = Flask(__name__)

def kafkastream():
    for message in consumer:
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + message.value + b'\r\n\r\n')

def kafkastream2():
    for message in consumer2:
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + message.value + b'\r\n\r\n')

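@app.route('/video_feed')  # route path assumed; the decorator is missing from the post
def video_feed():
    return Response(kafkastream(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/video_feed2')  # route path assumed
def video_feed2():
    return Response(kafkastream2(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/')  # route path assumed
def index():
    return render_template('index.html')

if __name__ == '__main__':
    app.run()  # host and port are not given in the post; Flask defaults are used here
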
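kafkastream() and kafkastream2() differ only in the consumer they iterate over, so the multipart framing could live in one shared generator. A minimal sketch: mjpeg_stream is a hypothetical name, and it assumes only that each message exposes the JPEG bytes as .value, as the consumer records in the code above do.

```python
def mjpeg_stream(messages):
    """Wrap each message's JPEG payload as one multipart/x-mixed-replace part."""
    for message in messages:
        # Same framing as kafkastream() above: boundary, part header, payload.
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + message.value + b'\r\n\r\n')
```

Each route could then return Response(mjpeg_stream(consumer), mimetype='multipart/x-mixed-replace; boundary=frame'), with consumer2 for the second feed.
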
What happens when it stops working after two minutes? If it's timing out on the future.get() calls, I'd want to know why you're making that call in order to suggest a better alternative. - matt2000
The first camera streams for only two seconds, and only after those two seconds does the second camera start. After that, producer.py shows: File "src/producer.py", line 29, in emit_video data = cv2.imencode('.jpeg', frame)[1].tobytes() cv2.error: OpenCV(4.0.0) /io/opencv/modules/imgcodecs/src/grfmt_base.cpp:145: error: (-10:Unknown error code -10) Raw image encoder error: Empty JPEG image (DNL not supported) in function 'throwOnEror' - Arif Ibrahim
Can I use consumer and consumer2 on the same port, 9092? - Arif Ibrahim
@ArifIbrahim 9092 is the port for the bootstrap server. It is absolutely fine to have more than one consumer connecting to the same bootstrap server. - Giorgos Myrianthous
@GiorgosMyrianthous Thank you very much. I have now confirmed that my code is fine. The issue was caused by using the RTSP link. Without RTSP the code runs fine. - Arif Ibrahim

1 Answer

You have:

data = cv2.imencode('.jpeg', frame)[1].tobytes()

My sample code:

ret, buffer = cv2.imencode('.jpg', frame)

producer.send(topic, buffer.tobytes())

Delete the '[1]' and unpack both return values instead.
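
The traceback in the comments shows cv2.imencode being handed an empty frame, which is what happens when camera.read() fails and the loop encodes the frame anyway. A minimal sketch of guarding against that, assuming the same cv2 and kafka-python setup as the question; encode_jpeg and emit_one are hypothetical helper names, not part of either library.

```python
def encode_jpeg(success, frame):
    """Return JPEG bytes for a frame, or None if the frame is unusable."""
    # VideoCapture.read() returns (False, None) when no frame was grabbed,
    # so check before encoding instead of passing an empty frame on.
    if not success or frame is None or getattr(frame, 'size', 0) == 0:
        return None
    import cv2  # imported lazily so the guard above runs without OpenCV
    ok, buffer = cv2.imencode('.jpg', frame)
    return buffer.tobytes() if ok else None


def emit_one(camera, producer, topic):
    """Read one frame and send it; return False if the frame was skipped."""
    success, frame = camera.read()
    data = encode_jpeg(success, frame)
    if data is None:
        return False
    producer.send(topic, data)
    return True
```

In the producer loop this would be called once per camera, for example emit_one(camera, producer, topic) and emit_one(camera2, producer2, topic2), so a dropped frame on one RTSP stream is skipped rather than crashing the loop.
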