0
votes

This is my terminal command to run strm.py file

$SPARK_HOME/bin/spark-submit --master local --driver-memory 4g --num-executors 2 --executor-memory 4g --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 org.apache.spark:spark-cassandra-connector_2.11:2.4.0 strm.py

Error:

Cannot load main class from JAR org.apache.spark:spark-cassandra-connector_2.11:2.4.0 with URI org.apache.spark. Please specify a class through --class. at org.apache.spark.deploy.SparkSubmitArguments.error(SparkSubmitArguments.scala:657) atorg.apache.spark.deploy.SparkSubmitArguments.loadEnvironmentArguments(SparkSubmitArguments.scala:224) at org.apache.spark.deploy.SparkSubmitArguments.(SparkSubmitArguments.scala:116) at org.apache.spark.deploy.SparkSubmit$$anon$2$$anon$1.(SparkSubmit.scala:907) at org.apache.spark.deploy.SparkSubmit$$anon$2.parseArguments(SparkSubmit.scala:907) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:81) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

So can anyone help me out what is the issue with this why it can't loading.

1
This is the code i wrote for storing streaming data to Cassandra table . query1 = query.writeStream\ .option("checkpointLocation", '/tmp/check_point/')\ .format("org.apache.spark.sql.cassandra")\ .option("keyspace","test")\ .option("table", "my_tables")\ .start()\ .awaitTermination()Instinct

1 Answers

1
votes

You have 2 problems:

  • you're incorrectly submitting your application - you don't have a comma between org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 and org.apache.spark:spark-cassandra-connector_2.11:2.4.0, so spark-submit treats cassandra connector as a jar, instead of using your python file.

  • current version of Spark Cassandra Connector doesn't support direct write for Spark Structured Streaming data - this functionality is available only in DSE Analytics. But you can workaround this by using foreachBatch, something like this (not tested, the working Scala code is available here):

def foreach_batch_function(df, epoch_id):
    df.format("org.apache.spark.sql.cassandra").option("keyspace","test")\
       .option("table", "my_tables").mode('append').save()

query.writeStream.foreachBatch(foreach_batch_function).start()