
I'm trying to write a Producer and a Consumer using Kafka, Spark Streaming and Python. The scenario is the following: a producer of random odometry messages in JSON format publishes a message to a topic every 3 seconds, using threading:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import threading
from random import randint
import random
import json
import math
import logging

log = logging.getLogger(__name__)

#connection with message broker, created once and reused for every message
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))


def sendMessage():

    #the function re-schedules itself, so a message is sent every 3 seconds
    threading.Timer(3.0, sendMessage).start()

    #the id is initially fixed to 1, but there could be more robots
    robotId = 1
    #generation of random values
    deltaSpace = randint(1, 9)
    thetaTwist = random.uniform(0, math.pi*2)

    future = producer.send('odometry', key=b'message', value={'robotId': robotId, 'deltaSpace': deltaSpace, 'thetaTwist': thetaTwist})
    future.add_callback(on_send_success)
    future.add_errback(on_send_error)

    # Block for 'synchronous' sends
    try:
        record_metadata = future.get(timeout=10)
    except KafkaError:
        # Decide what to do if produce request failed...
        pass


def on_send_success(record_metadata):
    print("topic name: " + record_metadata.topic)
    print("partition: " + str(record_metadata.partition))
    print("offset: " + str(record_metadata.offset))

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception


#send the first message; each call then schedules the next one
sendMessage()
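To check the payload format independently of Kafka, here is a stdlib-only sketch of what the producer puts on the topic. The helper names make_odometry, serialize and deserialize are hypothetical; serialize mirrors the value_serializer lambda above, so its bytes should match what the broker stores as each message value.

```python
import json
import math
import random


def make_odometry(robot_id=1):
    """Build one odometry payload with the same fields and ranges as sendMessage()."""
    return {
        'robotId': robot_id,
        'deltaSpace': random.randint(1, 9),             # integer in [1, 9]
        'thetaTwist': random.uniform(0, math.pi * 2),   # angle in radians
    }


def serialize(m):
    # same transformation as the producer's value_serializer
    return json.dumps(m).encode('ascii')


def deserialize(raw):
    # inverse transformation a consumer has to apply to the raw value bytes
    return json.loads(raw.decode('ascii'))


payload = make_odometry()
raw = serialize(payload)
print(raw)
assert deserialize(raw) == payload
```

Python's json module writes floats with repr(), so thetaTwist survives the round trip exactly.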


Then there is a Consumer that reads the messages from the same topic in 3-second batches and processes them with Spark Streaming; here is the code:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

# Create a local StreamingContext with two worker threads and a batch interval of 3 seconds
sc = SparkContext("local[2]", "OdometryConsumer")
ssc = StreamingContext(sc, 3)

kafkaStream = KafkaUtils.createDirectStream(ssc, ['odometry'], {'metadata.broker.list': 'localhost:9092'})

# each element of the direct stream is a (key, value) pair; the JSON is in the value
parsed = kafkaStream.map(lambda v: json.loads(v[1]))

def f(x):
    print(x)  # x is an RDD: this prints only its description and triggers no action

parsed.foreachRDD(f)

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
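In the 0-8 integration, KafkaUtils.createDirectStream yields (key, value) pairs (both decoded as UTF-8 strings by default), so json.loads has to be applied to the value, not to the whole pair. Because map is lazy, such a mistake only surfaces once an action runs. A stdlib-only sketch of the parsing step, using a hypothetical helper parse_record and a hand-written sample record instead of real broker output:

```python
import json


def parse_record(kv):
    """Parse one (key, value) pair shaped like the elements of the direct stream."""
    key, value = kv          # key is the string 'message' sent by the producer
    return json.loads(value)


# hand-written stand-in for one Kafka record (not real broker output)
sample = ('message', '{"robotId": 1, "deltaSpace": 4, "thetaTwist": 1.5}')
print(parse_record(sample)['deltaSpace'])  # 4

# json.loads on the whole tuple raises TypeError, which is what
# `lambda v: json.loads(v)` would hit as soon as an action evaluates the map
try:
    json.loads(sample)
except TypeError as exc:
    print('TypeError:', exc)
```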

To run the application, I start the ZooKeeper server on port 2181:

sudo /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties

then I start the Kafka broker on port 9092:

sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

and then I start the Producer and the Consumer:

python3 Producer.py

./spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /home/erca/Scrivania/proveTesi/SparkConsumer.py
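Before launching the producer and spark-submit, a quick sanity check is that something is actually listening on both ports. A stdlib-only sketch (port_open is a hypothetical helper); it only shows that a TCP listener exists, not that Kafka is healthy or that the odometry topic exists:

```python
import socket


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable or timed out
        return False


# ports used in the setup above: ZooKeeper on 2181, the Kafka broker on 9092
for name, port in [('ZooKeeper', 2181), ('Kafka broker', 9092)]:
    state = 'reachable' if port_open('localhost', port) else 'NOT reachable'
    print(f'{name} on localhost:{port}: {state}')
```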

The application runs without errors, but I'm not sure that the messages are really consumed. What can I do to verify that? Thanks to everyone who helps!

In your function f, write x.take(10) instead. - mayank agrawal
To clarify the above comment: printing an RDD does not call any action on it, so Spark doesn't do anything. Also, a plain Kafka producer/consumer would be easier to test. - OneCricketeer
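Following the comments, f has to call an action on the RDD for Spark to do any work. A minimal sketch of such a callback; FakeRDD is a hypothetical stub included only so the function can be exercised without a Spark installation. In the real consumer, f would be registered with parsed.foreachRDD(f) before ssc.start().

```python
def f(rdd):
    """Callback for parsed.foreachRDD(f); runs on the driver once per batch."""
    # print(rdd) would only show the RDD's description; take() is an action,
    # so it forces Spark to read the batch from Kafka and run the json map
    records = rdd.take(10)
    print(f'showing {len(records)} record(s) from this batch')
    for record in records:
        print(record)


class FakeRDD:
    """Hypothetical stand-in with a take() method, only to try f without Spark."""

    def __init__(self, items):
        self.items = list(items)

    def take(self, n):
        return self.items[:n]


f(FakeRDD([{'robotId': 1, 'deltaSpace': 4, 'thetaTwist': 1.5}]))
```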

1 Answer


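Call parsed.pprint() before ssc.start(). pprint() is an output operation, so it makes Spark actually process every batch, and it prints the first 10 records of each batch on the console.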
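As suggested in the comments, a plain Kafka consumer is a simple way to confirm that messages actually reach the topic, independently of Spark. A sketch using kafka-python's KafkaConsumer (the same library as the producer); decode_value and main are hypothetical names, and the 10-second timeout is an arbitrary choice:

```python
import json


def decode_value(raw):
    """Inverse of the producer's value_serializer: ASCII JSON bytes -> dict."""
    return json.loads(raw.decode('ascii'))


def main():
    # imported inside main() so decode_value can be reused without kafka-python
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'odometry',
        bootstrap_servers=['localhost:9092'],
        # no group_id means no committed offsets, so reading starts
        # at the oldest message still retained by the broker
        auto_offset_reset='earliest',
        consumer_timeout_ms=10000,      # stop iterating after 10 s without new messages
        value_deserializer=decode_value,
    )
    for message in consumer:
        print(message.offset, message.key, message.value)
    consumer.close()
```

Calling main() while Producer.py is running should print increasing offsets with the odometry dicts as values; if it does, the producer side works and any remaining problem is in the Spark job.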