1
votes

I am working with spark 2.4.0 and python 3.6. I am developing a python program with pyspark structured streaming actions. The program runs two readstream reading from two sockets, and after made a union of these two streaming dataframe. I tried spark 2.4.0 and 2.4.3 but nothing changed. Then I perform a unique writestream in order to write just one output streaming dataframe. THAT WORKS WELL. However, since I need to write also a non streaming dataset for all the micro-batches, I coded a foreachBatch call inside the writestream. THAT DOESN'T WORK.

I put spark.scheduler.mode=FAIR in spark.defaults.conf. I am running through spark-submit, but even though I tried with python3 directly, it doesn't work at all. It looks like as it didn't execute the splitStream function referred in the foreachBatch. I tried adding some print in the splitStream function, without any effects.

I made many attempting, but nothing changed, I submitted via spark-submit and by python. I am working on a spark standalone cluster.

inDF_1 = spark \
    .readStream \
    .format('socket') \
    .option('host', host_1) \
    .option('port', port_1) \
    .option("maxFilesPerTrigger", 1) \
    .load()

inDF_2 = spark \
    .readStream \
    .format('socket') \
    .option('host', host_2) \
    .option('port', port_2) \
    .option("maxFilesPerTrigger", 1) \
    .load() \
    .coalesce(1)

inDF = inDF_1.union(inDF_2)

#--------------------------------------------------#
#  write streaming raw dataser R-01 plateMeasures  #
#--------------------------------------------------#

def splitStream(df, epoch_id):
    df \
        .write \
        .format('text') \
        .outputMode('append') \
        .start(path = outDir0)

    listDF = df.collect()
    print(listDF)
    pass

stageDir = dLocation.getLocationDir('R-00')
outDir0 = dLocation.getLocationDir(outList[0])
chkDir = dLocation.getLocationDir('CK-00')
query0 = programName + '_q0'
q0 = inDF_1 \
        .writeStream \
        .foreachBatch(splitStream) \
        .format('text') \
        .outputMode('append') \
        .queryName(query0) \
        .start(path = stageDir
                    , checkpointLocation = chkDir)

I am using foreachBatch because I need to write several sinks for each input microbatch. Thanks a lot to everyone could try to help me.

1

1 Answers

1
votes

I have tried this in my local machine and works for Spark > 2.4.

df.writeStream
  .foreachBatch((microBatchDF, microBatchId) => {     
    microBatchDF
      .withColumnRenamed("value", "body")
      .write
      .format("console")
      .option("checkpointLocation","checkPoint")
      .save()
  })
  .start()
  .awaitTermination()