
I want to store data emitted by a Storm spout in HDFS. I have added Hadoop FS API code to the bolt class, but it is throwing an error when run with Storm.

Following is the Storm bolt class:

package bolts;
import java.io.*;
import java.util.*;
import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DataNormalizer extends BaseBasicBolt {

  public void execute(Tuple input, BasicOutputCollector collector) {
    String sentence = input.getString(0);
    String[] process = sentence.split(" ");
    try {
        // Local file to copy and its destination path in HDFS
        String source = "/root/data/top_output.csv";
        String dest = "hdfs://localhost:9000/user/root/nishu/top_output/top_output_1.csv";

        Configuration conf = new Configuration();
        Path srcPath = new Path(source);
        Path dstPath = new Path(dest);
        // Resolve the FileSystem from the destination URI so the hdfs:// scheme
        // is honored even if core-site.xml is not on the classpath
        FileSystem fileSystem = dstPath.getFileSystem(conf);
        System.out.println(fileSystem);
        try {
            if (!fileSystem.exists(dstPath)) {
                // Destination does not exist yet: stream the local file into HDFS
                FSDataOutputStream out = fileSystem.create(dstPath, true);
                InputStream in = new BufferedInputStream(
                        new FileInputStream(new File(source)));
                byte[] b = new byte[1024];
                int numBytes = 0;
                while ((numBytes = in.read(b)) > 0) {
                    out.write(b, 0, numBytes);
                }
                in.close();
                out.close();
            } else {
                // Destination already exists: copy the local file via the FileSystem API
                fileSystem.copyFromLocalFile(srcPath, dstPath);
            }
        } catch (Exception e) {
            System.err.println("Exception caught! :" + e);
            System.exit(1); // note: this kills the whole worker process
        } finally {
            fileSystem.close();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // This bolt does not emit any tuples downstream
  }

}

I have also added the Hadoop jars to the CLASSPATH. The classpath value is:

$STORM_HOME/storm-0.8.1.jar:$JAVA_HOME/lib/:$HADOOP_HOME/hadoop-core-1.0.4.jar:$HADOOP_HOME/lib/:$STORM_HOME/lib/

I have also copied the Hadoop libraries hadoop-core-1.0.4.jar, commons-collections-3.2.1.jar, and commons-cli-1.2.jar into the Storm lib directory.

When I run this topology, it throws the following error:

3006 [Thread-16] ERROR backtype.storm.daemon.executor  -
java.lang.NoClassDefFoundError: org/apache/commons/configuration/Configuration
        at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<init>(DefaultMetricsSystem.java:37)
        at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<clinit>(DefaultMetricsSystem.java:34)
        at org.apache.hadoop.security.UgiInstrumentation.create(UgiInstrumentation.java:51)
        at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:216)
        at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:184)
        at org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:236)
        at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:466)
        at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:452)
        at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:1494)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1395)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:123)
        at bolts.DataNormalizer.execute(DataNormalizer.java:67)
        at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:32)
       ......................

1 Answer


The error message tells you that Apache Commons Configuration is missing: you have to add it to the classpath as well.
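For example, a minimal sketch of the fix, following the same approach you already used for the other jars, is to copy the Commons Configuration jar that ships with Hadoop into Storm's lib directory (assuming the jar is commons-configuration-1.6.jar; adjust the version to whatever is actually in your $HADOOP_HOME/lib):

cp $HADOOP_HOME/lib/commons-configuration-1.6.jar $STORM_HOME/lib/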

More generally, you should add all Hadoop dependencies to your classpath. You can find them with a dependency manager (Maven, Ivy, Gradle, etc.) or by looking into /usr/lib/hadoop/lib on a machine where Hadoop is installed.
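For example, with Maven a single hadoop-core dependency (assuming the 1.0.4 version used in the question) pulls in commons-configuration and the rest of the transitive Hadoop dependencies automatically:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.0.4</version>
</dependency>

You can then copy the resolved jars into Storm's lib directory or build your topology jar with these dependencies included.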