4
votes

I am comparing the Apache Beam SDK with the Flink SDK for stream processing, in order to establish the cost/advantages of using Beam as an additional framework.

I have a very simple setup where a stream of data is read from a Kafka source and processed in parallel by a cluster of nodes running Flink.

From my understanding of how these SDKs work, the simplest way to process a stream of data window by window is:

  1. Using Apache Beam (running on Flink):

    1.1. Create a Pipeline object.

    1.2. Create a PCollection of Kafka records.

    1.3. Apply windowing function.

    1.4. Transform pipeline to key by window.

    1.5. Group records by key (window).

    1.6. Apply whatever function is needed to the windowed records.

  2. Using the Flink SDK

    2.1. Create a Data Stream from a Kafka source.

    2.2. Transform it into a Keyed Stream by providing a key function.

    2.3. Apply windowing function.

    2.4. Apply whatever function is needed to the windowed records.

While the Flink solution appears programmatically more succinct, in my experience, it is less efficient at high volumes of data. I can only imagine the overhead is introduced by the key extraction function, since this step is not required by Beam.

My question is: am I comparing like for like? Are these processes not equivalent? What could explain the Beam way being more efficient, since it uses Flink as a runner (and all the other conditions are the same)?

This is the code using the Beam SDK

    PipelineOptions options = PipelineOptionsFactory.create();

    //Run with Flink
    FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class);
    flinkPipelineOptions.setRunner(FlinkRunner.class);
    flinkPipelineOptions.setStreaming(true);
    flinkPipelineOptions.setParallelism(-1); //Pick this up from the user interface at runtime

    // Create the Pipeline object with the options we defined above.
    Pipeline p = Pipeline.create(flinkPipelineOptions);

    // Create a PCollection of Kafka records
    PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = p.apply(KafkaIO.<Long, String>readBytes()
            .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
            .withTopics(ImmutableList.of(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC))
            .updateConsumerProperties(ImmutableMap.of("group.id", CONSUMER_GROUP)));

    //Apply Windowing Function    
    PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));

    //Transform the pipeline to key by window
    PCollection<KV<IntervalWindow, KafkaRecord<byte[], byte[]>>> keyedByWindow =
            windowedKafkaCollection.apply(
                    ParDo.of(
                            new DoFn<KafkaRecord<byte[], byte[]>, KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() {
                                @ProcessElement
                                public void processElement(ProcessContext context, IntervalWindow window) {
                                    context.output(KV.of(window, context.element()));
                                }
                            }));
    //Group records by key (window)
    PCollection<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>> groupedByWindow = keyedByWindow
            .apply(GroupByKey.<IntervalWindow, KafkaRecord<byte[], byte[]>>create());

    //Process windowed data
    PCollection<KV<IIntervalWindowResult, IPueResult>> processed = groupedByWindow
            .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));

    // Run the pipeline.
    p.run().waitUntilFinish();

And this is the code using the Flink SDK

// Create a Streaming Execution Environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(6);

//Connect to Kafka
Properties properties = new Properties();   
properties.setProperty("bootstrap.servers", KAFKA_IP + ":" + KAFKA_PORT);
properties.setProperty("group.id", CONSUMER_GROUP);

DataStream<ObjectNode> stream = env
            .addSource(new FlinkKafkaConsumer010<>(Arrays.asList(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC), new JSONDeserializationSchema(), properties));

//Key by id
stream.keyBy((KeySelector<ObjectNode, Integer>) jsonNode -> jsonNode.get("id").asInt())

        //Set the windowing function.
        .timeWindow(Time.seconds(5L), Time.seconds(1L))

        //Process Windowed Data
        .process(new PueCalculatorFn(), TypeInformation.of(ImmutablePair.class));

// execute program
env.execute("Using Flink SDK");

Many thanks in advance for any insight.

Edit

I thought I should add some indicators that may be relevant.

Network Received Bytes

Flink SDK

  • taskmanager.2
    • 2,644,786,446
  • taskmanager.3
    • 2,645,765,232
  • taskmanager.1
    • 2,827,676,598
  • taskmanager.6
    • 2,422,309,148
  • taskmanager.4
    • 2,428,570,491
  • taskmanager.5
    • 2,431,368,644

Beam

  • taskmanager.2
    • 4,092,154,160
  • taskmanager.3
    • 4,435,132,862
  • taskmanager.1
    • 4,766,399,314
  • taskmanager.6
    • 4,425,190,393
  • taskmanager.4
    • 4,096,576,110
  • taskmanager.5
    • 4,092,849,114

CPU Utilisation (Max)

Flink SDK

  • taskmanager.2
    • 93.00%
  • taskmanager.3
    • 92.00%
  • taskmanager.1
    • 91.00%
  • taskmanager.6
    • 90.00%
  • taskmanager.4
    • 90.00%
  • taskmanager.5
    • 92.00%

