I'm getting the following exception when running my small Flink program. The app has two data streams that read from the same mocked source, and it uses broadcast state. I wrote it for some performance testing, but it fails with this exception:

Caused by: java.lang.UnsupportedOperationException: Cannot override partitioning for KeyedStream.
    at org.apache.flink.streaming.api.datastream.KeyedStream.setConnectionType(KeyedStream.java:251)
    at org.apache.flink.streaming.api.datastream.DataStream.broadcast(DataStream.java:429)
    at org.apache.flink.streaming.api.scala.DataStream.broadcast(DataStream.scala:495)

My code:

val testStream: DataStream[Tuple2[String, String]] = env
    .addSource(new MockKafkaSource)
    .filter(x => !x._1.equals("x"))
    .map(x => x)
    .uid("test stream 1")

val testStream2: DataStream[Tuple2[String, String]] = env
    .addSource(new MockKafkaSource)
    .map(x => x)
    .keyBy(x => x._1)
    .uid("test stream 2")

lazy val testStateDescriptor =
    new MapStateDescriptor("testState", classOf[String], classOf[Tuple2[String, String]])

val testBroadcastStream = testStream.broadcast(testStateDescriptor)

val broadcastOutStream: DataStream[Tuple2[String, String]] =
    testStream2
    .connect(testBroadcastStream)
    .process(new StateProcess)

broadcastOutStream.print()

The exception is thrown on this line:

val testBroadcastStream = testStream.broadcast(testStateDescriptor)
I assume your StateProcess function is a KeyedBroadcastProcessFunction, not a BroadcastProcessFunction, right? - kkrugler
Yes, it extends KeyedBroadcastProcessFunction. - Denorm

1 Answer

My issue was that I was calling uid on the keyed stream for testStream2, that is, after keyBy. I had to move the uid call so it comes directly after the map, and only then key the stream.
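
A minimal sketch of that reordering, in case it helps. It assumes the Flink Scala DataStream API; MockKafkaSource is replaced by fromElements, and the StateProcess body shown here is a hypothetical stand-in, since the original class was not posted. The only change that matters is that uid is set on the map operator and keyBy comes last on testStream2.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object BroadcastUidSketch {
  type Record = (String, String)

  val testStateDescriptor =
    new MapStateDescriptor[String, Record]("testState", classOf[String], classOf[Record])

  // Hypothetical stand-in for the StateProcess class from the question.
  class StateProcess
      extends KeyedBroadcastProcessFunction[String, Record, Record, Record] {

    // Keyed side: forward each element unchanged.
    override def processElement(
        value: Record,
        ctx: KeyedBroadcastProcessFunction[String, Record, Record, Record]#ReadOnlyContext,
        out: Collector[Record]): Unit =
      out.collect(value)

    // Broadcast side: store each element in broadcast state under its first field.
    override def processBroadcastElement(
        value: Record,
        ctx: KeyedBroadcastProcessFunction[String, Record, Record, Record]#Context,
        out: Collector[Record]): Unit =
      ctx.getBroadcastState(testStateDescriptor).put(value._1, value)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumption: fromElements stands in for MockKafkaSource.
    val testStream: DataStream[Record] = env
      .fromElements(("a", "1"), ("x", "2"))
      .filter(x => !x._1.equals("x"))
      .map(x => x)
      .uid("test stream 1")

    // uid is applied to the map operator; keyBy is the last call in the chain.
    val testStream2 = env
      .fromElements(("a", "3"), ("b", "4"))
      .map(x => x)
      .uid("test stream 2")
      .keyBy(x => x._1)

    val testBroadcastStream = testStream.broadcast(testStateDescriptor)

    val broadcastOutStream: DataStream[Record] =
      testStream2
        .connect(testBroadcastStream)
        .process(new StateProcess)

    broadcastOutStream.print()
    env.execute("broadcast uid sketch")
  }
}
```

Note that testStream2 is no longer annotated as DataStream[Tuple2[String, String]], so it keeps its KeyedStream type, which a KeyedBroadcastProcessFunction needs on the non-broadcast side.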