0
votes

Can anyone help with the Java code to convert the following JSON to a Spark DataFrame?

Note: the JSON does not come from a file.

Logic: listen to Kafka topic T1, read each record in the RDD, apply additional logic, convert the resulting data to a JSON object, and write it to another Kafka topic T2 (a simplified skeleton of this pipeline is sketched after the JSON below).

The structure of T2 is below.

JSON:

 [  
   {  
      "@tenant_id":"XYZ",
      "alarmUpdateTime":1526342400000,
      "alarm_id":"AB5C9123",
      "alarm_updates":[  
         {  
            "alarmField":"Severity",
            "new_value":"Minor",
            "old_value":"Major"
         },
         {  
            "alarmField":"state",
            "new_value":"UPDATE",
            "old_value":"NEW"
         }
      ],
      "aucID":"5af83",
      "inID":"INC15234567",
      "index":"test",
      "product":"test",
      "source":"ABS",
      "state":"NEW"
   }
]
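
For reference, a simplified skeleton of the pipeline described in the logic above is sketched here. It is only a sketch, assuming spark-streaming-kafka-0-10 and the plain Kafka producer client; the broker address, batch interval, group id, and class name are placeholders, and the step that builds the T2 JSON from each T1 record is the part I am asking about.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;

    public class AlarmPipelineSketch {

        public static void main(String[] args) throws InterruptedException {
            SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("alarm-updates");
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put("bootstrap.servers", "localhost:9092");   // placeholder broker
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "alarm-updates");

            // read records from topic T1
            JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                    jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(Collections.singletonList("T1"), kafkaParams));

            stream.foreachRDD(rdd -> rdd.foreachPartition(records -> {
                // one producer per partition; a real job would reuse or pool producers
                Properties producerProps = new Properties();
                producerProps.put("bootstrap.servers", "localhost:9092");
                producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                    while (records.hasNext()) {
                        String raw = records.next().value();
                        String t2Json = raw;   // placeholder for the additional logic that builds the T2 JSON
                        producer.send(new ProducerRecord<>("T2", t2Json));
                    }
                }
            }));

            jssc.start();
            jssc.awaitTermination();
        }
    }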

Classes created:

    class Alarm {

        String tenant_id;        // "@tenant_id" in the JSON
        String alarm_id;
        ...
        List<AlarmUpdate> update;

        // getters and setters for all fields
    }

    class AlarmUpdate {

        String alarmField;
        String oldVal;
        String newVal;

        // getters and setters for all fields
    }

    class AppClass {

        public static void main(String[] args) {
            Alarm alarmObj = new Alarm();
            // set values for the fields in alarmObj

            Dataset<Row> results = jobCtx.getSparkSession()
                    .createDataFrame(Arrays.asList(alarmObj), Alarm.class);

            // At this point I see the following error.
        }
    }

Error:

    2018-05-15 13:40:48 ERROR JobScheduler - Error running job streaming job 1526406040000 ms.0
    scala.MatchError: com.ca.alarmupdates.AlarmUpdate@48c8809b (of class com.ca.alarmupdates.AlarmUpdate)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:236) ~[spark-catalyst_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231) ~[spark-catalyst_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103) ~[spark-catalyst_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:170) ~[spark-catalyst_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:154) ~[spark-catalyst_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103) ~[spark-catalyst_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379) ~[spark-catalyst_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1105) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1105) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[jaf-sdk-2.4.0.jar:?]
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[jaf-sdk-2.4.0.jar:?]
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) ~[jaf-sdk-2.4.0.jar:?]
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) ~[jaf-sdk-2.4.0.jar:?]
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[jaf-sdk-2.4.0.jar:?]
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) ~[jaf-sdk-2.4.0.jar:?]
        at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1105) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1103) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) ~[jaf-sdk-2.4.0.jar:?]
        at scala.collection.Iterator$class.toStream(Iterator.scala:1322) ~[jaf-sdk-2.4.0.jar:?]
        at scala.collection.AbstractIterator.toStream(Iterator.scala:1336) ~[jaf-sdk-2.4.0.jar:?]
        at scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298) ~[jaf-sdk-2.4.0.jar:?]
        at scala.collection.AbstractIterator.toSeq(Iterator.scala:1336) ~[jaf-sdk-2.4.0.jar:?]
        at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:406) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at com.ca.alarmupdates.AlarmUpdates.lambda$null$0(AlarmUpdates.java:85) ~[classes/:?]
        at java.util.Arrays$ArrayList.forEach(Arrays.java:3880) ~[?:1.8.0_161]
        at com.ca.alarmupdates.AlarmUpdates.lambda$main$f87f782d$1(AlarmUpdates.java:58) ~[classes/:?]
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at scala.util.Try$.apply(Try.scala:192) ~[jaf-sdk-2.4.0.jar:?]
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[jaf-sdk-2.4.0.jar:?]
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) ~[spark-streaming_2.11-2.2.0.jar:2.2.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_161]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_161]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_161]

1
Please update your question so that all of the code is in code block formatting so it is easier to read. - Collin M. Barrett

1 Answer

1
votes

You can use wholeTextFiles to read the JSON file, get the JSON text, and pass it to the json API of SparkSession. The stack trace shows the bean-to-Row conversion failing on the nested AlarmUpdate objects, so going through the json reader sidesteps that path:

import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

static SparkSession spark = SparkSession.builder().master("local").appName("simple").getOrCreate();
static JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

// read the whole file as (path, content) pairs, keep only the content, and let Spark infer the schema
Dataset<Row> df = spark.read().json(sc.wholeTextFiles("path to json file").map(t -> t._2()));
df.show(false);

and you should get

+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+
|@tenant_id|alarmUpdateTime|alarm_id|alarm_updates                               |aucID|inID       |index|product|source|state|
+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+
|XYZ       |1526342400000  |AB5C9123|[[Severity,Minor,Major], [state,UPDATE,NEW]]|5af83|INC15234567|test |test   |ABS   |NEW  |
+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+

You can set the master and appName as you prefer.

Updated

You have commented that

The way you do it is through a file; can we do it with the object? I have to convert it to ingest the data into the other topic T2.

For that, let's say you have a record read from topic T1 as a String object, such as

    String t1Record = "[\n" +
            "  {\n" +
            "    \"@tenant_id\":\"XYZ\",\n" +
            "    \"alarmUpdateTime\":1526342400000,\n" +
            "    \"alarm_id\":\"AB5C9123\",\n" +
            "    \"alarm_updates\":[\n" +
            "      {\n" +
            "        \"alarmField\":\"Severity\",\n" +
            "        \"new_value\":\"Minor\",\n" +
            "        \"old_value\":\"Major\"\n" +
            "      },\n" +
            "      {\n" +
            "        \"alarmField\":\"state\",\n" +
            "        \"new_value\":\"UPDATE\",\n" +
            "        \"old_value\":\"NEW\"\n" +
            "      }\n" +
            "    ],\n" +
            "    \"aucID\":\"5af83\",\n" +
            "    \"inID\":\"INC15234567\",\n" +
            "    \"index\":\"test\",\n" +
            "    \"product\":\"test\",\n" +
            "    \"source\":\"ABS\",\n" +
            "    \"state\":\"NEW\"\n" +
            "  }\n" +
            "]";

and you convert it into an RDD as

    JavaRDD<String> t1RecordRDD = sc.parallelize(Arrays.asList(t1Record));

Then you can apply the json API to convert it into a dataframe as

    Dataset<Row> df = spark.read().json(t1RecordRDD);

which should give you the same result as above.
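
If you start from the Alarm bean itself instead of a raw JSON string, one option is to serialize the bean first and then reuse the same json API. The snippet below is only a sketch that assumes Jackson is on the classpath and reuses the spark and sc objects defined above; Alarm is the bean from the question, and JSON names that are not valid Java identifiers, such as @tenant_id, can be mapped onto bean fields with Jackson's @JsonProperty annotation.

    import java.util.Arrays;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    Alarm alarmObj = new Alarm();
    // set values for the fields in alarmObj

    // writeValueAsString throws a checked JsonProcessingException, so declare or handle it
    String alarmJson = new ObjectMapper().writeValueAsString(alarmObj);

    JavaRDD<String> alarmRDD = sc.parallelize(Arrays.asList(alarmJson));
    Dataset<Row> df = spark.read().json(alarmRDD);

df.toJSON() then gives back one JSON string per row, which is a convenient shape for producing the records to topic T2.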