
I am streaming data into HDFS and trying to read the resulting text files with Spark.

JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(1000));
JavaPairInputDStream<LongWritable, Text> textStream = jssc.fileStream(
    "hdfs://myip:9000/travel/FlumeData.[0-9]*",
    LongWritable.class, Text.class, TextInputFormat.class);

While the streams are being written to HDFS, temporary files such as FlumeData.1234.tmp are created. Once all the data has been received, each one is renamed to its final name, e.g. FlumeData.1234.

I want Spark to ignore these .tmp files. I tried the following patterns:

hdfs://myip:9000/travel/FlumeData.[0-9]*
hdfs://myip:9000/travel/FlumeData.//d*

but neither works. I am looking for something like this: jssc.fileStream("hdfs://myip:9000/travel/FlumeData.[0-9]*", LongWritable.class, Text.class, TextInputFormat.class);

fileStream should not read files with the .tmp extension.

I also tried the following Hadoop code to retrieve the list of files:

private String pathValue(String pathVariable) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path(pathVariable);
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    System.out.println("PathVariable" + fs.getWorkingDirectory());

    return fs.getName();
}

but the FileSystem object fs does not have a filename() method. Since new files are created at run time, I need to read them as they are created.


2 Answers

0
votes

The fileStream method on JavaStreamingContext (which returns a JavaPairInputDStream) has an overload that takes a filter function, so we can write a filter that excludes unwanted files in the directory.

fileStream(directory, kClass, vClass, fClass, filter, newFilesOnly)

// Needs java.util.regex.Pattern, org.apache.hadoop.fs.Path and
// org.apache.spark.api.java.function.Function.
JavaPairInputDStream<LongWritable, Text> lines = jssc.fileStream(
    "hdfs://myip:9000/travel/",
    LongWritable.class, Text.class, TextInputFormat.class,
    new Function<Path, Boolean>() {
        // Compiled once; the escaped dot matches only a literal '.'.
        private final Pattern pattern = Pattern.compile("FlumeData\\.[0-9]+");

        @Override
        public Boolean call(Path path) throws Exception {
            // matches() must cover the whole name, so FlumeData.1234.tmp is rejected.
            boolean accepted = pattern.matcher(path.getName()).matches();
            System.out.println("Is path : " + path.getName() + " matching ? , " + accepted);
            return accepted;
        }
    },
    true); // newFilesOnly

Try running the code above; I hope it resolves the issue.
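
The name check used in the filter can be tried on its own, without a cluster. Here is a minimal sketch, assuming the file names follow the FlumeData.<digits> pattern from the question. The class FlumeFileNames and the method isCompleteFile are hypothetical names for illustration, not part of Spark or Hadoop:

```java
import java.util.regex.Pattern;

// Hypothetical helper (not a Spark or Hadoop class): isolates the file name
// check so it can be exercised without a streaming context.
final class FlumeFileNames {
    // Compiled once; the dot is escaped so it matches only a literal '.'.
    private static final Pattern COMPLETE_FILE = Pattern.compile("FlumeData\\.[0-9]+");

    private FlumeFileNames() {}

    // matches() requires the whole name to match, so a trailing ".tmp" is rejected.
    static boolean isCompleteFile(String fileName) {
        return COMPLETE_FILE.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        System.out.println(isCompleteFile("FlumeData.1234"));     // true
        System.out.println(isCompleteFile("FlumeData.1234.tmp")); // false
    }
}
```

Inside the filter function, the body of call would then reduce to return FlumeFileNames.isCompleteFile(path.getName());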

0
votes

You need to use the () selector (a capture group) to select the part of the match that you want to keep. If you do not specify any group, the whole match is returned.

In your case, if I am not misunderstanding, you want to select:

FlumeData.1234 from FlumeData.1234.tmp 

To do this, the simple regex you need is:

(.*)\.tmp

if you want to select everything before the .tmp extension. The dot is escaped so that it matches only a literal '.'.
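
As a rough sketch of how the captured group can be read back in Java: the class TmpSuffix and the method stripTmp are made-up names for illustration, and the dot before tmp is escaped here so that it matches only a literal '.'.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: returns the part of a file name before a ".tmp" suffix.
final class TmpSuffix {
    // Group 1 captures everything before the literal ".tmp" at the end of the name.
    private static final Pattern TMP_FILE = Pattern.compile("(.*)\\.tmp");

    private TmpSuffix() {}

    static String stripTmp(String fileName) {
        Matcher m = TMP_FILE.matcher(fileName);
        // If the name does not end in ".tmp", return it unchanged.
        return m.matches() ? m.group(1) : fileName;
    }

    public static void main(String[] args) {
        System.out.println(stripTmp("FlumeData.1234.tmp")); // FlumeData.1234
        System.out.println(stripTmp("FlumeData.1234"));     // FlumeData.1234
    }
}
```

Note that this only extracts the name. Keeping fileStream from reading the .tmp files still requires a check on the path where the stream is created.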