
I'm having issues with Kafka and Storm. At this point I'm not sure whether it's a problem with the KafkaSpout config I'm setting up, whether I'm not acking properly, or something else.

I enqueued 50 items onto my Kafka topic, but my spout has emitted over 1300 tuples (and counting). The spout also reports that almost all of them have "failed." The topology itself is not failing; it's writing to the database successfully. I just don't know why it is apparently replaying everything so many times (if that's what it's doing).

The big question is:

Why is it emitting so many tuples when I only passed 50 to Kafka?


Here is how I am setting up the topology and the KafkaSpout:

  public static void main(String[] args) {
    try {
      String databaseServerIP = "";
      String kafkaZookeepers = "";
      String kafkaTopicName = "";
      int numWorkers = 1;
      int numAckers = 1;
      int numSpouts = 1;
      int numBolts = 1;
      int messageTimeOut = 10;
      String topologyName = "";

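      // Check the length first: args[0] on an empty array would throw ArrayIndexOutOfBoundsException
      if (args == null || args.length == 0 || args[0].isEmpty()) {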
        System.out.println("Args cannot be null or empty. Exiting");
        return;
      } else {
        if (args.length == 8) {
          for (String arg : args) {
            if (arg == null) {
              System.out.println("Parameters cannot be null. Exiting");
              return;
            }
          }
          databaseServerIP = args[0];
          kafkaZookeepers = args[1];
          kafkaTopicName = args[2];
          numWorkers = Integer.parseInt(args[3]);
          numAckers = Integer.parseInt(args[4]);
          numSpouts = Integer.parseInt(args[5]);
          numBolts = Integer.parseInt(args[6]);
          topologyName = args[7];
        } else {
          System.out.println("Bad parameters: found " + args.length + ", required = 8");
          return;
        }
      }

      Config conf = new Config();

      conf.setNumWorkers(numWorkers);
      conf.setNumAckers(numAckers);
      conf.setMessageTimeoutSecs(messageTimeOut);

      conf.put("databaseServerIP", databaseServerIP);
      conf.put("kafkaZookeepers", kafkaZookeepers);
      conf.put("kafkaTopicName", kafkaTopicName);

      // Wire the KafkaSpout (built by getKafkaSpout below) to the writer bolt
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout(topologyName + "-flatItems-from-kafka-spout", getKafkaSpout(kafkaZookeepers, kafkaTopicName), numSpouts);
      builder.setBolt(topologyName + "-flatItem-Writer-Bolt", new ItemWriterBolt(), numBolts).shuffleGrouping(topologyName + "-flatItems-from-kafka-spout");


      StormTopology topology = builder.createTopology();

      StormSubmitter.submitTopology(topologyName, conf, topology);

    } catch (Exception e) {
      System.out.println("There was a problem starting the topology. Check parameters.");
      e.printStackTrace();
    }
  }

  private static KafkaSpout getKafkaSpout(String zkHosts, String topic) throws Exception {

    //String topic = "FLAT-ITEMS";
    String zkNode = "/" + topic + "-subscriber-pipeline";
    String zkSpoutId = topic + "subscriberpipeline";
    KafkaTopicInZkCreator.createTopic(topic, zkHosts);


    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zkHosts), topic, zkNode, zkSpoutId);
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

    // spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
    //spoutConfig.startOffsetTime = System.currentTimeMillis();
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    return new KafkaSpout(spoutConfig);

  }

And here is the topic creation code, in case it matters:

  public static void createTopic(String topicName, String zookeeperHosts) throws Exception {
    ZkClient zkClient = null;
    ZkUtils zkUtils = null;
    try {

      int sessionTimeOutInMs = 15 * 1000; // 15 secs
      int connectionTimeOutInMs = 10 * 1000; // 10 secs

      zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
      zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

      int noOfPartitions = 1;
      int noOfReplication = 1;
      Properties topicConfiguration = new Properties();

      boolean topicExists = AdminUtils.topicExists(zkUtils, topicName);
      if (!topicExists) {
        AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration, RackAwareMode.Disabled$.MODULE$);
      }
    } catch (Exception ex) {
      ex.printStackTrace();
    } finally {
      if (zkClient != null) {
        zkClient.close();
      }
    }
  }

1 Answer


You need to check whether the messages also failed in the bolt.

If they all failed there too, you probably didn't ack the tuples in the bolt, or there is an exception in the bolt code.
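For that first case, here is a minimal sketch of the ack-or-fail pattern, written in plain Java so it runs without Storm. RecordingCollector and writeToDatabase are hypothetical stand-ins (for Storm's OutputCollector and for whatever ItemWriterBolt actually does, which the question doesn't show); the point is only that every input ends in exactly one ack or fail, including when the write throws.

```java
import java.util.ArrayList;
import java.util.List;

public class AckingBoltSketch {

    // Hypothetical stand-in for Storm's OutputCollector: it only records
    // which message ids were acked or failed.
    static class RecordingCollector {
        final List<Integer> acked = new ArrayList<>();
        final List<Integer> failed = new ArrayList<>();

        void ack(int msgId) { acked.add(msgId); }
        void fail(int msgId) { failed.add(msgId); }
    }

    // Hypothetical database write; an empty item simulates a failing write.
    static void writeToDatabase(String item) {
        if (item.isEmpty()) {
            throw new IllegalArgumentException("empty item");
        }
    }

    // Same shape as a bolt's execute(): ack after a successful write,
    // fail explicitly on error, never leave a tuple un-acked.
    static void execute(int msgId, String item, RecordingCollector collector) {
        try {
            writeToDatabase(item);
            collector.ack(msgId);
        } catch (RuntimeException e) {
            collector.fail(msgId);
        }
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();
        String[] items = {"a", "", "c"};
        for (int i = 0; i < items.length; i++) {
            execute(i, items[i], collector);
        }
        // prints: acked=[0, 2] failed=[1]
        System.out.println("acked=" + collector.acked + " failed=" + collector.failed);
    }
}
```

In a real BaseRichBolt the same shape uses collector.ack(tuple) and collector.fail(tuple) on the OutputCollector passed to prepare(); a BaseBasicBolt acks automatically once execute() returns.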

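If the bolt acked its messages, a timeout is more likely: Storm fails and replays any tuple that is not fully acked within the message timeout (topology.message.timeout.secs, set to only 10 seconds by conf.setMessageTimeoutSecs(messageTimeOut) in your code). Increasing the topology timeout or the parallelism should fix the problem.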
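To illustrate how a timeout alone can turn 50 messages into more than a thousand emits, here is a toy simulation. It is not Storm code, and it rests on simplifying assumptions: the spout emits everything at once (no max-pending limit), each bolt executor takes one second per database write, executors also process stale (already timed-out) copies, and every attempt not acked within the timeout is replayed immediately. The real KafkaSpout may back off between retries and Storm checks timeouts less precisely, so the numbers are only illustrative.

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;

public class ReplaySimulation {

    // Toy model (assumption, not Storm's implementation): returns the total
    // number of spout emits until all messages are acked or maxTime passes.
    // timeoutSecs must be positive.
    static int countEmits(int messages, int bolts, int processSecs, int timeoutSecs, int maxTime) {
        ArrayDeque<int[]> boltQueue = new ArrayDeque<>();        // {msgId, emittedAt}
        PriorityQueue<int[]> timeouts =
                new PriorityQueue<>((a, b) -> Integer.compare(a[1], b[1]));
        int[] currentEmit = new int[messages];  // emit time of the live attempt
        boolean[] acked = new boolean[messages];
        int[][] inFlight = new int[bolts][];    // attempt each executor is writing
        int[] finishAt = new int[bolts];
        int emits = 0;
        int ackedCount = 0;

        // The spout emits every message at t = 0
        for (int id = 0; id < messages; id++) {
            boltQueue.add(new int[] {id, 0});
            timeouts.add(new int[] {id, 0});
            emits++;
        }

        for (int t = 0; t <= maxTime && ackedCount < messages; t++) {
            // 1. Finished writes: only the live attempt, acked before its
            //    timeout, counts. Acks for stale attempts are ignored.
            for (int w = 0; w < bolts; w++) {
                if (inFlight[w] != null && finishAt[w] == t) {
                    int id = inFlight[w][0];
                    int emittedAt = inFlight[w][1];
                    if (!acked[id] && currentEmit[id] == emittedAt && t <= emittedAt + timeoutSecs) {
                        acked[id] = true;
                        ackedCount++;
                    }
                    inFlight[w] = null;
                }
            }
            // 2. Attempts past their timeout are failed and re-emitted.
            while (!timeouts.isEmpty() && timeouts.peek()[1] + timeoutSecs <= t) {
                int[] attempt = timeouts.poll();
                int id = attempt[0];
                if (!acked[id] && currentEmit[id] == attempt[1]) {
                    currentEmit[id] = t;
                    boltQueue.add(new int[] {id, t});
                    timeouts.add(new int[] {id, t});
                    emits++;
                }
            }
            // 3. Idle executors take the next queued tuple, stale or not.
            for (int w = 0; w < bolts; w++) {
                if (inFlight[w] == null && !boltQueue.isEmpty()) {
                    inFlight[w] = boltQueue.poll();
                    finishAt[w] = t + processSecs;
                }
            }
        }
        return emits;
    }

    public static void main(String[] args) {
        // 50 messages, 1 second per write, simulated for 300 seconds
        System.out.println("timeout 10s, 1 bolt:  " + countEmits(50, 1, 1, 10, 300));
        System.out.println("timeout 60s, 1 bolt:  " + countEmits(50, 1, 1, 60, 300)); // 50
        System.out.println("timeout 10s, 5 bolts: " + countEmits(50, 5, 1, 10, 300)); // 50
    }
}
```

With one executor and a 10 second timeout, only the first ten messages are acked in time; the other 40 are replayed every 10 seconds while stale copies pile up ahead of the replays, so the emit count keeps climbing even though the bolt keeps writing. Either a longer timeout or five executors lets all 50 finish in time with exactly 50 emits.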