I'm currently using Spark Structured Streaming to read JSON data out of a Kafka topic. The JSON is stored as a string in the topic, and to parse it I supply a hardcoded JSON schema as a StructType. I'm looking for a good way to dynamically infer the schema of a topic during streaming.

This is my code (it's Kotlin rather than the more commonly used Scala):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.ProcessingTime

spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "my_topic")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()
    // The Kafka value arrives as binary; cast it to a JSON string.
    .selectExpr("CAST(value AS STRING)")
    .select(
        // Parse the JSON string against the hardcoded StructType schema.
        from_json(
            col("value"),
            JsonSchemaRegistry.mySchemaStructType)
        .`as`("data")
    )
    // Flatten the parsed struct into top-level columns.
    .select("data.*")
    .writeStream()
    .format("my_format")
    .option("checkpointLocation", "/path/to/checkpoint")
    .trigger(ProcessingTime("25 seconds"))
    .start("/my/path")
    .awaitTermination()

Is this possible in a clean way right now, without re-inferring the schema for each DataFrame? I'm looking for an idiomatic approach. If schema inference is not advisable in Structured Streaming I'll continue to hardcode my schemas, but I want to be sure. The option spark.sql.streaming.schemaInference is mentioned in the Spark docs, but I can't see how to use it.

1 Answer

For Kafka this is NOT possible, as inferring the schema on a stream would take too much time. For file sources it is possible.

From the manual:

Schema inference and partition of streaming DataFrames/Datasets

By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting spark.sql.streaming.schemaInference to true.

But again, this applies only to file sources, not to Kafka.
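
For illustration, here is a minimal sketch of that flag in use with a file source (the session setup and input path are hypothetical, not from the question):

import org.apache.spark.sql.SparkSession

// Hypothetical session setup, for illustration only.
val spark = SparkSession.builder()
    .appName("schema-inference-sketch")
    .getOrCreate()

// Re-enable schema inference for streaming file sources (off by default).
spark.conf().set("spark.sql.streaming.schemaInference", "true")

// No explicit schema is supplied here; with the flag set, Spark infers one
// from the files already present in the (hypothetical) input directory.
val stream = spark
    .readStream()
    .format("json")
    .load("/path/to/json/input")

The trade-off, per the quoted docs, is that the inferred schema is not guaranteed to stay consistent across restarts or failures, which is why an explicit schema is required by default.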