Beam

  • taskmanager.2
    • 52.0%
  • taskmanager.3
    • 71.0%
  • taskmanager.1
    • 72.0%
  • taskmanager.6
    • 40.0%
  • taskmanager.4
    • 56.0%
  • taskmanager.5
    • 26.0%

Beam seems to use a lot more networking, whereas Flink uses significantly more CPU. Could this suggest that Beam is parallelising the processing in a more efficient way?

Edit No2

I am pretty sure that the PueCalculatorFn classes are equivalent, but I will share the code here to see if any obvious discrepancies between the two processes become apparent.

Beam

public class PueCalculatorFn extends DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>> implements Serializable {
private transient List<IKafkaConsumption> realEnergyRecords;
private transient List<IKafkaConsumption> itEnergyRecords;

@ProcessElement
public void procesElement(DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>>.ProcessContext c, BoundedWindow w) {
    KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>> element = c.element();
    Instant windowStart = Instant.ofEpochMilli(element.getKey().start().getMillis());
    Instant windowEnd = Instant.ofEpochMilli(element.getKey().end().getMillis());
    Iterable<KafkaRecord<byte[], byte[]>> records = element.getValue();

    //Calculate Pue
    IPueResult result = calculatePue(element.getKey(), records);

    //Create IntervalWindowResult object to return
    DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
    IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
            formatter.format(windowEnd), realEnergyRecords, itEnergyRecords);

    //Return Pue keyed by Window
    c.output(KV.of(intervalWindowResult, result));
}

private PueResult calculatePue(IntervalWindow window, Iterable<KafkaRecord<byte[], byte[]>> records) {
    //Define accumulators to gather readings
    final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
    final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);

    //Declare variable to store the result
    BigDecimal pue = BigDecimal.ZERO;

    //Initialise transient lists
    realEnergyRecords = new ArrayList<>();
    itEnergyRecords = new ArrayList<>();

    //Transform the results into a stream
    Stream<KafkaRecord<byte[], byte[]>> streamOfRecords = StreamSupport.stream(records.spliterator(), false);

    //Iterate through each reading and add to the increment count
    streamOfRecords
            .map(record -> {
                byte[] valueBytes = record.getKV().getValue();
                assert valueBytes != null;
                String valueString = new String(valueBytes);
                assert !valueString.isEmpty();
                return KV.of(record, valueString);
            }).map(kv -> {
        Gson gson = new GsonBuilder().registerTypeAdapter(KafkaConsumption.class, new KafkaConsumptionDeserialiser()).create();
        KafkaConsumption consumption = gson.fromJson(kv.getValue(), KafkaConsumption.class);
        return KV.of(kv.getKey(), consumption);

    }).forEach(consumptionRecord -> {
                switch (consumptionRecord.getKey().getTopic()) {
                    case REAL_ENERGY_TOPIC:
                        totalRealIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                        realEnergyRecords.add(consumptionRecord.getValue());
                        break;
                    case IT_ENERGY_TOPIC:
                        totalItIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                        itEnergyRecords.add(consumptionRecord.getValue());
                        break;
                }
            }
    );

    assert totalRealIncrement.doubleValue() > 0.0;
    assert totalItIncrement.doubleValue() > 0.0;

    //Beware of division by zero
    if (totalItIncrement.doubleValue() != 0.0) {
        //Calculate PUE
        pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
    }

    //Create a PueResult object to return
    IWindow intervalWindow = new Window(window.start().getMillis(), window.end().getMillis());
    return new PueResult(intervalWindow, pue.stripTrailingZeros());
}

@Override
protected void finalize() throws Throwable {
    super.finalize();
    RecordSenderFactory.closeSender();
    WindowSenderFactory.closeSender();
}
} 

Flink

public class PueCalculatorFn extends ProcessWindowFunction<ObjectNode, ImmutablePair, Integer, TimeWindow> {
private transient List<KafkaConsumption> realEnergyRecords;
private transient List<KafkaConsumption> itEnergyRecords;

@Override
public void process(Integer integer, Context context, Iterable<ObjectNode> iterable, Collector<ImmutablePair> collector) throws Exception {
    Instant windowStart = Instant.ofEpochMilli(context.window().getStart());
    Instant windowEnd = Instant.ofEpochMilli(context.window().getEnd());
    BigDecimal pue = calculatePue(iterable);

    //Create IntervalWindowResult object to return
    DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
    IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
            formatter.format(windowEnd), realEnergyRecords
            .stream()
            .map(e -> (IKafkaConsumption) e)
            .collect(Collectors.toList()), itEnergyRecords
            .stream()
            .map(e -> (IKafkaConsumption) e)
            .collect(Collectors.toList()));


    //Create PueResult object to return
    IPueResult pueResult = new PueResult(new Window(windowStart.toEpochMilli(), windowEnd.toEpochMilli()), pue.stripTrailingZeros());

