I tried to consume from Kafka using Spark, more specifically PySpark and Structured Streaming.
import os
import time
import time
from ast import literal_eval
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, struct, explode
from pyspark.sql import SparkSession
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'
spark = SparkSession \
.builder \
.appName("Structured Streaming") \
.getOrCreate()
requests = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "ip-ec2:9092") \
.option("subscribe", "ssp.requests") \
.option("startingOffsets", "earliest") \
.load()
requests.printSchema()
# root |-- key: binary (nullable = true) |-- value: binary (nullable =
# true) |-- topic: string (nullable = true) |-- partition: integer
# (nullable = true) |-- offset: long (nullable = true) |-- timestamp:
# timestamp (nullable = true) |-- timestampType: integer (nullable =
# true)
When I ran the next lines of code
rawQuery = requests \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream.trigger(processingTime="5 seconds") \
.format("parquet") \
.option("checkpointLocation", "/home/user/folder/applicationHistory") \
.option("path", "/home/user/folder") \
.start()
rawQuery.awaitTermination()
Py4JJavaError Traceback (most recent call last) /opt/conda/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw) 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e:
/opt/conda/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 319 "An error occurred while calling {0}{1}{2}.\n". --> 320 format(target_id, ".", name), value) 321 else:
Py4JJavaError: An error occurred while calling o70.awaitTermination. : org.apache.spark.sql.streaming.StreamingQueryException: Job aborted. === Streaming Query === Identifier: [id = c2b48840-5ba4-416e-a192-dcae94007856, runId = 4afcca20-00cd-4187-a70b-1b742f1f5c0d] Current Committed Offsets: {} Current Available Offsets: {KafkaSource[Subscribe[ssp.requests]]:
I can't understand the reason of this error
Py4JJavaError: An error occurred while calling o70.awaitTermination