1
votes

I want to run a Storm topology in localmode to test it. The project essentially takes some logs from a Kafka spout, do some pre-processing operations and then print the content out on the screen.

The following is my topology code:

import org.apache.storm.*;
import org.apache.storm.generated.*;
import org.apache.storm.kafka.*;
import org.apache.storm.spout.*;
import org.apache.storm.topology.*;


public class LogProcessingTopology {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

        // zookeeper hosts for the Kafka cluster
        ZkHosts zkHosts = new ZkHosts("10.0.10.70:2181");

        // Create the KafkaSpout configuartion
        // Second argument is the topic name
        // Third argument is the zookeeper root for Kafka
        // Fourth argument is consumer group id
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "access_log", "", "0");

        // Specify that the kafka messages are String
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // We want to consume all the first messages
        // in the topic every time we run the topology
        // to help in debugging. In production, this
        // property should be false
        //kafkaConfig.forceFromStart = true;

        // Now we create the topology
        TopologyBuilder builder = new TopologyBuilder();

        // set the kafka spout class
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);

        // set the LogSplitter, IpToCountry, Keyword,
        // and PersistenceBolt bolts
        // class.
        builder.setBolt("LogSplitter", new LogSplitterBolt(), 1)
                .globalGrouping("KafkaSpout");
        builder.setBolt("IpToCountry", new UserInformationGetterBolt("./home/clio/Desktop/GeoLiteCity.dat"), 1)
                .globalGrouping("LogSplitter");
        builder.setBolt("LogPrinter", new LogPrinterBolt(), 1).globalGrouping("LogSplitter");


            // create an instance of the LocalCluster class
            // for executing the topology in the local mode.
            LocalCluster cluster = new LocalCluster();
            Config conf = new Config();
            conf.setDebug(true);

            // Submit topology for execution
            cluster.submitTopology("KafkaTopology", conf, builder.createTopology());

            try {
                // Wait for some time before exiting
                System.out.println("**********************Waiting to consume from kafka");
                Thread.sleep(10000);

            } catch (Exception exception) {
                System.out.println("******************Thread interrupted exception : " + exception);
            }

            // kill KafkaTopology
            cluster.killTopology("KafkaTopology");

            // shut down the storm test cluster
            cluster.shutdown();

        }

}

but after running this command mvn compile exec:java -Dexec.classpathScope=compile I do not get anything relevant printed on the screen, only the following screen output:

 8110 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.nimbus - Activating KafkaTopology: KafkaTopology-1-1500410373
