I am working with Kafka 2.3.0 and Spark 2.3.4. I have already built a Kafka connector that reads a CSV file and posts each line to the relevant Kafka topic. A line looks like this: "201310,XYZ001,Sup,XYZ,A,0,Presales,6,Callout,0,0,1,N,Prospect". The CSV contains thousands of such lines. The connector posts them to the topic successfully, and I am also able to receive the messages in Spark. What I am not sure about is how to deserialize each message to my schema. Note that the messages are headerless, so the key part of the Kafka message is null; the value part contains the complete CSV string as above.
I looked at this question - How to deserialize records from Kafka using Structured Streaming in Java? - but was unable to port it to my CSV case. In addition, I have tried other Spark SQL mechanisms to retrieve the individual fields from the 'value' column, but to no avail. If I do manage to get a compiling version (e.g. a map over the indivValues Dataset or over dsRawData), I get errors similar to: "org.apache.spark.sql.AnalysisException: cannot resolve 'IC' given input columns: [value];". If I understand correctly, that is because value is a single comma-separated string and Spark is not going to magically map it to my schema without me doing 'something'.
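From what I have read so far, the 'something' on Spark 2.3 (where a from_csv function does not seem to exist yet) might be splitting the value column and casting each token to the target type. This is only a rough sketch of what I mean, not verified end to end; treating the first field as a yyyyMM timestamp is my own assumption:

import static org.apache.spark.sql.functions.*;

Dataset<Row> parsed = dsRawData
        .selectExpr("CAST(value AS STRING) AS line")
        .withColumn("t", split(col("line"), ","))
        .select(
                // assumption: "201310" is a yyyyMM period
                to_timestamp(col("t").getItem(0), "yyyyMM").as("timeOfOrigin"),
                col("t").getItem(1).as("cName"),
                col("t").getItem(2).as("cRole"),
                col("t").getItem(3).as("bName"),
                col("t").getItem(4).as("stage"),
                col("t").getItem(5).cast("int").as("intId"),
                col("t").getItem(6).as("intName"),
                col("t").getItem(7).cast("int").as("intCatId"),
                col("t").getItem(8).as("catName"),
                col("t").getItem(9).cast("int").as("are_vval"),
                col("t").getItem(10).cast("int").as("isee_vval"),
                col("t").getItem(11).cast("int").as("opCode"),
                col("t").getItem(12).as("opType"),
                col("t").getItem(13).as("opName"));

My current code, which gets the data in but only as one token per row rather than as typed columns, is below.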
// build the Spark session
SparkSession sparkSession = SparkSession.builder()
        .appName(config.arg0AppName)
        .config("spark.cassandra.connection.host", config.arg2CassandraIp)
        .getOrCreate();
...
//my target schema is this:
StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("timeOfOrigin", DataTypes.TimestampType, true),
        DataTypes.createStructField("cName", DataTypes.StringType, true),
        DataTypes.createStructField("cRole", DataTypes.StringType, true),
        DataTypes.createStructField("bName", DataTypes.StringType, true),
        DataTypes.createStructField("stage", DataTypes.StringType, true),
        DataTypes.createStructField("intId", DataTypes.IntegerType, true),
        DataTypes.createStructField("intName", DataTypes.StringType, true),
        DataTypes.createStructField("intCatId", DataTypes.IntegerType, true),
        DataTypes.createStructField("catName", DataTypes.StringType, true),
        DataTypes.createStructField("are_vval", DataTypes.IntegerType, true),
        DataTypes.createStructField("isee_vval", DataTypes.IntegerType, true),
        DataTypes.createStructField("opCode", DataTypes.IntegerType, true),
        DataTypes.createStructField("opType", DataTypes.StringType, true),
        DataTypes.createStructField("opName", DataTypes.StringType, true)
});
...
Dataset<Row> dsRawData = sparkSession
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", config.arg3Kafkabootstrapurl)
        .option("subscribe", config.arg1TopicName)
        .option("failOnDataLoss", "false")
        .load();
// get individual terms like '201310', 'XYZ001', ... from the "value" column
Dataset<String> indivValues = dsRawData
        .selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING())
        .flatMap((FlatMapFunction<String, String>) x ->
                Arrays.asList(x.split(",")).iterator(), Encoders.STRING());
// indivValues, when printed to the console, looks like this, confirming that
// I receive the data correctly and completely:
/*
+--------------------+
| value|
+--------------------+
| 201310|
| XYZ001|
| Sup|
| XYZ|
| A|
| 0|
| Presales|
| 6|
| Callout|
| 0|
| 0|
| 1|
| N|
| Prospect|
+--------------------+
*/
StreamingQuery sq = indivValues.writeStream()
        .outputMode("append")
        .format("console")
        .start();
// await termination
sq.awaitTermination();
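One alternative I have been considering, to get each line into a typed Row matching my schema in a single step, is a map with a RowEncoder. Again this is only a sketch under my assumptions (notably the yyyyMM timestamp format for the first field), not something I have running:

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;

Dataset<Row> typedRows = dsRawData
        .selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING())
        .map((MapFunction<String, Row>) line -> {
            String[] t = line.split(",", -1);
            // assumption: field 0 is a yyyyMM period, e.g. "201310"
            Timestamp ts = new Timestamp(
                    new SimpleDateFormat("yyyyMM").parse(t[0]).getTime());
            return RowFactory.create(
                    ts, t[1], t[2], t[3], t[4],
                    Integer.parseInt(t[5]), t[6],
                    Integer.parseInt(t[7]), t[8],
                    Integer.parseInt(t[9]), Integer.parseInt(t[10]),
                    Integer.parseInt(t[11]), t[12], t[13]);
        }, RowEncoder.apply(schema));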
- I require the data to be typed according to my custom schema shown above, since I will be running mathematical calculations over it (for every new row combined with some older rows).
- Is it better to synthesize headers in the Kafka Connector source task before pushing the lines onto the topic? Would having headers make resolving this issue simpler?
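For reference, if I understand the Kafka source correctly, dsRawData.printSchema() on Spark 2.3 should show only the fixed set of columns below (i.e. no headers column at all), which makes me wonder whether headers would even be visible to Spark on this version:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)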
Thanks!