
This is what my Storm UI stats look like.

The problem is that I have no idea where those numbers (of emitted tuples) are coming from.

  • My topology is pretty simple: Kafka spout -> bolt (persisting data into HBase).
  • The topology works: when I put data into the Kafka topic, it gets processed by the bolt and persisted in HBase, which I then verify with a scan in the HBase shell (so new records are being inserted).
  • However, each time I submit a new message to Kafka and it is persisted by the bolt, the topology's ‘emitted’ count does not increase by 1.
  • Periodically, all the numbers increase by 20 without my sending any new messages to Kafka. That is, my Kafka topic gets no messages for hours, but the number of emitted tuples keeps increasing in chunks of 20 over time. The number of records in HBase stays the same.
  • I get no exceptions or errors anywhere in the Apache Storm logs.
  • I'm not calling ack() or fail() on any of my tuples in my bolt implementation (it is a BasicBolt, which acks automatically).
  • My bolt's capacity and latency metrics always stay at zero, even when I load a lot of messages into Kafka.
  • My Kafka offset log ($KAFKA/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker) shows that all the messages are processed and the Kafka lag for the given topic/group is 0.

So my questions are:

  1. What are those ‘stealth’ tuples that increase ‘emitted’ in both the spout and the bolt by 20 at a time?
  2. Is it possible to enable debugging in the Storm UI to see what those tuples are?
  3. Why are the capacity/latency metrics for the bolt always zero, even though the bolt is confirmed to persist data?

Environment details

I’m using Java 8 + Apache Storm 1.0.3

[devops@storm-wk1-prod]~/storm/supervisor/stormdist% storm version
Running: /usr/lib/jvm/jre-1.8.0-openjdk/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/apache-storm-1.0.3 -Dstorm.log.dir=/opt/apache-storm-1.0.3/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/apache-storm-1.0.3/lib/storm-core-1.0.3.jar:/opt/apache-storm-1.0.3/lib/kryo-3.0.3.jar:/opt/apache-storm-1.0.3/lib/reflectasm-1.10.1.jar:/opt/apache-storm-1.0.3/lib/asm-5.0.3.jar:/opt/apache-storm-1.0.3/lib/minlog-1.3.0.jar:/opt/apache-storm-1.0.3/lib/objenesis-2.1.jar:/opt/apache-storm-1.0.3/lib/clojure-1.7.0.jar:/opt/apache-storm-1.0.3/lib/disruptor-3.3.2.jar:/opt/apache-storm-1.0.3/lib/log4j-api-2.1.jar:/opt/apache-storm-1.0.3/lib/log4j-core-2.1.jar:/opt/apache-storm-1.0.3/lib/log4j-slf4j-impl-2.1.jar:/opt/apache-storm-1.0.3/lib/slf4j-api-1.7.7.jar:/opt/apache-storm-1.0.3/lib/log4j-over-slf4j-1.6.6.jar:/opt/apache-storm-1.0.3/lib/servlet-api-2.5.jar:/opt/apache-storm-1.0.3/lib/storm-rename-hack-1.0.3.jar:/opt/storm/conf org.apache.storm.utils.VersionInfo
Storm 1.0.3
URL https://git-wip-us.apache.org/repos/asf/storm.git -r eac433b0beb3798c4723deb39b3c4fad446378f4
Branch (no branch)
Compiled by ptgoetz on 2017-02-07T20:22Z
From source with checksum c78e52de4b8a22d99551d45dfe9c1a4b

My storm.yaml:

I'm running 2 Storm supervisor instances, each with the following config:

storm.zookeeper.servers:
  - "10.138.0.8"
  - "10.138.0.9"
  - "10.138.0.16"

storm.zookeeper.port: 2181

nimbus.seeds: ["10.138.0.10"]

storm.local.dir: "/var/log/storm"
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

worker.childopts: "-Xmx768m"
nimbus.childopts: "-Xmx512m"
supervisor.childopts: "-Xmx256m"

topology.yaml

nimbus.host: "10.138.0.10"

# In Storm 0.7.x, this is necessary in order to give workers time to
# initialize. In Storm 0.8.0 and later, it may not be necessary because Storm
# has added a separate, longer timeout for the initial launch of a worker.
supervisor.worker.timeout.secs: 60

topology.workers: 1

topology

import tbolts
import tspouts


def create(builder):
    """Create the topology using the Petrel library.
    """
    # spout getting data from kafka instance
    # we run 2 tasks of kafka spout
    builder.setSpout("kafka",
                     tspouts.KafkaSpout(), 2)

    # persistence bolt
    # we run 4 tasks of persistence bolt
    builder.setBolt("persistence",
                    tbolts.PersistenceBolt(), 4).shuffleGrouping("kafka")
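The shuffleGrouping("kafka") call spreads the spout's tuples across the 4 persistence bolt tasks. As a rough illustration, the Python sketch below models the documented guarantee of a shuffle grouping: tuples go to tasks in random order, and each task receives an equal number of them. The class ShuffleGroupingModel, the reshuffle-after-each-pass strategy and the task ids are hypothetical and chosen for illustration; this is not Storm's or Petrel's implementation.

```python
import random
from collections import Counter


class ShuffleGroupingModel:
    """Hypothetical, simplified model of a shuffle grouping (not Storm's code).

    Target tasks are visited in a random order; once every task has received
    one tuple, a new random order is drawn. Over any number of full passes,
    every task therefore receives the same number of tuples.
    """

    def __init__(self, target_tasks, seed=None):
        self.targets = list(target_tasks)
        self.rng = random.Random(seed)
        self.order = []  # remaining tasks in the current random pass

    def choose_task(self):
        if not self.order:
            # Start a new pass with a freshly shuffled copy of the targets.
            self.order = self.targets[:]
            self.rng.shuffle(self.order)
        return self.order.pop()


# Hypothetical task ids standing in for the 4 "persistence" bolt tasks.
grouping = ShuffleGroupingModel([1, 2, 3, 4], seed=0)
received = Counter(grouping.choose_task() for _ in range(100))
print(received)  # each of the 4 tasks receives exactly 25 tuples
```

Because 100 tuples make exactly 25 full passes over 4 tasks, every task ends up with 25 tuples regardless of the seed.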

1 Answer


The reason your emit count jumps by 20 is that, by default, Storm samples only every 20th tuple to update its metrics. When a tuple is sampled, the counter is incremented by 20 (the inverse of the sample rate) to account for the tuples that were not sampled, which is why the numbers move in chunks of 20. The sampling rate is controlled by the topology.stats.sample.rate config variable and can be set per topology. You could set it to 1.0 (it is 0.05 by default) to get an accurate emit count; however, this would introduce a significant processing overhead and may cause your Acker and/or metrics consumer instances to become overloaded. Use with caution.
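To make the sampling arithmetic concrete, here is a minimal Python model. It assumes that Storm 1.x samples exactly one tuple at a random position in each window of int(1 / topology.stats.sample.rate) tuples and adds that window size to the displayed count. EvenSampler and SampledEmitCounter are hypothetical names; this is a sketch of the behaviour, not Storm's code.

```python
import random


class EvenSampler:
    """Hypothetical model: out of every `freq` events, exactly one
    (at a random position within the window) is sampled."""

    def __init__(self, freq, rng):
        self.freq = freq
        self.rng = rng
        self.curr = -1
        self.target = rng.randrange(freq)

    def __call__(self):
        self.curr += 1
        if self.curr >= self.freq:
            # Start a new window and pick a new random sample position.
            self.curr = 0
            self.target = self.rng.randrange(self.freq)
        return self.curr == self.target


class SampledEmitCounter:
    """Hypothetical model of the UI 'emitted' value: each sampled emit adds
    int(1 / sample_rate), i.e. 20 for the default rate of 0.05."""

    def __init__(self, sample_rate=0.05, seed=None):
        if not 0 < sample_rate <= 1:
            raise ValueError("sample_rate must be in (0, 1]")
        self.increment = int(1 / sample_rate)
        self.sampler = EvenSampler(self.increment, random.Random(seed))
        self.displayed = 0

    def emit(self):
        # Only sampled emits change the displayed value, and by a whole window.
        if self.sampler():
            self.displayed += self.increment


counter = SampledEmitCounter(sample_rate=0.05, seed=42)
for _ in range(45):
    counter.emit()
print(counter.displayed)  # 40 or 60, never 45
```

With the default rate, 45 real emits show up as either 40 or 60, depending on whether the single sampled position in the third window falls among its first 5 tuples. With a rate of 1.0 every emit is sampled, so the displayed count is exact, at the cost of recording stats for every tuple.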