
I want to write Structured Streaming data into Cassandra using the PySpark Structured Streaming API.

My data flow is like below:

REST API -> Kafka -> Spark Structured Streaming (PySpark) -> Cassandra

Versions used: Spark 2.4.3, DataStax DSE 6.7.6-1

Initialize Spark:

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Analytics") \
    .config("kafka.bootstrap.servers", "localhost:9092") \
    .config("spark.cassandra.connection.host", "localhost:9042") \
    .getOrCreate()

Subscribe to the topic from Kafka:

df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "topic") \
    .load()

Write into Cassandra:

    w_df_3 = df...

    write_db = w_df_3.writeStream \
        .option("checkpointLocation", "/tmp/check_point/") \
        .format("org.apache.spark.sql.cassandra") \
        .option("keyspace", "analytics") \
        .option("table", "table") \
        .outputMode("update") \
        .start()

Executed with the following command:

$spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,datastax:spark-cassandra-connector:2.4.0-s_2.11 Analytics.py localhost:9092 topic

I am facing the following exception when calling writeStream to Cassandra:

py4j.protocol.Py4JJavaError: An error occurred while calling o81.start.
: java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.cassandra does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:297)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:322)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Could anyone help me out on how to resolve and proceed further? Any help will be appreciated.

Thanks in advance.

The error indicates it expects df_3.write rather than df_3.writeStream – OneCricketeer
It's simple: Data source org.apache.spark.sql.cassandra does not support streamed writing – pissall
Are you able to use DSE Analytics instead of OSS Spark? It supports Structured Streaming - you only need to use BYOS instead of the OSS connector – Alex Ott

2 Answers


As I mentioned in the comments, if you're using DSE, you can use OSS Apache Spark with the so-called BYOS (Bring Your Own Spark) jar - a special jar containing DataStax's version of the Spark Cassandra Connector (SCC), which has direct support for Structured Streaming.

Since SCC 2.5.0, support for Structured Streaming is also available in the open-source version, so you can simply use writeStream with the Cassandra format. SCC 2.5.0 also contains a lot of features previously available only in the DSE version, such as additional optimizations. There is a blog post that describes them in great detail.
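With SCC 2.5.0 or later on the classpath, the streaming write could look roughly like the sketch below. This is a configuration fragment, not a tested end-to-end program: it assumes a running Cassandra cluster, that `df` is the streaming DataFrame from the question, and that the connector jar is supplied via `--packages` (the coordinate shown is an assumption about the user's Scala version).

```python
# Sketch: direct streaming write to Cassandra with SCC 2.5.0+.
# Assumes submission with e.g.:
#   --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.0
# and that `df` is the streaming DataFrame already shaped to match
# the target table's columns.
query = df.writeStream \
    .format("org.apache.spark.sql.cassandra") \
    .option("checkpointLocation", "/tmp/check_point/") \
    .option("keyspace", "analytics") \
    .option("table", "table") \
    .outputMode("update") \
    .start()

query.awaitTermination()
```

No `foreachBatch` workaround is needed in this case; the connector registers itself as a streaming sink.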


Thanks a lot for your response.

I implemented it using the foreachBatch sink instead of a direct sink:

w_df_3.writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode('update') \
    .foreachBatch(save_to_cassandra) \
    .start()
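The `save_to_cassandra` callback is not shown in the answer. A minimal sketch of what it might look like, reusing the keyspace and table names from the question (the function body is an assumption, not the asker's actual code):

```python
# Hypothetical save_to_cassandra callback for foreachBatch.
# foreachBatch hands each micro-batch to the function as a plain
# (non-streaming) DataFrame plus a batch id, so the regular batch
# Cassandra writer can be used even though the source is a stream.
def save_to_cassandra(batch_df, batch_id):
    batch_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .option("keyspace", "analytics") \
        .option("table", "table") \
        .mode("append") \
        .save()
```

Because `foreachBatch` works with ordinary DataFrames, this pattern sidesteps the "does not support streamed writing" error on connector versions before 2.5.0.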

It's working. Thank you all.