7
votes

I am basically reading from a Kafka source and passing each message to my foreach processor (thanks to Jacek's page for the simple example).

If this works, I will put some business logic in the process method; however, it doesn't work. I believe the println output doesn't show up because it runs on the executors and there is no way of getting those logs back to the driver. However, the insert into a temp table should at least work and show me that the messages are actually consumed and processed through to the sink.

What am I missing here?

Really looking for a second set of eyes to check my effort here:

    val stream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
      .option("subscribe", src_topic)
      .load()

    val rec = stream.selectExpr("CAST(value AS STRING) as txnJson").as[(String)]

    val df = stream.selectExpr("cast (value as string) as json")

    val writer = new ForeachWriter[Row] {
      val scon = new SConConnection
      override def open(partitionId: Long, version: Long) = {
        true
      }
      override def process(value: Row) = {
        println("++++++++++++++++++++++++++++++++++++" + value.get(0))
        scon.executeUpdate("insert into rs_kafka10(miscCol) values("+value.get(0)+")")
      }
      override def close(errorOrNull: Throwable) = {
        scon.closeConnection
      }
    }

    val yy = df.writeStream
      .queryName("ForEachQuery")
      .foreach(writer)
      .outputMode("append")
      .start()

    yy.awaitTermination()
1
What is SConConnection? The writer is serialized and sent to the executors; if your scon is not serializable, you should put its construction in the open method. Can you try running it with --master local[*] and check whether you get your printlns there? - Harald Gliebe
Yeah @Harald, I did try that. I don't see any printlns or anything else I put in the process call. In comparison, I tweaked the incoming message a bit and dumped the resulting dataframe to the console, and that console sink worked just fine. - Raghav
My bad, in local mode the println does show up in the driver log. However, I am still waiting for a successful attempt at getting any business logic methods called. - Raghav
But if the println is called you can also place your business logic there. What is your problem exactly? - Harald Gliebe
I have realized that I can't invoke my own methods from the process method. If I put my code inline in the process method, it gets invoked. However, hbaseconnection not being serializable is still problematic, even when I open the connection outside the process call and close it in the close call. :( - Raghav

1 Answer

3
votes

Thanks to the comments from Harald and others, I found out a couple of things that led me to normal processing behaviour:

  1. Test the code in local mode; YARN isn't much help when debugging.
  2. For some reason, the process method of the foreach sink doesn't allow calling other methods. When I put my business logic directly in there, it works.
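
For anyone hitting the same wall, here is a minimal sketch of one arrangement that follows Harald's suggestion from the comments: build the connection inside open() so it is never serialized with the writer. A possible explanation for point 2, not confirmed in this thread, is that calling a method of the enclosing class makes the anonymous writer capture that (non-serializable) instance, so the sketch keeps helper logic in a standalone object instead. The `SConConnection` stub, `TxnLogic`, and `TxnWriter` are hypothetical names for illustration, and the quoting in `insertStatement` assumes a SQL-like dialect.

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical stand-in for the question's SConConnection. Like a real
// HBase/JDBC connection, it is deliberately NOT Serializable.
class SConConnection {
  def executeUpdate(sql: String): Unit = println(s"executing: $sql")
  def closeConnection(): Unit = println("connection closed")
}

// Helper logic lives in a top-level object, so calling it from process()
// does not pull an enclosing class instance into the serialized writer.
object TxnLogic {
  // Assumption: the target accepts SQL string literals with '' as the
  // escape for a single quote. Table and column names are from the question.
  def insertStatement(json: String): String = {
    val escaped = json.replace("'", "''")
    s"insert into rs_kafka10(miscCol) values('$escaped')"
  }
}

class TxnWriter extends ForeachWriter[Row] {
  // @transient var: the field is skipped when the writer is serialized on
  // the driver and is only assigned later, inside open(), on the executor.
  @transient private var scon: SConConnection = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    scon = new SConConnection
    true // returning true asks Spark to call process() for this partition's rows
  }

  override def process(value: Row): Unit =
    scon.executeUpdate(TxnLogic.insertStatement(value.getString(0)))

  override def close(errorOrNull: Throwable): Unit =
    if (scon != null) scon.closeConnection()
}
```

With this in place, the query from the question would use `.foreach(new TxnWriter)` instead of the anonymous writer. If the real connection class supports parameterized statements, those would be preferable to building the SQL string by hand.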

Hope it helps others.