**********************Waiting to consume from kafka
17849 [timer] INFO  o.a.s.s.EvenScheduler - Available slots: (["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1024] ["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1025] ["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1026] ["edd1760f-f9b0-4804-813c-61744b693d31" 1027] ["edd1760f-f9b0-4804-813c-61744b693d31" 1028] ["edd1760f-f9b0-4804-813c-61744b693d31" 1029])
17868 [timer] INFO  o.a.s.d.nimbus - Setting new assignment for topology id KafkaTopology-1-1500410373: #org.apache.storm.daemon.common.Assignment{:master-code-dir "/tmp/634969ab-11d9-4385-ae59-e17d04f50e40", :node->host {"b676dc68-345b-4474-b97c-7aadbbf9a6f6" "Terminus"}, :executor->node+port {[4 4] ["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1024], [3 3] ["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1024], [2 2] ["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1024], [1 1] ["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1024], [5 5] ["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1024]}, :executor->start-time-secs {[1 1] 1500410383, [2 2] 1500410383, [3 3] 1500410383, [4 4] 1500410383, [5 5] 1500410383}, :worker->resources {["b676dc68-345b-4474-b97c-7aadbbf9a6f6" 1024] [0.0 0.0 0.0]}}
18146 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.nimbus - Delaying event :remove for 30 secs for KafkaTopology-1-1500410373
18153 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.nimbus - Adding topo to history log: KafkaTopology-1-1500410373
18156 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.nimbus - Shutting down master
18158 [Curator-Framework-0] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
18159 [ProcessThread(sid:0 cport:-1):] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d60490003
18164 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d60490003 closed
18164 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO  o.a.s.s.o.a.z.ClientCnxn - EventThread shut down
18164 [Curator-Framework-0] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
18164 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47772 which had sessionid 0x15d576d60490003
18165 [ProcessThread(sid:0 cport:-1):] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d60490004
18175 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO  o.a.s.s.o.a.z.ClientCnxn - EventThread shut down
18175 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d60490004 closed
18175 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47774 which had sessionid 0x15d576d60490004
18176 [Curator-Framework-0] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
18176 [ProcessThread(sid:0 cport:-1):] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d60490001
18186 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO  o.a.s.s.o.a.z.ClientCnxn - EventThread shut down
18186 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d60490001 closed
18187 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47770 which had sessionid 0x15d576d60490001
18187 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.zookeeper - closing zookeeper connection of leader elector.
18188 [Curator-Framework-0] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
18188 [ProcessThread(sid:0 cport:-1):] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d60490000
18197 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO  o.a.s.s.o.a.z.ClientCnxn - EventThread shut down
18197 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d60490000 closed
18197 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47766 which had sessionid 0x15d576d60490000
18198 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.nimbus - Shut down master
18198 [Curator-Framework-0] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
18199 [ProcessThread(sid:0 cport:-1):] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d60490006
18208 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO  o.a.s.s.o.a.z.ClientCnxn - EventThread shut down
18208 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d60490006 closed
18208 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47778 which had sessionid 0x15d576d60490006
18208 [Curator-Framework-0] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
18209 [ProcessThread(sid:0 cport:-1):] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d60490008
18220 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO  o.a.s.s.o.a.z.ClientCnxn - EventThread shut down
18220 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d60490008 closed
18220 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47782 which had sessionid 0x15d576d60490008
18222 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1024,5,com.loggingprocess.LogProcessingTopology] assignment to null
18222 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1025,5,com.loggingprocess.LogProcessingTopology] assignment to null
18222 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1026,5,com.loggingprocess.LogProcessingTopology] assignment to null
18222 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Waiting for Thread[SLOT_1024,5,com.loggingprocess.LogProcessingTopology] to be EMPTY, currently EMPTY
18222 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Waiting for Thread[SLOT_1025,5,com.loggingprocess.LogProcessingTopology] to be EMPTY, currently EMPTY
18222 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Waiting for Thread[SLOT_1026,5,com.loggingprocess.LogProcessingTopology] to be EMPTY, currently EMPTY
18222 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.Supervisor - Shutting down supervisor b676dc68-345b-4474-b97c-7aadbbf9a6f6
18223 [Thread-10] INFO  o.a.s.e.EventManagerImp - Event manager interrupted
18224 [Curator-Framework-0] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
18225 [ProcessThread(sid:0 cport:-1):] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d6049000a
18231 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d6049000a closed
18231 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO  o.a.s.s.o.a.z.ClientCnxn - EventThread shut down
18232 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47786 which had sessionid 0x15d576d6049000a
18232 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1027,5,com.loggingprocess.LogProcessingTopology] assignment to null
18232 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1028,5,com.loggingprocess.LogProcessingTopology] assignment to null
18232 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1029,5,com.loggingprocess.LogProcessingTopology] assignment to null
18232 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Waiting for Thread[SLOT_1027,5,com.loggingprocess.LogProcessingTopology] to be EMPTY, currently EMPTY
18232 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Waiting for Thread[SLOT_1028,5,com.loggingprocess.LogProcessingTopology] to be EMPTY, currently EMPTY
18232 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Waiting for Thread[SLOT_1029,5,com.loggingprocess.LogProcessingTopology] to be EMPTY, currently EMPTY
18232 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.Supervisor - Shutting down supervisor edd1760f-f9b0-4804-813c-61744b693d31
18233 [Thread-14] INFO  o.a.s.e.EventManagerImp - Event manager interrupted
18233 [Curator-Framework-0] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
18233 [ProcessThread(sid:0 cport:-1):] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d576d6049000c
18241 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO  o.a.s.s.o.a.z.ClientCnxn - EventThread shut down
18241 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d576d6049000c closed
18242 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:47790 which had sessionid 0x15d576d6049000c
18243 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.testing - Shutting down in process zookeeper
18243 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - NIOServerCnxn factory exited run method
18244 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - shutting down
18244 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.s.o.a.z.s.SessionTrackerImpl - Shutting down
18244 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - Shutting down
18244 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.s.o.a.z.s.SyncRequestProcessor - Shutting down
18244 [ProcessThread(sid:0 cport:-1):] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - PrepRequestProcessor exited loop!
18244 [SyncThread:0] INFO  o.a.s.s.o.a.z.s.SyncRequestProcessor - SyncRequestProcessor exited!
18244 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.s.o.a.z.s.FinalRequestProcessor - shutdown of request processor complete
18245 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.testing - Done shutting down in process zookeeper
18245 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.testing - Deleting temporary path /tmp/634969ab-11d9-4385-ae59-e17d04f50e40
18255 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.testing - Deleting temporary path /tmp/6a839f7c-1379-4112-b0bb-51138fd98f33
18255 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.testing - Deleting temporary path /tmp/8c4c315a-9689-49b6-be65-659922ede1cb
18256 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.testing - Deleting temporary path /tmp/30ba6e95-5040-4162-9a0d-ed95c4fc1778
18288 [SessionTracker] INFO  o.a.s.s.o.a.z.s.SessionTrackerImpl - SessionTrackerImpl exited loop!

At this point of the output the topology is supposed to read the Kafka content (the logs) and then printing them on the screen but that does not happen.

