As simple as it gets: is it possible to stream a DStream to a Kafka topic?
I have a Spark Streaming job which does all the data processing; now I want to push the data to a Kafka topic. Is it possible to do so in PySpark?
It's better to convert the rows to JSON before writing to Kafka; otherwise, explicitly select the key and value columns that should be written to Kafka.
# Serialize every column of each row into a single JSON string used as the record value.
# The Structured Streaming Kafka sink only needs the broker list, topic and a checkpoint location.
query = jdf.selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "test-spark") \
    .option("checkpointLocation", "/root/") \
    .outputMode("append") \
    .start()
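If you want to keep a Kafka message key instead of packing everything into JSON, here is a minimal sketch of the key/value-column variant. The `user_id` and `payload` column names are placeholders for columns in your own DataFrame; the sink just needs columns named "key" and "value".

# Cast any two columns to STRING (or BINARY) and alias them to "key" and "value".
query = jdf.selectExpr(
        "CAST(user_id AS STRING) AS key",
        "CAST(payload AS STRING) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "test-spark") \
    .option("checkpointLocation", "/root/") \
    .outputMode("append") \
    .start()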
If your messages are in Avro format, you can serialize them and write to Kafka directly.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from confluent_kafka.avro import AvroProducer
import avro.schema
# sc is your SparkContext (e.g. the one provided by the pyspark shell).
ssc = StreamingContext(sc, 2)

topic = "test"
brokers = "localhost:9092"
def handler(message):
    records = message.collect()
    # key_schema / value_schema are Avro schemas parsed with avro.schema.parse(...)
    # or fetched from the schema registry at var_schema_url.
    var_kafka_parms_tgt = {'bootstrap.servers': var_bootstrap_servr,
                           'schema.registry.url': var_schema_url}
    avroProducer = AvroProducer(var_kafka_parms_tgt,
                                default_key_schema=key_schema,
                                default_value_schema=value_schema)
    for record in records:
        # <data processing: whatever you want, creating the var_val_value / var_val_key pair>
        avroProducer.produce(topic=var_topic_tgt_name,
                             value=var_val_value,
                             key=var_val_key)
    avroProducer.flush()

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
kvs.foreachRDD(handler)
ssc.start()
ssc.awaitTermination()
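If you don't need Avro and just want to push a processed DStream to Kafka, a minimal sketch using the kafka-python producer inside foreachPartition is below. The `processed_dstream` name and the topic/broker values are assumptions; adjust them to your own job.

import json
from kafka import KafkaProducer

def send_partition(records):
    # Create one producer per partition; producers are not serializable,
    # so they cannot be built on the driver and shipped to the executors.
    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8"))
    for record in records:
        producer.send("test-spark", record)
    producer.flush()

# processed_dstream is whatever DStream your processing step produces.
processed_dstream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))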