We have a DStream, such as:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val ssc = new StreamingContext(sc, Seconds(1))

val kS = KafkaUtils.createDirectStream[String, TMapRecord](
    ssc,
    PreferConsistent,
    Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT))
  .mapPartitions(part => part.map(_.value()))
  .mapPartitions(part1 => part1.map(c =>
    TMsg(1,
      c.field1,
      c.field2, // and other fields
      c.startTimeSeconds)))
So each RDD contains a bunch of TMsg objects with some (technical) key fields I can use to deduplicate the DStream. Basically, if we have two TMsg objects IN ONE OR TWO DISCRETIZED RDDs with the same field1 and field2, and their startTimeSeconds values differ by less than 1 second, it's a duplicate.
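In other words, the rule I want is something like this (just a sketch to pin down the requirement; isDuplicate is a hypothetical helper, and I'm assuming startTimeSeconds is epoch seconds as a Long):

// Two messages are duplicates when both key fields match and the
// timestamps are less than 1 second apart.
def isDuplicate(a: TMsg, b: TMsg): Boolean =
  a.field1 == b.field1 &&
  a.field2 == b.field2 &&
  math.abs(a.startTimeSeconds - b.startTimeSeconds) < 1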
I looked at mapWithState. Yes, I can create a K -> V DStream like

val mappedStream = kS.map(m => (m.field1, m.field2) -> m.startTimeSeconds)

so I could use that function, but I don't understand how to use it to filter out the duplicates.
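What I have so far is roughly the following (a sketch only, not a working solution: it keeps one timestamp of state per (field1, field2) key and compares each new message against the last kept one, which may not fully match the rule above; it also requires ssc.checkpoint(...) to be set):

import org.apache.spark.streaming.{State, StateSpec}

// State per key: startTimeSeconds of the last message we kept for that key.
val dedupSpec = StateSpec.function(
  (key: (String, String), value: Option[Long], state: State[Long]) => {
    val ts = value.get // always Some here, since no state timeout is set
    val isDup = state.exists() && math.abs(ts - state.get()) < 1
    if (!isDup) state.update(ts)
    if (isDup) None else Some(key -> ts)
  })

val deduped = mappedStream
  .mapWithState(dedupSpec)
  .flatMap(_.toSeq) // drop the Nones, i.e. the duplicates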
Window functions can't help here, and I can't use Structured Streaming's dropDuplicates, since this solution is written with DStreams.
Any solutions? Thanks
P.S. The Spark version is 2.2.