16
votes

I am trying to process 4 directories of text files that keep growing every day. What I need to do is: if somebody searches for an invoice number, I should give them the list of files that contain it.

I was able to map and reduce the values in the text files by loading them as an RDD. But how can I obtain the file name and other file attributes?


8 Answers

33
votes

Since Spark 1.6 you can combine the text data source and the input_file_name function as follows:

Scala:

import org.apache.spark.sql.functions.input_file_name
import spark.implicits._ // needed for $"value" and the tuple encoder

val inputPath: String = ???

spark.read.text(inputPath)
  .select(input_file_name, $"value")
  .as[(String, String)] // Optionally convert to Dataset
  .rdd // or RDD

Python:

(Versions before 2.x are buggy and may not preserve file names when converting to an RDD.)

from pyspark.sql.functions import input_file_name

(spark.read.text(input_path)
    .select(input_file_name(), "value")
    .rdd)

This can be used with other input formats as well.
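
As a rough sketch of how this answers the original question (the path and invoice number below are placeholders), you can filter on the line contents and collect the distinct file names:

import org.apache.spark.sql.functions.input_file_name
import spark.implicits._

val invoiceNo = "INV-12345" // placeholder

spark.read.text("/data/invoices/*")
  .select(input_file_name().as("file"), $"value")
  .filter($"value".contains(invoiceNo)) // keep only lines mentioning the invoice number
  .select("file")
  .distinct()
  .as[String]
  .collect()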

5
votes

You can try this if you are using PySpark:

    test = sc.wholeTextFiles("pathtofile")

You will get an RDD of pairs where the first element is the file path and the second element is the file content.

4
votes

If your text files are small enough, you can use SparkContext.wholeTextFiles, which returns an RDD of (filename, content).
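
For example, a minimal sketch for the original question (the path and invoice number are placeholders):

val invoiceNo = "INV-12345" // placeholder

val matchingFiles = sc.wholeTextFiles("/data/invoices/*")
  .filter { case (_, content) => content.contains(invoiceNo) } // files whose content mentions the invoice
  .keys       // keep only the file paths
  .collect()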

3
votes

If your text files are too large for SparkContext.wholeTextFiles, you can use a (simple) custom InputFormat and then call SparkContext.newAPIHadoopFile.

The InputFormat would need to return a (filename, line) tuple rather than just the line. You could then filter with a predicate that looks at the content of each line, deduplicate, and collect the file names.
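
As a rough, hypothetical sketch, such a FileNamerInputFormat (written against the new Hadoop API so that it works with sc.newAPIHadoopFile below, simply wrapping Hadoop's LineRecordReader) might look like:

import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit, LineRecordReader}

// Emits (file name, line) pairs by delegating the line reading to LineRecordReader
class FileNamerInputFormat extends FileInputFormat[String, String] {
  override def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[String, String] =
    new FileNamerRecordReader
}

class FileNamerRecordReader extends RecordReader[String, String] {
  private val lineReader = new LineRecordReader()
  private var fileName: String = _

  override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {
    fileName = split.asInstanceOf[FileSplit].getPath.toString
    lineReader.initialize(split, context)
  }

  override def nextKeyValue(): Boolean = lineReader.nextKeyValue()
  override def getCurrentKey: String = fileName
  override def getCurrentValue: String = lineReader.getCurrentValue.toString
  override def getProgress: Float = lineReader.getProgress
  override def close(): Unit = lineReader.close()
}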

From Spark, the code would look something like:

import org.apache.hadoop.conf.Configuration

val ft = classOf[FileNamerInputFormat]
val kt = classOf[String]
val vt = classOf[String]

val hadoopConfig = new Configuration(sc.hadoopConfiguration)
sc.newAPIHadoopFile(path, ft, kt, vt, hadoopConfig)
  .filter { case (f, l) => isInteresting(l) } // e.g. l.contains(invoiceNumber)
  .map { case (f, _) => f }
  .distinct()
  .collect()

3
votes

You can use wholeTextFiles() to achieve this. However, if the input files are big, it would be counterproductive to use wholeTextFiles(), since it puts each file's entire content into a single record.

The best way to retrieve file names in such a scenario is to use mapPartitionsWithInputSplit(). You can find a working example for this scenario on my blog.
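
I can't reproduce the blog post here, but a minimal sketch of the idea looks like this (the path is a placeholder; mapPartitionsWithInputSplit is a developer API exposed on HadoopRDD, hence the cast):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD

// Read the text files with the old Hadoop API so we get a HadoopRDD back
val rdd = sc.hadoopFile("/data/invoices",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])

// Attach each split's file name to every line it contains
val linesWithFileNames = rdd.asInstanceOf[HadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit { (split, iter) =>
    val fileName = split.asInstanceOf[FileSplit].getPath.toString
    iter.map { case (_, line) => (fileName, line.toString) }
  }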

2
votes

If you're using the DataFrame API, you can get file names from HDFS using the input_file_name function from org.apache.spark.sql.functions. The snippets below might help you understand.

import org.apache.spark.sql.functions.{input_file_name, split}
import org.apache.spark.sql.types.StringType

val df = spark.read.csv("/files/")
val df2 = df.withColumn("file_name", split(input_file_name(), "/").getItem(7).cast(StringType)) // the index depends on your path depth
val df3 = df.withColumn("file_name", input_file_name())

df2 now includes a new field called "file_name" that contains the HDFS file name extracted with the split function. If you need the full HDFS path, you can get it with the input_file_name() function alone, as shown in df3.

1
votes

It seems overkill to use Spark directly ... If this data is going to be 'collected' to the driver anyway, why not use the HDFS API? Hadoop is often bundled with Spark. Here is an example:

import java.net.URI

import org.apache.hadoop.conf._
import org.apache.hadoop.fs._

val fileSpec = "/data/Invoices/20171123/21"
val conf = new Configuration()
val fs = FileSystem.get(new URI("hdfs://nameNodeEneteredHere"), conf)
val path = new Path(fileSpec)
// if (fs.exists(path) && fs.isDirectory(path)) ...
val fileList = fs.listStatus(path)

Then println(fileList(0)) shows the first item (as an example) as an org.apache.hadoop.fs.FileStatus, which formatted looks like this:

FileStatus {
    path=hdfs://nameNodeEneteredHere/Invoices-0001.avro; 
    isDirectory=false; 
    length=29665563;
    replication=3;
    blocksize=134217728;
    modification_time=1511810355666;
    access_time=1511838291440;
    owner=codeaperature;
    group=supergroup;
    permission=rw-r--r--;
    isSymlink=false
}

Where fileList(0).getPath will give hdfs://nameNodeEneteredHere/Invoices-0001.avro.
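
As a small follow-up sketch, you can keep only regular files and pull out just their names from the listing:

// Skip sub-directories and extract the bare file names
val fileNames = fileList
  .filter(_.isFile)
  .map(_.getPath.getName)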

I guess this way of listing files talks primarily to the HDFS namenode rather than running on each executor. TL;DR: I'm betting Spark itself polls the namenode to build its RDDs, and if the underlying Spark calls do poll the namenode to manage RDDs, perhaps the above is an efficient solution. Still, comments arguing in either direction would be welcome.

0
votes

If you don't know the schema of each JSON (and it can differ from file to file), you can use:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
 
# ... here you get your DF

# Assuming the first column of your DF is the JSON to parse
my_df = spark.read.json(my_df.rdd.map(lambda x: x[0]))

From: https://github.com/apache/spark/pull/22775