Finally, this is my pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.learningstorm</groupId>
    <artifactId>streaming</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <storm.version>1.1.0</storm.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <junit.version>4.12</junit.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <name>loggingprocess</name>

    <repositories>
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
        <repository>
            <id>geoip</id>
            <url>http://snambi.github.com/maven/</url>
        </repository>
        <repository>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <id>central</id>
            <url>http://repo1.maven.org/maven2/</url>
        </repository>
    </repositories>




    <dependencies>

        <!-- Storm Dependencies -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <!-- Storm Kafka Dependencies -->

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.11.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <!-- Storm Hbase Dependencies -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>${storm.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <!--Other Dependencies -->
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>22.0</version>
        </dependency>
        <dependency>
            <groupId>org.geomind</groupId>
            <artifactId>geoip</artifactId>
            <version>1.2.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang.modules</groupId>
            <artifactId>scala-parser-combinators_2.11</artifactId>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>3.0.2</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>true</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>com.loggingprocess.LogProcessingTopology</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>

    </project>

I am using Storm 1.1.0 and Kafka 0.11.0 and Zookeeper 3.4.10. My actual Zookeeper ip:port is actually a real machine to which I am referring to running a zookeeper instance. In my spout, I also reference to a real running instance of Kafka BUT even if I switch them to localhost: the same happens.

What could be the reason for this error?

Edit: In addition, If I set my topology as

builder.setBolt("LogSplitter", new LogSplitterBolt(), 1)
        .globalGrouping("KafkaSpout");
builder.setBolt("IpToCountry", new UserInformationGetterBolt("./home/clio/Desktop/GeoLiteCity.dat"), 1)
        .globalGrouping("LogSplitter");
builder.setBolt("LogPrinter", new LogPrinterBolt(), 1).globalGrouping("IpToCountry");

Then I get a proper error message:

java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.storm.utils.Utils.wrapInRuntime(Utils.java:1531) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.zookeeper.Zookeeper.getChildren(Zookeeper.java:265) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.cluster.ZKStateStorage.get_children(ZKStateStorage.java:174) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.cluster.StormClusterStateImpl.assignments(StormClusterStateImpl.java:153) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.daemon.supervisor.ReadClusterState.run(ReadClusterState.java:126) [storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.event.EventManagerImp$1.run(EventManagerImp.java:54) [storm-core-1.1.0.jar:1.1.0]
Caused by: java.lang.InterruptedException
    at java.lang.Object.wait(Native Method) ~[?:1.8.0_131]
    at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_131]
    at org.apache.storm.shade.org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.shade.org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1588) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.shade.org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1625) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.shade.org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:226) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.shade.org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:219) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.shade.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.shade.org.apache.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:216) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.shade.org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:207) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.shade.org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:40) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.zookeeper.Zookeeper.getChildren(Zookeeper.java:260) ~[storm-core-1.1.0.jar:1.1.0]
    ... 4 more
24506 [Thread-10] INFO  o.a.s.e.EventManagerImp - Event manager interrupted
24506 [Curator-Framework-0] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
24507 [ProcessThread(sid:0 cport:-1):] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x15d57baaf05000a
24548 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.s.o.a.z.ZooKeeper - Session: 0x15d57baaf05000a closed
24548 [com.loggingprocess.LogProcessingTopology.main()-EventThread] INFO  o.a.s.s.o.a.z.ClientCnxn - EventThread shut down
24549 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:51562 which had sessionid 0x15d57baaf05000a
24549 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1027,5,com.loggingprocess.LogProcessingTopology] assignment to null
24549 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1028,5,com.loggingprocess.LogProcessingTopology] assignment to null
24549 [com.loggingprocess.LogProcessingTopology.main()] INFO  o.a.s.d.s.ReadClusterState - Setting Thread[SLOT_1029,5,com.loggingprocess.LogProcessingTopology] assignment to null
1

1 Answers

2
votes

After few tests I manage to resolve the problem.

Running with following test environment :
- Windows 7 SP1
- Apache Storm 1.0.3
- Java 1.8.0_111
- Eclipse Mars.2 (4.5.2)

Example of method to run the topology on a Local Cluster:

private void runTopology(final StormTopology topology, final String topologyName, final long timeout) {
  LocalCluster localCluster = new LocalCluster();

  // topology configuration.
  final Config configuration = configureTopology();
  configuration.setDebug(true);

  // submit the topology to local cluster.
  localCluster.submitTopology(name, configuration, topology);

  if (timeout >= 0) {
      Utils.sleep(timeout);

      // kill the topology
      final KillOptions killOptions = new KillOptions();
      killOptions.set_wait_secs(0);
      localCluster.killTopologyWithOpts(name, killOptions);

      // wait until the topology is removed from the cluster
      while (topologyExists(name)) {
        // avoid cpu overuse
          Utils.sleep(1000);
      }

      // for some reason I have to wait to be sure topology is stopped and local cluster can be shutdown
      Utils.sleep(5000);
      localCluster.shutdown();

  }
}

Example of method to check if the topology is still running on the Local Cluster:

private final boolean topologyExists(final String topologyName) {

   // list all the topologies on the local cluster
   final List<TopologySummary> topologies = localCluster.getClusterInfo().get_topologies();

   // search for a topology with the topologyName
   if (null != topologies && !topologies.isEmpty()) {
       final List<TopologySummary> collect = topologies.stream()
               .filter(p -> p.get_name().equals(topologyName)).collect(Collectors.toList());
       if (null != collect && !collect.isEmpty()) {
           return true;
       }
   }
   return false;
}

I hope it will help.