Can anyone help with the Java code to convert the following JSON into a Spark DataFrame?
Note: the JSON is not read from a file.
Logic: listen to Kafka topic T1, read each record in the RDD, apply additional logic, convert the result to a JSON object, and write it to another Kafka topic T2. A minimal sketch of this flow is right below.
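For context, the streaming skeleton looks roughly like this (a minimal sketch; the broker address, group id, and batch interval are placeholders, and the T2 producer side is elided):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class T1ToT2 {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("AlarmUpdates");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092"); // placeholder
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "alarm-updates");           // placeholder

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Arrays.asList("T1"), kafkaParams));

        stream.foreachRDD(rdd -> rdd.foreach(record -> {
            String enriched = record.value(); // additional logic goes here
            // write `enriched` to topic T2 with a KafkaProducer
        }));

        jssc.start();
        jssc.awaitTermination();
    }
}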
The structure of a T2 record is shown below.
JSON:
[
  {
    "@tenant_id": "XYZ",
    "alarmUpdateTime": 1526342400000,
    "alarm_id": "AB5C9123",
    "alarm_updates": [
      {
        "alarmField": "Severity",
        "new_value": "Minor",
        "old_value": "Major"
      },
      {
        "alarmField": "state",
        "new_value": "UPDATE",
        "old_value": "NEW"
      }
    ],
    "aucID": "5af83",
    "inID": "INC15234567",
    "index": "test",
    "product": "test",
    "source": "ABS",
    "state": "NEW"
  }
]
Classes created:

class Alarm {
    private String tenantId;     // maps to "@tenant_id"; "@" is not legal in a Java identifier
    private String alarm_id;
    // ... remaining fields from the JSON above ...
    private List<AlarmUpdate> alarm_updates;
    // getters and setters for all fields
}

class AlarmUpdate {
    private String alarmField;
    private String old_value;    // renamed from oldVal to match the JSON key
    private String new_value;    // renamed from NewVal to match the JSON key
    // getters and setters for all fields
}
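The beans are filled from the incoming T1 record. A minimal sketch of that step, assuming Jackson is on the classpath (a key such as "@tenant_id" would additionally need a @JsonProperty annotation on the bean field):

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;

class AlarmParser {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // The payload is a JSON array, so bind it to Alarm[] and wrap it as a List
    static List<Alarm> parse(String json) throws IOException {
        return Arrays.asList(MAPPER.readValue(json, Alarm[].class));
    }
}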
class AppClass {
    public static void main(String[] args) {
        Alarm alarmObj = new Alarm();
        // set values for the fields in alarmObj
        Dataset<Row> results = jobCtx.getSparkSession()
                .createDataFrame(Arrays.asList(alarmObj), Alarm.class);
        // At this point I see the error below.
    }
}
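I have also considered skipping the bean conversion and letting Spark infer the schema straight from the raw JSON string. A sketch of that variant (`spark` stands in for the session from jobCtx, `jsonString` for the raw record):

import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class JsonToFrame {
    // Wrap the raw record in a one-element Dataset<String>; read().json infers the schema
    static Dataset<Row> toFrame(SparkSession spark, String jsonString) {
        Dataset<String> jsonDs =
                spark.createDataset(Collections.singletonList(jsonString), Encoders.STRING());
        return spark.read().json(jsonDs); // DataFrameReader.json(Dataset<String>), Spark 2.2+
    }
}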
Error:
2018-05-15 13:40:48 ERROR JobScheduler - Error running job streaming job 1526406040000 ms.0
scala.MatchError: com.ca.alarmupdates.AlarmUpdate@48c8809b (of class com.ca.alarmupdates.AlarmUpdate)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:236) ~[spark-catalyst_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231) ~[spark-catalyst_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103) ~[spark-catalyst_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:170) ~[spark-catalyst_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:154) ~[spark-catalyst_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103) ~[spark-catalyst_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379) ~[spark-catalyst_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1105) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1105) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[jaf-sdk-2.4.0.jar:?]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[jaf-sdk-2.4.0.jar:?]
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) ~[jaf-sdk-2.4.0.jar:?]
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) ~[jaf-sdk-2.4.0.jar:?]
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[jaf-sdk-2.4.0.jar:?]
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) ~[jaf-sdk-2.4.0.jar:?]
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1105) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1103) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) ~[jaf-sdk-2.4.0.jar:?]
    at scala.collection.Iterator$class.toStream(Iterator.scala:1322) ~[jaf-sdk-2.4.0.jar:?]
    at scala.collection.AbstractIterator.toStream(Iterator.scala:1336) ~[jaf-sdk-2.4.0.jar:?]
    at scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298) ~[jaf-sdk-2.4.0.jar:?]
    at scala.collection.AbstractIterator.toSeq(Iterator.scala:1336) ~[jaf-sdk-2.4.0.jar:?]
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:406) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
    at com.ca.alarmupdates.AlarmUpdates.lambda$null$0(AlarmUpdates.java:85) ~[classes/:?]
    at java.util.Arrays$ArrayList.forEach(Arrays.java:3880) ~[?:1.8.0_161]
    at com.ca.alarmupdates.AlarmUpdates.lambda$main$f87f782d$1(AlarmUpdates.java:58) ~[classes/:?]
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at scala.util.Try$.apply(Try.scala:192) ~[jaf-sdk-2.4.0.jar:?]
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[jaf-sdk-2.4.0.jar:?]
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_161]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_161]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_161]