
In PySpark, I am able to read a stream from a Kafka topic and write the (transformed) data back to another Kafka topic in two separate steps. The code to do that is as follows:

# Imports needed below; assumes an active SparkSession `spark` and a JSON schema `schema` are already defined
import pyspark.sql.functions as F
from pyspark.sql.functions import from_json

# Define stream:
df = spark \
     .readStream \
     .format("kafka") \
     .option("kafka.bootstrap.servers", "localhost:9092") \
     .option("subscribe", "instream") \
     .load()

# Transform: parse the JSON payload, then cast it back to a string for the sink
matchdata = df.select(from_json(F.col("value").cast("string"), schema).alias("value")) \
          .select(F.col("value").cast("string"))

# Stream the data from the Kafka topic into a Spark in-memory table
query = matchdata \
       .writeStream \
       .format("memory") \
       .queryName("PositionTable") \
       .outputMode("append") \
       .start()

# Let the query run for up to 5 seconds
query.awaitTermination(5)

# Create a new DataFrame from the in-memory table once the wait above times out:
tmp_df = spark.sql("select * from PositionTable")

# Write data to a different Kafka topic
tmp_df \
     .write \
     .format("kafka") \
     .option("kafka.bootstrap.servers", "localhost:9092") \
     .option("topic", "outstream") \
     .save()

The code above works as expected: the data in the Kafka topic "instream" is read into PySpark, which then writes it out to the Kafka topic "outstream".

However, I would like to read the stream and write the transformed data back out continuously (the stream will be unbounded, and we would like insights as soon as the data rolls in). Following the documentation, I replaced the query above with the following:

# Stream directly from Kafka to Kafka; the Kafka sink requires a checkpoint location
query = matchdata \
       .writeStream \
       .format("kafka") \
       .option("kafka.bootstrap.servers", "localhost:9092") \
       .option("topic", "outstream") \
       .option("checkpointLocation", "/path/to/HDFS/dir") \
       .start()

This does not appear to work. There is no error message, so I do not know what is wrong. I've also tried windowing and aggregating within windows, but that also does not work. Any advice would be appreciated!
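For reference, one windowed attempt looked roughly like the sketch below. The "timestamp" column is a placeholder for an event-time field parsed from my JSON payload (the real field names differ), and the final select re-serializes the result because the Kafka sink expects a "value" column:

parsed = df.select(from_json(F.col("value").cast("string"), schema).alias("data")) \
           .select("data.*")

windowed = parsed \
    .withWatermark("timestamp", "10 seconds") \
    .groupBy(F.window(F.col("timestamp"), "5 seconds")) \
    .count() \
    .select(F.to_json(F.struct("window", "count")).alias("value"))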


1 Answer


OK, I found the problem. The main reason was that the checkpoint directory "/path/to/HDFS/dir" has to exist before the query starts. After creating that directory, the code ran as expected. It would have been nice if an error message had said something along those lines.
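For anyone hitting the same issue: the directory can be created up front with hdfs dfs -mkdir -p /path/to/HDFS/dir, or from PySpark itself through the JVM Hadoop FileSystem API. A minimal sketch; note that the _jsc/_jvm accessors are internal rather than public API:

# Create the checkpoint directory (if missing) before starting the query
hadoop_conf = spark._jsc.hadoopConfiguration()
Path = spark._jvm.org.apache.hadoop.fs.Path
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
fs.mkdirs(Path("/path/to/HDFS/dir"))  # behaves like mkdir -p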