1
votes

I have two Spark streams set up in a notebook to run in parallel like so.

  spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
  df1 = spark \
      .readStream.format("delta") \
      .table("test_db.table1") \
      .select('foo', 'bar')


  writer_df1 = df1.writeStream.option("checkpoint_location", checkpoint_location_1) \
      .foreachBatch(
      lambda batch_df, batch_epoch:
      process_batch(batch_df, batch_epoch)
  ) \
      .start()

  spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
  df2 = spark \
  .readStream.format("delta") \
  .table("test_db.table2") \
  .select('foo', 'bar')

  writer_df2 = merchant_df.writeStream.option("checkpoint_location", checkpoint_location_2) \
    .foreachBatch(
    lambda batch_df, batch_epoch:
    process_batch(batch_df, batch_epoch)
  ) \
    .start()

These dataframes then get processed row by row, with each row being sent to an API. If the API call reports an error, I then convert the row into JSON and append this row to a common failures table in databricks.

columns = ['table_name', 'record', 'time_of_failure', 'error_or_status_code']
vals = [(table_name, json.dumps(row.asDict()), datetime.now(), str(error_or_http_code))]
error_df = spark.createDataFrame(vals, columns)
error_df.select('table_name','record','time_of_failure', 'error_or_status_code').write.format('delta').mode('Append').saveAsTable("failures_db.failures_db)"

When attempting to add the row to this table, the saveAsTable() call here throws the following exception.

py4j.protocol.Py4JJavaError: An error occurred while calling o3578.saveAsTable. : java.lang.IllegalStateException: Cannot find the REPL id in Spark local properties. Spark-submit and R doesn't support transactional writes from different clusters. If you are using R, please switch to Scala or Python. If you are using spark-submit , please convert it to Databricks JAR job. Or you can disable multi-cluster writes by setting 'spark.databricks.delta.multiClusterWrites.enabled' to 'false'. If this is disabled, writes to a single table must originate from a single cluster. Please check https://docs.databricks.com/delta/delta-intro.html#frequently-asked-questions-faq for more details.

If I comment out one of the streams and re-run the notebook, any errors from the API calls get inserted into the table with no issues. I feel like there's some configuration I need to add but am not sure of where to go from here.

1

1 Answers

0
votes

Not sure if this is the best solution, but I believe the problem comes from each stream writing to the table at the same time. I split this table into separate tables for each stream and it worked after that.