0
votes

I was able to read Cassandra tables. I created Cassandra table according to spark dataframe schema. But when I tried to write spark dataframe to Cassandra table. I got following error. Environment: pyspark 3.0.1 local shell, Cassandra 3.11.

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/sql/readwriter.py", line 825, in save
    self._jwrite.save()
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 128, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o62.save.
: com.datastax.spark.connector.datasource.CassandraCatalogException: Attempting to write to C* Table but missing
primary key columns: [logicalref]
        at com.datastax.spark.connector.datasource.CassandraWriteBuilder.<init>(CassandraWriteBuilder.scala:44)
        at com.datastax.spark.connector.datasource.CassandraTable.newWriteBuilder(CassandraTable.scala:69)
        at org.apache.spark.sql.execution.datasources.v2.BatchWriteHelper.newWriteBuilder(WriteToDataSourceV2Exec.scala:346)
        at org.apache.spark.sql.execution.datasources.v2.BatchWriteHelper.newWriteBuilder$(WriteToDataSourceV2Exec.scala:341)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.newWriteBuilder(WriteToDataSourceV2Exec.scala:253)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:259)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:54)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:354)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
1

1 Answers

0
votes

First I read emty cassandra table. I got columns. I select these columns and assigned another dataframe like

df = spark.read.format("org.apache.spark.sql.cassandra")...   
df2 = df.select(*df.columns)

Then I was able to write

df2.write.format("org.apache.spark.sql.cassandra")....