
Elaborated: I'm looking to implement Scala code using Spark Structured Streaming: read JSON events from Kafka into a DataFrame, use Spark SQL to manipulate the data/columns, and write the result to Hive.

Using Scala 2.11 / Spark 2.2

I understand that creating the connection is straightforward:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// spark.implicits._ provides the encoder needed for .as[(String, String)]
import spark.implicits._

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

How do I handle the JSON event? Assuming all events have an identical schema, do I have to provide the schema, and if so, how is that done? Also, if there's a way to infer the schema, how is that done?

If I understand correctly, I then create a temp view; how do I run SQL-like queries on that view?

Edit (required by the system): The automated system flagged this post as a duplicate; it isn't. In the linked question the OP asked to fix a problem with his existing code, and the one (valid) answer addressed issues with the deserialization of JSON. My questions are different, as stated above. If my question isn't clear, please ask specifically and I will try to clarify further. Thank you.

Thanks for adding this; however, I looked at that post and it doesn't answer my questions. – DigitalFailure

1 Answer


Assuming all events have an identical schema, do I have to provide the schema, and if so, how is that done? Also, if there's a way to infer the schema, how is that done?

If you know the schema, it is better to provide it explicitly. You can create a schema as follows:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val schema = new StructType().add("Id", IntegerType).add("name", StringType)
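The question also asked whether the schema can be inferred. Structured Streaming will not infer it from the Kafka payload, but one common workaround (a sketch, not part of the original answer; the sample file path is hypothetical) is to infer it once from a static sample of the same JSON events and reuse the result:

// Hypothetical: infer the schema once from a static file containing sample events
val inferredSchema = spark.read.json("/path/to/sample-events.json").schema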

Then read the data from Kafka and deserialize it as follows:

// the Kafka value column arrives as binary, so cast it to a string before parsing the JSON
val data_df = df.select(from_json(col("value").cast("string"), schema).as("data")).select("data.*")
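To check that the deserialization produced the expected columns, you can print the schema of the resulting DataFrame (a quick sanity check, not in the original answer):

data_df.printSchema()
// root
//  |-- Id: integer (nullable = true)
//  |-- name: string (nullable = true)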

You can create a temporary view out of data_df.

data_df.createOrReplaceTempView("data_df")

Now you can query the view using Spark SQL. Since data_df comes from a streaming source, the query result is itself a streaming DataFrame and has to be started with writeStream rather than collected with show():

spark.sql("select * from data_df").show()