    //Collect result
    collector.collect(new ImmutablePair<>(intervalWindowResult, pueResult));

}

protected BigDecimal calculatePue(Iterable<ObjectNode> iterable) {
    //Define accumulators to gather readings
    final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
    final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);

    //Declare variable to store the result
    BigDecimal pue = BigDecimal.ZERO;

    //Initialise transient lists
    realEnergyRecords = new ArrayList<>();
    itEnergyRecords = new ArrayList<>();

    //Iterate through each reading and add to the increment count
    StreamSupport.stream(iterable.spliterator(), false)
            .forEach(object -> {
                switch (object.get("topic").textValue()) {
                    case REAL_ENERGY_TOPIC:
                        totalRealIncrement.accumulate(object.get("energyConsumed").asDouble());
                        realEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                        break;
                    case IT_ENERGY_TOPIC:
                        totalItIncrement.accumulate(object.get("energyConsumed").asDouble());
                        itEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                        break;
                }

            });

    assert totalRealIncrement.doubleValue() > 0.0;
    assert totalItIncrement.doubleValue() > 0.0;

    //Beware of division by zero
    if (totalItIncrement.doubleValue() != 0.0) {
        //Calculate PUE
        pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
    }
    return pue;
}

}

And here is my custom deserialiser used in the Beam example.

KafkaConsumptionDeserialiser

public class KafkaConsumptionDeserialiser implements JsonDeserializer<KafkaConsumption> {

public KafkaConsumption deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
    if(jsonElement == null) {
        return null;
    } else {
        JsonObject jsonObject = jsonElement.getAsJsonObject();
        JsonElement id = jsonObject.get("id");
        JsonElement energyConsumed = jsonObject.get("energyConsumed");
        Gson gson = (new GsonBuilder()).registerTypeAdapter(Duration.class, new DurationDeserialiser()).registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeDeserialiser()).create();
        Duration duration = (Duration)gson.fromJson(jsonObject.get("duration"), Duration.class);
        JsonElement topic = jsonObject.get("topic");
        Instant eventTime = (Instant)gson.fromJson(jsonObject.get("eventTime"), Instant.class);
        return new KafkaConsumption(Integer.valueOf(id != null?id.getAsInt():0), Double.valueOf(energyConsumed != null?energyConsumed.getAsDouble():0.0D), duration, topic != null?topic.getAsString():"", eventTime);
    }
  }

}
2
What measurements have you done that lead you to think that beam may be more efficient in this case?David Anderson
I am testing the two scenarios with data streaming at different velocities, for 5 minutes. At high velocities/volume of data, the Flink SDK experiment gets severely delayed (by over 5 minutes), whereas the Beam experiment completes in almost real-time. When introducing the calculation of the Factorial of a large number (to simulate heavy processing), the differences are even more remarkable. The Beam experiment completes with a delay of around 6 minutes, whereas the Flink experiment gets stuck for over 3 hours, requiring forced termination.javalass
If I understand it correctly, the delays are to be expected. They are caused by the way Flink handles backpressure, i.e. it slows down the pipeline to the speed of the slowest processing function. Assuming that backpressure is handled in the same way in both settings, the performance drop when using the Flink SDK would most likely be caused by the way the data is windowed before processing. Perhaps the Beam way is more efficiently parallelised? It would be interesting to get some input from the Beam developers on this.javalass
I should add that I have repeated this experiment using clusters of 3 and 6 machines, and the results were pretty much the same: processing grinds to a halt and needs to be terminated in the Flink experiment, whereas the Beam experiment gets slightly delayed, but completes. The only difference is that, when the cluster size is increased, the volume/velocity at which failure occurs is higher.javalass

2 Answers

1
votes

Not sure why the Beam pipeline you wrote is faster, but semantically it is not the same as the Flink job. Similar to how windowing works in Flink, once you assign windows in Beam, all following operations automatically take the windowing into account. You don't need to group by window.

Your Beam pipeline definition can be simplified as follows:

// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(flinkPipelineOptions);

// Create a PCollection of Kafka records
PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = ...

//Apply Windowing Function
PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(
 Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));

//Process windowed data
PCollection<KV<IIntervalWindowResult, IPueResult>> processed = windowedKafkaCollection
    .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));

// Run the pipeline.
p.run().waitUntilFinish();

As for the performance, it depends on many factors but keep in mind that Beam is an abstraction layer on top of Flink. Generally speaking, I would be surprised if you saw increased performance with Beam on Flink.

edit: Just to clarify further, you don't group on the JSON "id" field in the Beam pipeline, which you do in the Flink snippet.

0
votes

For what's worth, if the window processing can be pre-aggregated via reduce() or aggregate(), then the native Flink job should perform better than it currently does.

Many details, such as choice of state backend, serialization, checkpointing, etc. can also have a big impact on performance.

Is the same Flink being used in both cases -- i.e., same version, same configuration?