I have created and deployed a Spark cluster that consists of 4 containers:

  1. spark master

  2. spark-worker

  3. spark-submit

  4. data-mount-container: to access the script from the local directory

I added the required dependency JARs to all of these containers.

I also deployed Kafka on the host machine, where a producer generates the stream.

I launched Kafka following the exact steps in the document below:

https://kafka.apache.org/quickstart

I verified that the Kafka producer and consumer can exchange messages on port 9092, and that works fine.

Below is the simple PySpark script that I want to run as a Structured Streaming job:

from pyspark import SparkContext
from pyspark.sql import SparkSession

print("Kafka App launched")
spark = SparkSession.builder.master("spark://master:7077").appName("kafka_Structured").getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "hostmachine:9092").option("subscribe", "session-event").option("maxOffsetsPerTrigger", 10).load()

converted_string=df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

print("Recieved Stream in String", converted_string)

Below is the spark-submit command I used to execute the script:

##container
# pyspark_vol - container for vol mounting
# spark/stru_kafka - container for spark-submit
# I have already linked the Spark master and worker using the container 'master'

##spark submit
docker run --add-host="localhost: myhost" --rm -it --link master:master --volumes-from pyspark_vol spark/stru_kafka spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1 --jars /home/spark/spark-2.1.1-bin-hadoop2.6/jars/spark-sql-kafka-0-10_2.11-2.1.1.jar --master spark://master:7077 /data/spark_session_kafka.py localhost 9092 session-event

When I run the script, it executes without errors, but it does not seem to listen for batches of the stream from the Kafka producer, and it simply stops executing.

I did not observe any specific error, but the script produces no output.

Using a socket program, I verified that data sent from the host can be received inside the Docker container, and that works fine.

I am not sure whether I have missed some configuration.
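For reference, a connectivity check of this kind can be sketched in a few lines of Python. This is only an illustration and not the socket program used above; the helper name `can_connect` and the 3-second timeout are assumptions.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host name and opens a TCP connection;
        # the with-block closes the socket again as soon as it succeeds.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False
```

Running `can_connect("hostmachine", 9092)` inside the container only shows that the port is reachable. A Kafka client additionally has to be able to resolve and reach the listener address that the broker advertises, so a successful TCP connect is a necessary check rather than a sufficient one.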

Expected:

The above application, running on the Spark cluster, should print the stream coming from the Kafka producer.

Actual:

{
  "id" : "f4e8829f-583e-4630-ac22-1d7da2eb80e7",
  "runId" : "4b93d523-7b7c-43ad-9ef6-272dd8a16e0a",
  "name" : null,
  "timestamp" : "2020-09-09T09:21:17.931Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 1922,
    "getBatch" : 287,
    "getOffset" : 361,
    "queryPlanning" : 111,
    "triggerExecution" : 2766,
    "walCommit" : 65
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[session-event]]",
    "startOffset" : null,
    "endOffset" : {
      "session-event" : {
        "0" : 24
      }
    },
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6a1b0b4b"
  }
}
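To make the progress report above easier to read, here is a small sketch that pulls out the fields that matter for this problem. The function name `summarize_progress` and the trimmed sample are assumptions made for illustration; the field names and values are taken from the report itself.

```python
import json


def summarize_progress(progress_json: str) -> dict:
    """Extract the input-row count and Kafka offsets from one progress report."""
    progress = json.loads(progress_json)
    source = progress["sources"][0]  # the report above has a single Kafka source
    return {
        "rows_in_batch": progress["numInputRows"],
        # A null startOffset means this trigger had no earlier offset to resume from.
        "first_batch": source["startOffset"] is None,
        "end_offsets": source["endOffset"],
    }


# Trimmed copy of the report above, keeping only the fields that are used.
SAMPLE = """
{
  "numInputRows" : 0,
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[session-event]]",
    "startOffset" : null,
    "endOffset" : { "session-event" : { "0" : 24 } },
    "numInputRows" : 0
  } ]
}
"""

summary = summarize_progress(SAMPLE)
print(summary)
```

A null `startOffset` together with an `endOffset` of 24 looks like the first trigger of a query that starts from the latest offsets, which as far as I know is the Kafka source's default for streaming queries. If that is the case here, zero input rows in this batch would not by itself point to a connectivity problem; only messages produced after the query starts would show up.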



2 Answers

0
votes

According to the Quick Example in the Spark documentation, you need to start your query and then wait for its termination.

In your case that means you need to replace

print("Recieved Stream in String", converted_string)

with

query = df.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
0
votes

The issue was in my pyspark_stream script: I had omitted the batch processing time and the print statement needed to view the logs.

Since this is not an aggregated stream, I had to use 'append' here:

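result = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

print("Kafka Streaming output is", result)
# 'append' mode is used because the query has no aggregation
query = result.writeStream.outputMode("append").format("console").trigger(processingTime='30 seconds').start()
# keep the driver alive so that the streaming query keeps running
query.awaitTermination()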
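Putting the pieces from the question and the answers together, the whole script could look roughly like the sketch below. The helper names `kafka_source_options`, `start_console_query` and `main` are made up for illustration; the master URL, broker address, topic and option values are the ones from the question.

```python
def kafka_source_options(bootstrap_servers, topic, max_offsets_per_trigger=10):
    """Build the Kafka source options used in the question as a plain dict."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "maxOffsetsPerTrigger": str(max_offsets_per_trigger),
    }


def start_console_query(spark, bootstrap_servers, topic):
    """Start a streaming query that prints Kafka keys and values to the console."""
    reader = spark.readStream.format("kafka")
    for key, value in kafka_source_options(bootstrap_servers, topic).items():
        reader = reader.option(key, value)
    df = reader.load()

    # Kafka delivers key and value as binary, so cast both to strings.
    result = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    # 'append' mode because there is no aggregation; one micro-batch every 30 seconds.
    return (result.writeStream
            .outputMode("append")
            .format("console")
            .trigger(processingTime="30 seconds")
            .start())


def main():
    # Imported here so the helpers above can be loaded without pyspark installed.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .master("spark://master:7077")
             .appName("kafka_Structured")
             .getOrCreate())
    query = start_console_query(spark, "hostmachine:9092", "session-event")
    query.awaitTermination()  # block so the driver does not exit right away
```

When the file is submitted with spark-submit, add a call to `main()` at the bottom of the script. Without `awaitTermination()` the driver reaches the end of the script and exits, which matches the "runs fine but stops" behaviour described in the question.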