3
votes

I'm trying to merge two streams of the same data type. I saw the CoFlatMapfunction and tried it out, but I'm getting the following error:

"unspecified value parameters".

The code is written in Scala

val eventsTypeOne: DataStream[Option[Event]] =  patternStream1.select(pattern => selectFn1(pattern.toMap))
val eventsTypeTwo: DataStream[Option[Event]] = patternStream2.select(pattern => selectFn2(pattern.toMap))

eventsTypeOne.connect(eventsTypeTwo).flatMap(new CoFlatMapFunction[Option[Event], Option[Event], Option[Event]] {
  override def flatMap1(eventTypeOne: Option[Event], out: Collector[Option[Event]]): Unit = {
    out.collect(eventTypeOne)
  }

  override def flatMap2(eventTypeTwo: Option[Event], out: Collector[Option[Event]]): Unit = {
    out.collect(eventTypeTwo)
  }
})
}

How can I use the CoFlatMapFunction correctly? Or is there a more elegant way to merge two data streams?

Thanks in advance!

1
I guess you program is not a valid Scala program. The line in with the error occurs would be interesting. You can achieve the same functionality by using a union(). - twalthr
thanks! The union function solved my problem - ScalaNewbie
@twalthr, you should write you comment as an answer to the question ;) - diegoreico

1 Answers

4
votes

You can achieve the same functionality by using a union() operator. Union allows to merge two streams of the same type.