import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class sparkSqlMysql {

    private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(sparkSqlMysql.class);

    private static final SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Spark2JdbcDs")
            .getOrCreate();

    public static void main(String[] args) {
        // Subscribe to the Kafka topic as a streaming Dataset
        Dataset<Row> df = sparkSession.readStream().format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "SqlMessages")
                .load();
    }
}

I want to read the data from my Kafka topic into Spark SQL, but I am not able to do so.

Can someone guide me on how I can convert my data from the Kafka topic to Spark SQL?

Something where I can do this:

 Dataset<Row> schoolData = sparkSession.sql("select * from Schools");

1 Answer


I was doing something similar today: I consumed the entire topic from the beginning, converted it to a DataFrame, and saved it as a Parquet table. My code is in Scala, but you can adapt it; the idea should be clear.

val topic = "topic_bla_bla"
val brokers = "some_kafka_broker:9092"
// Batch-read the whole topic from the beginning
val kafkaDF = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .option("kafkaConsumer.pollTimeoutMs", "20000")
  .load()
// Kafka values are binary, so cast them to strings before parsing the JSON
import spark.implicits._
val jsonDF = kafkaDF.selectExpr("CAST(value AS STRING)")
val finalDF = spark.read.option("mode", "PERMISSIVE").json(jsonDF.as[String])
finalDF.createOrReplaceTempView("wow_table") // registerTempTable is deprecated since Spark 2.0
// OR save it as a Parquet table instead
finalDF.write.format("parquet").saveAsTable("default.wow_table")
spark.sql("select * from wow_table")