
I created an event hub in Azure and published some messages to it using a Python script. I can fetch the messages from the event hub with another Python script, but I cannot stream them using PySpark. Below is the PySpark code I'm using to stream the messages:

connectionString = "<MyConnectionString>"

ehConf = {
  'eventhubs.connectionString' : connectionString
}

ehConf['eventhubs.consumerGroup'] = "$default"

df = spark.readStream.format("eventhubs").options(**ehConf).load()

query = df.writeStream.format("parquet").outputMode("append").option("path", "azure_streaming_test").option("checkpointLocation", "azure_streaming_checkpoint").start()
query.awaitTermination()

Running the above code in the PySpark shell produces the following error:

java.lang.IncompatibleClassChangeError: Method 'com.microsoft.azure.eventhubs.EventHubClient com.microsoft.azure.eventhubs.EventHubClient.createSync(java.lang.String, java.util.concurrent.ScheduledExecutorService)' must be InterfaceMethodref constant

Attached is the screenshot of the error message.

What needs to be corrected? Thanks in advance!


1 Answer


I tried to reproduce the issue, and the code below worked fine. Which version of PySpark are you using? Did you find a solution to the issue?

from pyspark.sql.functions import *
from pyspark.sql.types import *

connectionString = "XX"

ehConf = {
  'eventhubs.connectionString' : connectionString
}

ehConf['eventhubs.consumerGroup'] = "$Default"

df = spark.readStream.format("eventhubs").options(**ehConf).load()

Schema = StructType([StructField("cardNumber", StringType(), True),
                      StructField("transactionId", StringType(), True),
                      StructField("transactionTime", StringType(), True)
                    ])

rawData = df. \
  selectExpr("cast(Body as string) as json"). \
  select(from_json("json", Schema).alias("data")). \
  select("data.*")

parsedData=rawData.select('transactionId','cardNumber','transactionTime')

display(parsedData)

df.writeStream.format("parquet").outputMode("append").option("path", "/azure_streaming_test").option("checkpointLocation", "/azure_streaming_checkpoint").start()
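
The reason the PySpark version matters: a java.lang.IncompatibleClassChangeError generally means a class was compiled against a different version of a dependency than the one loaded at runtime. So one possible cause (an assumption, since the thread does not confirm it) is that the Event Hubs connector jar on the classpath does not match the Spark/Scala build or the Event Hubs client library. Below is a minimal sketch of building the connector's Maven coordinate for a given Scala build. The helper names and the version numbers are only illustrative; check the connector's release notes for the version that matches your Spark release.

```python
# Hypothetical helpers (not part of PySpark or the connector) that build the
# Maven coordinate of the Event Hubs Spark connector for a given Scala build.

def scala_binary_version(scala_version: str) -> str:
    """Reduce a full Scala version such as '2.12.15' to its binary version '2.12'."""
    parts = scala_version.split(".")
    if len(parts) < 2:
        raise ValueError(f"unexpected Scala version: {scala_version!r}")
    return ".".join(parts[:2])


def connector_coordinate(scala_version: str, connector_version: str) -> str:
    """Return 'group:artifact_<scala binary>:<connector version>'."""
    binary = scala_binary_version(scala_version)
    return f"com.microsoft.azure:azure-eventhubs-spark_{binary}:{connector_version}"


if __name__ == "__main__":
    # Both version numbers are example values only, not a recommendation.
    coordinate = connector_coordinate("2.12.15", "2.3.22")
    print(f"pyspark --packages {coordinate}")
```

In a running shell, spark.version reports the Spark version, and spark-submit --version prints the Scala version the build uses. Starting the shell with a single --packages coordinate, rather than mixing manually downloaded jars, is one way to keep the connector and its client library consistent.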