2
votes

I'm trying to write a DataFrame to Kafka in JSON format and add a key to the DataFrame in Scala. I'm currently working from this sample in the Spark-Kafka integration guide:

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

Is there a to_json method that can be used here, instead of the json(path) option (which I believe writes out to a file in JSON format)? And is there a key option that can be used to replace the null key with an actual one?


3 Answers

2
votes

Here's a minimal example in Scala. Say you have a DataFrame df with columns x and y:

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType
import spark.implicits._

val dataDS = (
    df
    .select(
        $"x".cast(StringType),
        $"y".cast(StringType)
    )
    .toJSON                              // serializes each row to a JSON string in a "value" column
    .withColumn("key", lit("keyname"))   // the "key" column becomes the Kafka message key
)

(
    dataDS   
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "servername:port")
    .option("topic", "topicname")
    .save()
)

Remember that you need the spark-sql-kafka library; for spark-shell, for example, it is loaded with

spark-shell --packages "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"
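
If you build with sbt instead, the equivalent dependency would be (assuming the same Spark 3.0.1 / Scala 2.12 build as the command above):

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.1"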
0
votes

You can make use of the to_json SQL function to convert your columns into a JSON string.

See the Scala code below, which also makes use of the Spark SQL built-in function struct, tested with Spark version 2.4.5. Just make sure that you are naming your columns key and value, or applying corresponding aliases in your selectExpr.
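
For instance, a minimal sketch of the aliasing approach in a single selectExpr (the name column here is taken from the example below):

df.selectExpr("CAST(name AS STRING) AS key", "to_json(struct(*)) AS value")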

import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.sql.SparkSession

object Main extends App {

  val spark = SparkSession.builder()
    .appName("myAppName")
    .master("local[*]")
    .getOrCreate()

  // create DataFrame
  import spark.implicits._
  val df = Seq((3, "Alice"), (5, "Bob")).toDF("age", "name")
  df.show(false)

  // convert columns into json string
  val df2 = df.select(col("name"), to_json(struct($"*"))).toDF("key", "value")
  df2.show(false)

  // +-----+------------------------+
  // |key  |value                   |
  // +-----+------------------------+
  // |Alice|{"age":3,"name":"Alice"}|
  // |Bob  |{"age":5,"name":"Bob"}  |
  // +-----+------------------------+

  // write to Kafka with jsonString as value
  df2.selectExpr("key", "value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "test-topic")
    .save()

}

This will produce the following data in your Kafka topic:

kafka-console-consumer --bootstrap-server localhost:9092 --property print.key=true --property print.value=true --topic test-topic
Alice   {"age":3,"name":"Alice"}
Bob     {"age":5,"name":"Bob"}
-1
votes

You can use the toJSON() method on a DataFrame to convert your records to JSON messages.

df = spark.createDataFrame([('user_first_name','user_last_nmae',100)], ['first_name','last_name','ID'])

import json
from datetime import datetime
from pyspark.sql.functions import lit

# toJSON() returns an RDD of JSON strings; first() pulls back one record
json_df = json.loads(df.withColumn('date_as_key', lit(datetime.now().date())).toJSON().first())
print(json_df)

{u'date_as_key': u'2019-07-29', u'first_name': u'user_first_name', u'last_name': u'user_last_nmae', u'ID': 100}

Your code might look like this:

from datetime import datetime
from pyspark.sql.functions import lit

# toJSON() returns an RDD of strings in PySpark, so convert it back to a
# single-column DataFrame before attaching the key and writing to Kafka
(df.toJSON()
   .map(lambda s: (s,))
   .toDF(["value"])
   .withColumn("key", lit(str(datetime.now())))
   .write
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("topic", "topic1")
   .save())

Scala:

import org.apache.spark.sql.functions.lit

someDF.withColumn("key", lit("name")).show() // replace "name" with your variable
someDF.withColumn("key", lit("name")).toJSON.first() // toJSON is available as a method on DataFrame in Scala
// res5: String = {"number":8,"word":"bat","key":"name"}
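
Putting it together, a sketch of the corresponding Kafka write (bootstrap server and topic are placeholders); note that attaching the key after toJSON keeps it as the Kafka message key instead of embedding it in the JSON value:

import org.apache.spark.sql.functions.lit

// sketch only: toJSON yields a Dataset with a single "value" column,
// and the added "key" column becomes the Kafka message key
someDF.toJSON
  .withColumn("key", lit("name"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1") // placeholder
  .option("topic", "topic1")                        // placeholder
  .save()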