I am trying to send data from a daily batch to a Kafka topic using pyspark, but I currently receive the following error:

Traceback (most recent call last):
  File "", line 5, in
  File "/usr/local/rms/lib/hdp26_c5000/spark2/python/pyspark/sql/readwriter.py", line 548, in save
    self._jwrite.save()
  File "/usr/local/rms/lib/hdp26_c5000/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/rms/lib/hdp26_c5000/spark2/python/pyspark/sql/utils.py", line 71, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"Invalid call to toAttribute on unresolved object, tree: unresolvedalias('shop_id, None)"

The code I am using is as follows:

from pyspark.sql import SparkSession
from pyspark.sql import functions

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.debug.maxToStringFields", 100000) \
    .getOrCreate()

df = spark.sql('''select distinct shop_id, item_id 
from sale.data
''')

df.selectExpr("shop_id", "item_id") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "myserver.local:443") \
    .option("topic", "test_topic_01") \
    .save()

Currently used versions are:

-Spark 2.1.1.2.6.2.0-205

-Kafka Broker 0.11

1 Answer

Spark's Kafka sink expects every row to carry a value and, optionally, a key. It finds them by column name: the DataFrame must have a column named "value" and may also have a column named "key".

In your query you select only the columns "shop_id" and "item_id", so neither a "key" nor a "value" column exists. The error message "unresolvedalias('shop_id, None)" names "shop_id" because it is the first column the writer fails to resolve, and nothing in the DataFrame can be interpreted as the mandatory value.
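The naming rule can be checked before the write is attempted. The following is only a minimal sketch in plain Python: check_kafka_columns is a hypothetical helper, not part of PySpark, and it assumes the sink recognises just "key", "value" and an optional per-row "topic" column (newer Spark versions may accept more).

```python
# Hypothetical helper (not a PySpark API): report why a list of column
# names would not fit the Kafka sink's naming rule.
RECOGNISED = ("key", "value", "topic")  # assumption: the only names the sink reads


def check_kafka_columns(columns):
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    # "value" is the only mandatory column.
    if "value" not in columns:
        problems.append("missing required column 'value'")
    # Anything else is not used as message key, payload or topic.
    extra = [c for c in columns if c not in RECOGNISED]
    if extra:
        problems.append("columns not recognised by this sketch: " + ", ".join(extra))
    return problems


# The column names from the question, then a layout the sink can use.
print(check_kafka_columns(["shop_id", "item_id"]))
print(check_kafka_columns(["key", "value"]))
```

With a real DataFrame you would pass df.columns to the helper.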

You can solve your issue by providing a string column named "value", for example:

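from pyspark.sql.functions import col

df = spark.sql('''select distinct shop_id, item_id
from sale.data
''')

# The Kafka sink reads the message payload from a column named "value",
# so add one by casting shop_id to a string.
df.withColumn("value", col("shop_id").cast("string")) \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "myserver.local:443") \
    .option("topic", "test_topic_01") \
    .save()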
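The example above sends only shop_id. If item_id should reach the topic as well, one option is to use shop_id as the message key and pack both columns into a JSON string as the value. This is a sketch under assumptions, not a tested recipe for your cluster: to_kafka_frame is a hypothetical helper name, and it relies on to_json from pyspark.sql.functions, which should be available from Spark 2.1 onwards.

```python
def to_kafka_frame(df, key_col, value_cols):
    """Project df onto the string columns "key" and "value".

    Hypothetical helper: key_col becomes the message key and
    value_cols are serialised together as one JSON object.
    """
    # Imported lazily so the module still loads where PySpark is absent.
    from pyspark.sql.functions import col, struct, to_json

    return df.select(
        col(key_col).cast("string").alias("key"),
        to_json(struct(*[col(c) for c in value_cols])).alias("value"),
    )


# Usage with the DataFrame from the question (needs a running Spark
# session and a reachable broker, so it is left commented out):
#
# to_kafka_frame(df, "shop_id", ["shop_id", "item_id"]) \
#     .write \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "myserver.local:443") \
#     .option("topic", "test_topic_01") \
#     .save()
```

Selecting only "key" and "value" also keeps columns the sink does not use out of the DataFrame that is written.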