
I'm writing a dockerized Java Spring application that uses Apache Storm v1.1.2, Kafka v0.11.0.1, Zookeeper 3.4.6, Eureka, and Cloud-Config all in Docker containers orchestrated by Docker-Compose.

The tuples I'm receiving with a KafkaSpout have a "value" field containing a protobuf object. I use a custom deserializer to get my object out of it for processing.
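For context, the deserializer is essentially a Kafka `Deserializer` that parses the protobuf bytes. This is a simplified sketch; `MyProto` stands in for my generated protobuf class:

```java
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class MyProtoDeserializer implements Deserializer<MyProto> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public MyProto deserialize(String topic, byte[] data) {
        try {
            // Parse the raw bytes into the generated protobuf message type
            return MyProto.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new SerializationException("Failed to parse protobuf payload", e);
        }
    }

    @Override
    public void close() { }
}
```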

I have a basic application working where I have a bolt that prints incoming messages and routes them to certain other bolts based on the value of a field in the protobuf object. I also have the LocalCluster, Config, and TopologyBuilder working as Spring Beans.

Currently I set up all bolts in a @PostConstruct method, but I need to be able to dynamically add bolts that filter incoming messages based on other fields of the protobuf object and perform basic aggregation functions (max/min/windowed average).
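Roughly, my current wiring looks like this (simplified; the bean names and the PrinterBolt are illustrative, not my real code):

```java
import javax.annotation.PostConstruct;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopologyStarter {

    @Autowired private TopologyBuilder builder;
    @Autowired private LocalCluster cluster;
    @Autowired private Config stormConfig;
    @Autowired private KafkaSpout kafkaSpout;  // built elsewhere as a bean

    @PostConstruct
    public void start() {
        // All bolts are fixed here at startup -- this is what I want to change
        builder.setSpout("kafka-spout", kafkaSpout);
        builder.setBolt("printer", new PrinterBolt())
               .shuffleGrouping("kafka-spout");
        cluster.submitTopology("my-topology", stormConfig, builder.createTopology());
    }
}
```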

I'd like to drive this with a REST controller, but how can I stop and start the topology without losing data? I'd also prefer not to restart the topology by re-reading the Kafka topic from the beginning, since this system will receive an extremely high load.

This article looked promising, but I want the entire process to be automated, so I won't be manually manipulating Zookeeper: https://community.hortonworks.com/articles/550/unofficial-storm-and-kafka-best-practices-guide.html

How can I edit an existing topology in code to add new bolts dynamically?


1 Answer


You can't. Storm topologies are static once submitted. If you need to vary processing based on a field in the tuple, your best option is to submit all the bolts you will need up front. You can then vary the path the tuple takes through the topology by using one or more bolts that examine the tuple, and emit to specific streams based on the tuple content.

e.g. make a SplitterBolt

public void execute(Tuple input) {
  if (input.getIntegerByField("theDecider") == 1) {
    collector.emit("onlyOnes", input, input.getValues());
  } else {
    collector.emit("others", input, input.getValues());
  }
  collector.ack(input);
}

// The named streams must be declared, or the emits above will fail
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declareStream("onlyOnes", new Fields("value"));
  declarer.declareStream("others", new Fields("value"));
}

where your topology building code would have something like

builder.setSpout("kafka-spout", ...);
builder.setBolt("splitter", new SplitterBolt()).shuffleGrouping("kafka-spout");
builder.setBolt("countOnes", new CounterBolt()).shuffleGrouping("splitter", "onlyOnes");
builder.setBolt("countOthers", new CounterBolt()).shuffleGrouping("splitter", "others");
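As for the concern about re-reading the topic from the beginning: if you use the storm-kafka-client spout, it commits consumed offsets back to Kafka, so a killed and resubmitted topology resumes from the last committed offset rather than the start. A sketch of the spout configuration (the broker address, topic, and group id are placeholders):

```java
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

KafkaSpoutConfig<String, String> spoutConfig =
    KafkaSpoutConfig.builder("kafka:9092", "my-topic")
        // Only fall back to the earliest offset when nothing has
        // ever been committed for this consumer group
        .setFirstPollOffsetStrategy(
            KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        .setGroupId("my-consumer-group")
        .build();

builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig));
```

With this in place, "restarting" becomes: kill the topology, rebuild it with the new bolt set, and resubmit; the spout picks up where the previous incarnation's committed offsets left off. Tuples in flight when the topology is killed may be replayed, so downstream bolts should tolerate at-least-once delivery.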