I have a pipeline in place where data is sent from Flink to a Kafka topic in JSON format. I am able to read it back from the Kafka topic and extract the JSON attributes as well. Now, just as Scala reflection classes let me compare data types at runtime, I am trying to do the same thing in Flink using TypeInformation: I want to set a predefined format, and whatever data is read from the topic should be validated against it and passed or failed accordingly.
I have data like the following:
{"policyName":"String", "premium":2400, "eventTime":"2021-12-22 00:00:00"}
For my problem, I came across a couple of examples in Flink's book that show how to create a TypeInformation variable, but nothing was mentioned on how to use it, so I tried it my way:
    val objectMapper = new ObjectMapper()
    val tupleType: TypeInformation[(String, Int, String)] =
      Types.TUPLE[(String, Int, String)]
    println(tupleType.getTypeClass)
    src.map(v => v)
      .map { x =>
        val policyName: String = objectMapper.readTree(x).get("policyName").toString()
        val premium: Int = objectMapper.readTree(x).get("premium").toString().toInt
        val eventTime: String = objectMapper.readTree(x).get("eventTime").toString()
        if ((policyName, premium, eventTime) == tupleType.getTypeClass) {
          println("Good Record: " + (policyName, premium, eventTime))
        } else {
          println("Bad Record: " + (policyName, premium, eventTime))
        }
      }
Now, if I pass the input below to the Flink Kafka producer:
{"policyName":"whatever you feel like","premium":"4000","eventTime":"2021-12-20 00:00:00"}
the expected output is "Bad Record" followed by the tuple, since the data type of premium is String and not Long/Int.
If I pass the input below:
{"policyName":"whatever you feel like","premium":4000,"eventTime":"2021-12-20 00:00:00"}
the expected output is "Good Record" followed by the tuple.
But with my code, I always end up in the else branch.
If I create a DataStream variable, store the result of the above map in it, and then compare as below, it gives me the correct result:
    if (tupleType == datas.getType()) { // where 'datas' is a DataStream
      println("Good Records")
    } else {
      println("Bad Records")
    }
But I want to send the good and bad records to different streams, or perhaps insert them directly into a Cassandra table. That is why I am checking the records one by one. Is my approach correct? What would be the best practice considering what I am trying to achieve?
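To make the intended check concrete, here is a minimal sketch of the per-record validation described above (premium must be a JSON number, the other two attributes JSON strings). It uses only Jackson's JsonNode type predicates and does not involve TypeInformation. The names PolicyValidation, Policy and validate are hypothetical and exist only for this illustration; the field names come from the sample data.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import scala.util.Try

object PolicyValidation {
  // A single shared mapper; ObjectMapper is thread-safe once configured.
  private val mapper = new ObjectMapper()

  // Hypothetical record type for the three attributes in the sample data.
  final case class Policy(policyName: String, premium: Int, eventTime: String)

  // Hypothetical helper: Right(policy) when every field has the expected
  // JSON type, Left(reason) otherwise. Nothing here is Flink-specific.
  def validate(json: String): Either[String, Policy] =
    Try(mapper.readTree(json)).toOption.filter(_ != null) match {
      case None => Left("malformed JSON")
      case Some(root) =>
        // path() returns a "missing" node instead of null for absent fields,
        // so the type predicates below are safe to call.
        val name = root.path("policyName")
        val premium = root.path("premium")
        val time = root.path("eventTime")
        if (!name.isTextual()) Left("policyName is not a JSON string")
        else if (!premium.isInt()) Left("premium is not a JSON integer")
        else if (!time.isTextual()) Left("eventTime is not a JSON string")
        else Right(Policy(name.asText(), premium.intValue(), time.asText()))
    }
}
```

With a function of this shape, each record yields either a typed value or a rejection reason, which could then be routed to separate outputs. How that routing is wired in Flink is not shown here.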
Based on Dominik's inputs, I tried creating my own CustomDeserializer class:
    import com.fasterxml.jackson.databind.ObjectMapper
    import org.apache.flink.api.common.serialization.DeserializationSchema
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import java.nio.charset.StandardCharsets

    class sample extends DeserializationSchema[String] {
      override def deserialize(message: Array[Byte]): Tuple3[Int, String, String] = {
        val data = new String(message, StandardCharsets.UTF_8)
        val objectMapper = new ObjectMapper()
        val id: Int = objectMapper.readTree(data).get("id").toString().toInt
        val category: String = objectMapper.readTree(data).get("Category").toString()
        val eventTime: String = objectMapper.readTree(data).get("eventTime").toString()
        return (id, category, eventTime)
      }
      override def isEndOfStream(t: String): Boolean = ???
      override def getProducedType: TypeInformation[String] = return TypeInformation.of(classOf[String])
    }
I would like to implement something like the following:
    src.map(v => v)
      .map { x =>
        if (new sample().deserialize(x) == true) {
          println("Good Record: " + (id, category, eventTime))
        } else {
          println("Bad Record: " + (id, category, eventTime))
        }
      }
But the input is in Array[Byte] form. So how can I implement it? Where exactly am I going wrong? What needs to be modified? This is my first ever attempt at writing custom classes in Flink with Scala.
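For the Array[Byte] case, the behavior I am after could be sketched as below. This is only a sketch under assumptions: RecordParser and parse are hypothetical names, the field names "id", "Category" and "eventTime" are the ones from my class above, and the function is plain Scala plus Jackson rather than a Flink API. It returns the typed tuple for a good record and the raw payload for a bad one, instead of a Boolean.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import java.nio.charset.StandardCharsets
import scala.util.Try

object RecordParser {
  private val mapper = new ObjectMapper()

  // Hypothetical helper: Right((id, category, eventTime)) for a well-typed
  // record, Left(raw payload) for anything malformed or wrongly typed.
  def parse(message: Array[Byte]): Either[String, (Int, String, String)] = {
    val raw = new String(message, StandardCharsets.UTF_8)
    val typed = Try(mapper.readTree(message)).toOption
      .filter(_ != null)
      .flatMap { root =>
        val id = root.path("id")
        val category = root.path("Category")
        val eventTime = root.path("eventTime")
        // Accept the record only if each field has the expected JSON type.
        if (id.isInt() && category.isTextual() && eventTime.isTextual())
          Some((id.intValue(), category.asText(), eventTime.asText()))
        else None
      }
    typed.toRight(raw)
  }
}
```

A call such as `RecordParser.parse(bytes)` can then be pattern matched on `Right` and `Left` to decide where a record goes. Whether this logic belongs inside a DeserializationSchema or in a later map step is exactly the part I am unsure about.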
Inputs Passed: Inputs
Premium then consider it as good record), then send it to some table. – user17775951
JsonNode and ObjectMapper for getting the attribute values. So, you mean that while fetching these attributes, I should perform a check right there for the data types? I am new to Flink and wanted to do it the Flink way, which is why I thought of TypeInformation. – user17775951