I'm getting the following exception when running my small Flink program. The app has two data streams, each coming from its own instance of the same mocked source, and it uses broadcast state. I wrote it to do some performance testing, but it throws this exception:
Caused by: java.lang.UnsupportedOperationException: Cannot override partitioning for KeyedStream.
at org.apache.flink.streaming.api.datastream.KeyedStream.setConnectionType(KeyedStream.java:251)
at org.apache.flink.streaming.api.datastream.DataStream.broadcast(DataStream.java:429)
at org.apache.flink.streaming.api.scala.DataStream.broadcast(DataStream.scala:495)
My code:
val testStream: DataStream[Tuple2[String, String]] = env
  .addSource(new MockKafkaSource)
  .filter(x => !x._1.equals("x"))
  .map(x => x)
  .uid("test stream 1")

val testStream2: DataStream[Tuple2[String, String]] = env
  .addSource(new MockKafkaSource)
  .map(x => x)
  .keyBy(x => x._1)
  .uid("test stream 2")

lazy val testStateDescriptor =
  new MapStateDescriptor("testState", classOf[String], classOf[Tuple2[String, String]])

val testBroadcastStream = testStream.broadcast(testStateDescriptor)

val broadcastOutStream: DataStream[Tuple2[String, String]] =
  testStream2
    .connect(testBroadcastStream)
    .process(new StateProcess)

broadcastOutStream.print()
The exception is thrown on this line:
val testBroadcastStream = testStream.broadcast(testStateDescriptor)
StateProcess function is a KeyedBroadcastProcessFunction, not a BroadcastProcessFunction, right? – kkrugler
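For reference, the stack trace shows the exception is raised by KeyedStream.setConnectionType, which DataStream.broadcast calls. That means the stream the program actually passes to broadcast(...) was a KeyedStream. The broadcast side needs to be a plain, non-keyed DataStream, and only the other side (testStream2 here) should be keyed. When that side is keyed, the function given to process(...) should be a KeyedBroadcastProcessFunction, as the comment suggests. Below is a minimal sketch of what StateProcess could look like under that assumption. The question does not show its real body, so the logic inside both methods is hypothetical. The sketch assumes the Flink 1.x Scala API and Scala tuples, as in the code above.

```scala
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

// Hypothetical sketch: type parameters are
// (key type, keyed-side input, broadcast-side input, output).
class StateProcess
    extends KeyedBroadcastProcessFunction[String, (String, String), (String, String), (String, String)] {

  // Must match the name and types of the descriptor passed to broadcast(...)
  private val testStateDescriptor =
    new MapStateDescriptor("testState", classOf[String], classOf[(String, String)])

  // Called for each element of the keyed side (testStream2); broadcast state is read-only here
  override def processElement(
      value: (String, String),
      ctx: KeyedBroadcastProcessFunction[String, (String, String), (String, String), (String, String)]#ReadOnlyContext,
      out: Collector[(String, String)]): Unit = {
    val state: ReadOnlyBroadcastState[String, (String, String)] =
      ctx.getBroadcastState(testStateDescriptor)
    // Hypothetical logic: emit the broadcast entry stored under this element's key, if present
    Option(state.get(value._1)).foreach(stored => out.collect(stored))
  }

  // Called for each element of the broadcast side (testStream); broadcast state is writable here
  override def processBroadcastElement(
      value: (String, String),
      ctx: KeyedBroadcastProcessFunction[String, (String, String), (String, String), (String, String)]#Context,
      out: Collector[(String, String)]): Unit = {
    val state: BroadcastState[String, (String, String)] =
      ctx.getBroadcastState(testStateDescriptor)
    state.put(value._1, value)
  }
}
```

Writes to broadcast state are only allowed in processBroadcastElement. This keeps every parallel instance holding the same copy of the broadcast state.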