I am trying to send data from a daily batch to a Kafka topic using pyspark, but I currently receive the following error:
Traceback (most recent call last):
  File "<stdin>", line 5, in <module>
  File "/usr/local/rms/lib/hdp26_c5000/spark2/python/pyspark/sql/readwriter.py", line 548, in save
    self._jwrite.save()
  File "/usr/local/rms/lib/hdp26_c5000/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/rms/lib/hdp26_c5000/spark2/python/pyspark/sql/utils.py", line 71, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"Invalid call to toAttribute on unresolved object, tree: unresolvedalias('shop_id, None)"
The code I am using is as follows:
from pyspark.sql import SparkSession
from pyspark.sql import functions
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.debug.maxToStringFields", 100000) \
    .getOrCreate()

df = spark.sql('''select distinct shop_id, item_id
                  from sale.data
               ''')

df.selectExpr("shop_id", "item_id") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "myserver.local:443") \
    .option("topic", "test_topic_01") \
    .save()
The versions I am currently using are:
- Spark 2.1.1.2.6.2.0-205
- Kafka Broker 0.11
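
For reference, the Kafka integration guide for newer Spark releases (2.2 and later, so I am not sure how much of it applies to my 2.1 build) says the Kafka writer expects the payload in a string or binary column named value, with key and topic optional. If that is what is missing here, I assume the write would have to look roughly like the sketch below, where the two columns are packed into a JSON string (to_json and struct come from pyspark.sql.functions; df, the column names and the topic are the ones from my code above):

from pyspark.sql.functions import to_json, struct

# df is the DataFrame built from the "select distinct" query above.
# Pack shop_id and item_id into a single JSON string column named "value",
# which is the column the Kafka writer reads its payload from.
df.select(to_json(struct("shop_id", "item_id")).alias("value")) \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "myserver.local:443") \
    .option("topic", "test_topic_01") \
    .save()

Is the missing value column what the AnalysisException above is actually complaining about, or is the error caused by something else?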