
I have a Storm topology (1 worker) in which a spout (written in Java) dequeues events from Redis using BLPOP and passes them to bolts (written in Clojure). The topology runs 6 spout threads and 50 bolt threads. When the Redis queue grows beyond 2 million entries, some events never reach the bolts, and there are no warnings or exceptions in the Storm nimbus, supervisor, ZooKeeper, or worker logs.

I can't reproduce this locally with dummy data. There is no network lag or packet loss in the cluster, and the average processing latency is 100 ms. How can I find the cause so I can fix it in production?

(ns event-processor
  (:import [backtype.storm StormSubmitter LocalCluster]
           [java.util UUID]
           [storm_jedis RedisQueueSpout]
           [redis.clients.jedis Jedis JedisPool JedisPoolConfig])
  (:use [backtype.storm clojure config])
  (:require [clojure.tools.logging :as log]
            [clj-redis.client :as redis])
  (:gen-class))

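;; A plain function rather than a macro: as a macro, the log call ran at
;; macro-expansion time instead of once per tuple.
(defn process-event [tuple]
  (log/info "processing event"))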

(defbolt execute-ls-closure ["word"] {:prepare true}
  [conf context collector]
  (let [counts (atom {})]
    (bolt
     (execute [tuple]
       (let [timestart (. System currentTimeMillis)
             tuple-message (.get (get tuple "message") 0)
             string-to-emit (process-event tuple)]
         (emit-bolt! collector [string-to-emit] :anchor tuple)
         (ack! collector tuple))))))

(defn mk-topology []
  (topology
   ;{"1" (spout-spec sentence-spout)
   {"1" (spout-spec redis-spout :p 6)}
   {"3" (bolt-spec {"1" :shuffle}
                   execute-ls-closure
                   :p 50)}))

(defn run-local! []
  (let [cluster (LocalCluster.)]
    (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
    (Thread/sleep 10000)
    (.shutdown cluster)
    ))

(defn submit-topology! [name]
  (StormSubmitter/submitTopology
   name
   {TOPOLOGY-DEBUG true
    TOPOLOGY-WORKERS 1}
   (mk-topology)))

(defn -main
  ([]
   (run-local!))
  ([name]
   (submit-topology! name)))
Comment from al.moorthi: While debugging I found that the spout dequeues events correctly but does not pass them on to the bolt, with no exceptions or warnings at all. The topology message timeout is 1 hour, pending is 5k, and the batch size is 65k.

1 Answer


If it doesn't slow down your topology too much, you can enable debug logging with Config.setDebug(true) (see https://github.com/apache/storm/blob/f2ced23fa4e3f699558663baef4ee582ee148fa2/storm-client/src/jvm/org/apache/storm/Config.java#L1763).

Otherwise, I'd try adding some debug logging to your bolts, and enabling logging for your Redis spout, to figure out if the tuples are getting lost by Storm or by the Redis integration.
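
A rough sketch of that kind of instrumentation, in case it helps. This is not part of Storm or of the Redis spout: `TupleAudit` and its method names are hypothetical, and it assumes each event has some unique key that you can read both in the spout and from the tuple in the bolt. Because the topology runs in a single worker, the spout and bolt threads share one JVM, so a static helper can see both sides. It also keeps every unmatched key in memory, so it is only meant for a short debugging run.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical debugging helper: tracks event keys between spout and bolt. */
final class TupleAudit {
    // Keys that the spout has handed to Storm but no bolt has seen yet.
    private static final Set<String> IN_FLIGHT = ConcurrentHashMap.newKeySet();
    private static final AtomicLong EMITTED = new AtomicLong();
    private static final AtomicLong RECEIVED = new AtomicLong();

    private TupleAudit() {}

    /** Call in the spout just BEFORE emitting, so a fast bolt cannot race ahead. */
    static void onSpoutEmit(String eventKey) {
        EMITTED.incrementAndGet();
        IN_FLIGHT.add(eventKey);
    }

    /** Call as the first statement of the bolt's execute. */
    static void onBoltReceive(String eventKey) {
        RECEIVED.incrementAndGet();
        IN_FLIGHT.remove(eventKey);
    }

    /** One-line summary suitable for periodic logging. */
    static String summary() {
        return "emitted=" + EMITTED.get()
            + " received=" + RECEIVED.get()
            + " inFlight=" + IN_FLIGHT.size();
    }

    /** Snapshot of keys that were emitted but never reached a bolt. */
    static Set<String> missing() {
        return new HashSet<>(IN_FLIGHT);
    }
}
```

Logging `TupleAudit.summary()` every few seconds shows whether the gap opens between the spout and the bolt. If the emitted count itself falls behind what was popped from Redis, the events are being lost before Storm ever sees them.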

Also I note that you're using an old Storm version. You could try upgrading.