
I am new to Flink and I am trying to write JUnit test cases to test a KeyedBroadcastProcessFunction. Below is my code. I am currently calling the getDataStreamOutput method in my TestUtils class, passing input data and pattern rules to the method. Once the input data is evaluated against the list of pattern rules and the input satisfies a condition, I get a signal, call the sink function, and return the output data as a string from getDataStreamOutput.

    @Test
    public void testCompareInputAndOutputDataForInputSignal() throws Exception {
        Assertions.assertEquals(sampleInputSignal,
                TestUtils.getDataStreamOutput(
                        inputSignal,
                        patternRules));
    }



public static String getDataStreamOutput(JSONObject input, Map<String, String> patternRules) throws Exception {

            // env assumed created here; in the original it may be a shared field
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStream<JSONObject> inputSignal = env.fromElements(input);

            DataStream<Map<String, String>> rawPatternStream =
                    env.fromElements(patternRules);

            //Generate a key,value pair of set of patterns where key is pattern name and value is pattern condition
            DataStream<Tuple2<String, Map<String, String>>> patternRuleStream =
                    rawPatternStream.flatMap(new FlatMapFunction<Map<String, String>,
                            Tuple2<String, Map<String, String>>>() {
                        @Override
                        public void flatMap(Map<String, String> patternRules,
                                            Collector<Tuple2<String, Map<String, String>>> out) throws Exception {
                            for (Map.Entry<String, String> stringEntry : patternRules.entrySet()) {
                                JSONObject jsonObject = new JSONObject(stringEntry.getValue());
                                Map<String, String> map = new HashMap<>();
                                for (String key : jsonObject.keySet()) {
                                    String value = jsonObject.get(key).toString();
                                    map.put(key, value);
                                }
                                out.collect(new Tuple2<>(stringEntry.getKey(), map));
                            }
                        }
                    });

            BroadcastStream<Tuple2<String, Map<String, String>>> patternRuleBroadcast =
                    patternRuleStream.broadcast(patternRuleDescriptor);


            DataStream<Tuple2<String, JSONObject>> validSignal = inputSignal.map(new MapFunction<JSONObject,
                    Tuple2<String, JSONObject>>() {
                @Override
                public Tuple2<String, JSONObject> map(JSONObject inputSignal) throws Exception {
                    // assumes the signal JSON carries a "source" field used as the key
                    String source = inputSignal.getString("source");
                    return new Tuple2<>(source, inputSignal);
                }
            }).keyBy(0).connect(patternRuleBroadcast).process(new MyKeyedBroadCastProcessFunction());
            
            
             validSignal.map(new MapFunction<Tuple2<String, JSONObject>,
                    JSONObject>() {
                @Override
                public JSONObject map(Tuple2<String, JSONObject> inputSignal) throws Exception {
                    return inputSignal.f1;
                }
            }).addSink(new getDataStreamOutput());

            env.execute("TestFlink");
        }
        return (getDataStreamOutput.dataStreamOutput);
    }


    @SuppressWarnings("serial")
    public static final class getDataStreamOutput implements SinkFunction<JSONObject> {
        public static String dataStreamOutput;

        public void invoke(JSONObject inputSignal) throws Exception {
            dataStreamOutput = inputSignal.toString();
        }
    }
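
Note: patternRuleDescriptor is used above but not shown; it is defined elsewhere in the code. A minimal sketch of such a descriptor, assuming string keys and Map<String, String> values, might look like this:

    // hypothetical broadcast state descriptor; the state name and types are assumptions
    MapStateDescriptor<String, Map<String, String>> patternRuleDescriptor =
            new MapStateDescriptor<>(
                    "patternRules",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    new MapTypeInfo<>(String.class, String.class));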

I need to test different inputs against the same broadcast rules, but each time I call this function it starts the whole process from the beginning: it takes the input signal and broadcasts the rule data all over again. Is there a way I can broadcast once and keep sending inputs to the method? I explored using a CoFlatMapFunction, something like below, to combine the data streams and keep sending inputs while the method is running, but for this one of the data streams has to keep getting data from a Kafka topic, and loading the Kafka utils and server again would overburden the method.

    DataStream<JSONObject> inputSignalFromKafka = env.addSource(inputSignalKafka);

    DataStream<org.json.JSONObject> inputSignalFromMethod = env.fromElements(inputSignal);

    DataStream<JSONObject> inputSignal = inputSignalFromMethod.connect(inputSignalFromKafka)
                .flatMap(new SignalCoFlatMapper());


   public static class SignalCoFlatMapper
            implements CoFlatMapFunction<JSONObject, JSONObject, JSONObject> {

        @Override
        public void flatMap1(JSONObject inputValue, Collector<JSONObject> out) throws Exception {
            out.collect(inputValue);

        }

        @Override
        public void flatMap2(JSONObject kafkaValue, Collector<JSONObject> out) throws Exception {
            out.collect(kafkaValue);

        }
    }

I found a link on Stack Overflow, How to unit test BroadcastProcessFunction in flink when processElement depends on broadcasted data, but it confused me a lot.

Is there any way I can broadcast only once in the @Before method of my test cases and keep sending different kinds of data to my broadcast function?

Note that this is also being discussed on the mailing lists. See apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/…. – David Anderson

1 Answer


You can use a KeyedTwoInputStreamOperatorTestHarness in order to achieve this. For example, let's assume you have the following KeyedBroadcastProcessFunction, where you define some business logic for both DataStream channels:

public class SimpleKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunction<String, String, String, String> {

    @Override
    public void processElement(String inputEntry,
                               ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
        //business logic for how you want to process your data stream records
    }

    @Override
    public void processBroadcastElement(String broadcastInput, Context context,
                                        Collector<String> collector) throws Exception {
        //process input from your broadcast channel
    }
}

Let's now assume your process function is stateful and makes modifications to Flink's internal state. In that case you have to create a test harness in your test class to ensure you are able to keep track of the state during testing.
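
For illustration, here is a minimal sketch of what the stateful parts of SimpleKeyedBroadcastProcessFunction might look like; the descriptor and the pass-through logic are assumptions, chosen so that the tests below still pass:

// illustrative descriptor; the harness below must register this same descriptor
public static final MapStateDescriptor<String, String> BROADCAST_MAP_STATE_DESCRIPTOR =
        new MapStateDescriptor<>(
                "broadcast-state",
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

@Override
public void processBroadcastElement(String broadcastInput, Context context,
                                    Collector<String> collector) throws Exception {
    // store the rule in broadcast state and forward it downstream
    context.getBroadcastState(BROADCAST_MAP_STATE_DESCRIPTOR).put(broadcastInput, broadcastInput);
    collector.collect(broadcastInput);
}

@Override
public void processElement(String inputEntry, ReadOnlyContext readOnlyContext,
                           Collector<String> collector) throws Exception {
    // pass the record through; real logic would consult the broadcast state, e.g.
    // readOnlyContext.getBroadcastState(BROADCAST_MAP_STATE_DESCRIPTOR).get(someKey)
    collector.collect(inputEntry);
}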

I would then create some unit tests using the following approach:

public class SimpleKeyedBroadcastProcessFunctionTest {
    private SimpleKeyedBroadcastProcessFunction processFunction;
    private KeyedTwoInputStreamOperatorTestHarness<String, String, String, String> testHarness;

  @Before
  public void setup() throws Exception {
    processFunction = new SimpleKeyedBroadcastProcessFunction();
    testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
                // the operator must register the same broadcast state descriptor(s) the function uses
                new CoBroadcastWithKeyedOperator<>(processFunction, ImmutableList.of(BROADCAST_MAP_STATE_DESCRIPTOR)),
                (KeySelector<String, String>) string -> string,
                (KeySelector<String, String>) string -> string,
                TypeInformation.of(String.class));
    testHarness.setup();
    testHarness.open();
  }

  @After
  public void cleanup() throws Exception {
      testHarness.close();
  }

  @Test
  public void testProcessRegularInput() throws Exception {
      //processElement1 sends elements into your regular stream; the second param is the event time of the record
      testHarness.processElement1(new StreamRecord<>("Hello", 0));
      //Access records collected during processElement
      List<StreamRecord<? extends String>> records = testHarness.extractOutputStreamRecords();
      assertEquals("Hello", records.get(0).getValue());
  }

  @Test
  public void testProcessBroadcastInput() throws Exception {
      //processElement2 sends elements into your broadcast stream; the second param is the event time of the record
      testHarness.processElement2(new StreamRecord<>("Hello from Broadcast", 0));
      //Access records collected during processElement
      List<StreamRecord<? extends String>> records = testHarness.extractOutputStreamRecords();
      assertEquals("Hello from Broadcast", records.get(0).getValue());
  }
}
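
To address the original question about broadcasting only once: since the harness is created and opened in @Before, you can also push the broadcast element(s) there, and each @Test then only sends regular records against the already-populated broadcast state, instead of re-running a whole pipeline per input. A sketch, assuming the pass-through logic above (the rule and input strings are placeholders):

@Before
public void setup() throws Exception {
    // ... create and open the harness exactly as above ...
    // broadcast the rules once here, so every test only sends regular input
    testHarness.processElement2(new StreamRecord<>("rule-1", 0));
    // drop whatever the broadcast side emitted during setup
    testHarness.getOutput().clear();
}

@Test
public void testFirstInput() throws Exception {
    testHarness.processElement1(new StreamRecord<>("input-1", 1));
    assertEquals("input-1", testHarness.extractOutputStreamRecords().get(0).getValue());
}

@Test
public void testSecondInput() throws Exception {
    testHarness.processElement1(new StreamRecord<>("input-2", 1));
    assertEquals("input-2", testHarness.extractOutputStreamRecords().get(0).getValue());
}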