
I have a pipeline in place where data is sent from Flink to a Kafka topic in JSON format. I am also able to read it back from the Kafka topic and extract the JSON attributes. Now, similar to Scala reflection, where I can compare data types at runtime, I was trying to do the same thing in Flink using TypeInformation: define a predefined format, run whatever data is read from the topic through this validation, and mark it as passed or failed accordingly. I have data like below:
{"policyName":"String", "premium":2400, "eventTime":"2021-12-22 00:00:00" }

For my problem, I came across a couple of examples in Flink's book that mention how to create a TypeInformation variable, but there was nothing on how to use it, so I tried it my own way:

    val objectMapper = new ObjectMapper()

    val tupleType: TypeInformation[(String, Int, String)] =
      Types.TUPLE[(String, Int, String)]
    println(tupleType.getTypeClass)

    src.map(v => v)
      .map { x =>
        val policyName: String = objectMapper.readTree(x).get("policyName").toString()
        val premium: Int = objectMapper.readTree(x).get("premium").toString().toInt
        val eventTime: String = objectMapper.readTree(x).get("eventTime").toString()
        if ((policyName, premium, eventTime) == tupleType.getTypeClass) {
          println("Good Record: " + (policyName, premium, eventTime))
        }
        else {
          println("Bad Record: " + (policyName, premium, eventTime))
        }
      }

Now if I pass the input below to the Flink Kafka producer:

{"policyName":"whatever you feel like","premium":"4000","eventTime":"2021-12-20 00:00:00"}

It should give me the expected output "Bad Record" along with the tuple, since the datatype of premium is String and not Long/Int.

If I pass the input as below:

{"policyName":"whatever you feel like","premium":4000,"eventTime":"2021-12-20 00:00:00"}

It should give me the output "Good Record" along with the tuple.

But according to my code, it always ends up in the else part.

If I create a DataStream variable, store the results of the above map in it, and then compare as below, it gives me the correct result:

    if (tupleType == datas.getType()) { // where 'datas' is a DataStream
      print("Good Records")
    } else {
      println("Bad Records")
    }

But I want to send the good/bad records to different streams, or maybe insert them directly into the Cassandra table. That is why I am checking the records one by one inside the map. Is my way correct? What would be the best practice considering what I am trying to achieve?

Based on Dominik's inputs, I tried creating my own CustomDeserializer class:

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation

import java.nio.charset.StandardCharsets

class sample extends DeserializationSchema[String] {
  override def deserialize(message: Array[Byte]): Tuple3[Int, String, String] = {
    val data = new String(message,
      StandardCharsets.UTF_8)
    val objectMapper = new ObjectMapper()
    val id: Int = objectMapper.readTree(data).get("id").toString().toInt
    val category: String = objectMapper.readTree(data).get("Category").toString()
    val eventTime: String = objectMapper.readTree(data).get("eventTime").toString()
    return (id, category, eventTime)

  }

  override def isEndOfStream(t: String): Boolean = ???

  override def getProducedType: TypeInformation[String] = return TypeInformation.of(classOf[String])
}

I want to try to implement something like below:

src.map(v => v)
      .map { x =>
        if (new sample().deserialize(x)==true) {
          println("Good Record: " + (id, category, eventTime))
        }
        else {
          println("Bad Record: " + (id, category, eventTime))
        }
      }  

But the input is in Array[Byte] form. So how can I implement it? Where exactly am I going wrong? What needs to be modified? This is my first ever attempt at Flink Scala custom classes.

Inputs passed: (screenshot)

Can You elaborate a little bit on what's the expected behaviour for the described data? - Dominik Wosiński
I will check the datatype of the attributes. For example, if the value for premium comes as a string, I will send it to a separate stream, which will be used by other downstream code and inserted into some NoSQL DB. - user17775951
It's like checking for good records and bad records. If the incoming data has the expected attribute types (e.g. Long/Int for premium), then consider it a good record and send it to some table. - user17775951
Is there a reason why You don't do this as part of JSON parsing? - Dominik Wosiński
I used Jackson's JsonNode and ObjectMapper for getting the attribute values. So you mean that while fetching these attributes, I should perform the data type check there itself? I am new to Flink and wanted to do it the Flink way, which is why I thought of TypeInformation. - user17775951

1 Answer


I don't really think that using TypeInformation to do what You want is the best idea. You can simply use something like a ProcessFunction that accepts a JSON String and then uses an ObjectMapper to deserialize the JSON into a class with the expected structure. You can output the correctly deserialized objects from the ProcessFunction, and the Strings that failed deserialization can be passed as a side output, since they will be Your Bad Records.

This could look like below; note that this uses the Jackson Scala module to perform deserialization to a case class. You can find more info here.

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
// Note: in older jackson-module-scala releases, ScalaObjectMapper lives under the `experimental` package.
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.util.{Failure, Success, Try}

case class Premium(policyName: String, premium: Long, eventTime: String)

class Splitter extends ProcessFunction[String, Premium] {
  // Side output tag for the raw JSON strings that fail deserialization.
  val outputTag = new OutputTag[String]("failed")

  def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
    Try {
      lazy val mapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      mapper.readValue[T](json)
    } match {
      case Success(x)   => Right(x)
      case Failure(err) => Left(json)
    }
  }

  override def processElement(i: String, context: ProcessFunction[String, Premium]#Context, collector: Collector[Premium]): Unit = {
    fromJson[Premium](i) match {
      case Right(data) => collector.collect(data)          // good record: emit on the main output
      case Left(json)  => context.output(outputTag, json)  // bad record: emit on the side output
    }
  }
}

Then You can use the outputTag to retrieve the side output data, i.e. the incorrect records, from the stream.
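For completeness, here is a minimal sketch of how the Splitter could be wired in, assuming src is the DataStream[String] read from the Kafka topic (as in the question) and that import org.apache.flink.streaming.api.scala._ is in scope; the splitter/goodRecords/badRecords names and the print() sinks are just placeholders:

// Sketch only: `src` is assumed to be the DataStream[String] coming from the Kafka source.
val splitter = new Splitter()

// Main output: records that deserialized cleanly into Premium (the good records).
val goodRecords: DataStream[Premium] = src.process(splitter)

// Side output: raw JSON strings that failed deserialization (the bad records).
val badRecords: DataStream[String] = goodRecords.getSideOutput(splitter.outputTag)

// Placeholder sinks; in practice these could be Cassandra sinks or another Kafka topic.
goodRecords.print()
badRecords.print()

Both streams can then be sunk independently, which matches the good/bad split described in the question.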