I am trying to read json data from kafka and process it in Scala.I am new to flink and kafka streaming so please try to answer by giving the solution code.I want to be able to convert it to Map containing all the key,value pairs.
map1.get("FC196") should give me Dormant where map1 is the map containing the key value pairs
The challenge I'm facing is converting the DataStream[ObjectNode] which is the st variable in the code to a map of key value pairs. I am using JSonDeserializerSchema.If I use Simple String Schema i get DataStream[String]. I am open to alternative suggestions.
Input Format from kafka :
{"FC196":"Dormant","FC174":"A262210940","FC195":"","FC176":"40","FC198":"BANKING","FC175":"AHMED","FC197":"2017/04/04","FC178":"1","FC177":"CBS","FC199":"INDIVIDUAL","FC179":"SYSTEM","FC190":"OK","FC192":"osName","FC191":"Completed","FC194":"125","FC193":"7","FC203":"A10SBPUB000000000004439900053575","FC205":"1","FC185":"20","FC184":"Transfer","FC187":"2","FC186":"2121","FC189":"abcdef","FC200":"afs","FC188":"BR08","FC202":"INDIVIDUAL","FC201":"","FC181":"7:00PM","FC180":"2007/04/01","FC183":"11000000","FC182":"INR"}
Code :
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object WordCount {
def main(args: Array[String]) {
// kafka properties
val properties = new Properties()
properties.setProperty("bootstrap.servers", "***.**.*.*:9092")
properties.setProperty("zookeeper.connect", "***.**.*.*:2181")
properties.setProperty("group.id", "afs")
properties.setProperty("auto.offset.reset", "latest")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val st = env
.addSource(new FlinkKafkaConsumer09("new", new JSONDeserializationSchema() , properties))
st.print()
env.execute()
}
}
My code after the changes :
import java.util.Properties
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s.DefaultFormats
import org.json4s._
import org.json4s.native.JsonMethods
import scala.util.Try
object WordCount{
def main(args: Array[String]) {
case class CC(key:String)
implicit val formats = org.json4s.DefaultFormats
// kafka properties
val properties = new Properties()
properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
properties.setProperty("group.id", "afs")
properties.setProperty("auto.offset.reset", "earliest")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val st = env
.addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema() , properties))
.flatMap(raw => JsonMethods.parse(raw).toOption)
.map(_.extract[CC])
st.print()
env.execute()
}
}
And for some reason I cannot put a Try in the flatmap as u described
error:
INFO [main] (TypeExtractor.java:1804) - No fields detected for class org.json4s.JsonAST$JValue. Cannot be used as a PojoType. Will be handled as GenericType
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:994)
at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:519)
at org.apache.flink.quickstart.WordCount$.main(WordCount.scala:36)
at org.apache.flink.quickstart.WordCount.main(WordCount.scala)
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$$anon$4
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
... 6 more
Process finished with exit code 1