1
votes

I have a cluster of 9 computers with Apache Hadoop 2.7.2 and Spark 2.0.0 installed on them. Each computer runs an HDFS datanode and Spark slave. One of these computers also runs an HDFS namenode and Spark master.

I've uploaded a few TBs of gz-archives in HDFS with Replication=2. It turned out that some of the archives are corrupt. I'd want to find them. It looks like 'gunzip -t ' can help. So I'm trying to find a way to run a Spark application on the cluster so that each Spark executor tests archives 'local' (i.e. having one of the replicas located on the same computer where this executor runs) to it as long as it is possible. The following script runs but sometimes Spark executors process 'remote' files in HDFS:

// Usage (after packaging a jar with mainClass set to 'com.qbeats.cortex.CommoncrawlArchivesTester' in spark.pom
// and placing this jar file into Spark's home directory):
//    ./bin/spark-submit --master spark://LV-WS10.lviv:7077 spark-cortex-fat.jar spark://LV-WS10.lviv:7077 hdfs://LV-WS10.lviv:9000/commoncrawl 9
// means testing for corruption the gz-archives in the directory hdfs://LV-WS10.lviv:9000/commoncrawl
// using a Spark cluster with the Spark master URL spark://LV-WS10.lviv:7077 and 9 Spark slaves

package com.qbeats.cortex

import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.FileSplit
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.{SparkContext, SparkConf, AccumulatorParam}
import sys.process._

object CommoncrawlArchivesTester extends App {
  object LogAccumulator extends AccumulatorParam[String] {
    def zero(initialValue: String): String = ""
    def addInPlace(log1: String, log2: String) = if (log1.isEmpty) log2 else log1 + "\n" + log2
  }

  override def main(args: Array[String]): Unit = {
    if (args.length >= 3) {
      val appName = "CommoncrawlArchivesTester"
      val conf = new SparkConf().setAppName(appName).setMaster(args(0))
      conf.set("spark.executor.memory", "6g")
      conf.set("spark.shuffle.service.enabled", "true")
      conf.set("spark.dynamicAllocation.enabled", "true")
      conf.set("spark.dynamicAllocation.initialExecutors", args(2))
      val sc = new SparkContext(conf)

      val log = sc.accumulator(LogAccumulator.zero(""))(LogAccumulator)

      val text = sc.hadoopFile(args(1), classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
      val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]

      val fileAndLine = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, iterator) =>
        val fileName = inputSplit.asInstanceOf[FileSplit].getPath.toString
        class FilePath extends Iterable[String] {
          def iterator = List(fileName).iterator
        }
        val result = (sys.env("HADOOP_PREFIX") + "/bin/hadoop fs -cat " + fileName) #| "gunzip -t" !

        println("Processed %s.".format(fileName))
        if (result != 0) {
          log.add(fileName)
          println("Corrupt: %s.".format(fileName))
        }
        (new FilePath).iterator
      }

      val result = fileAndLine.collect()

      println("Corrupted files:")
      println(log.value)
    }
  }
}

What would you suggest?


ADDED LATER:

I tried another script which gets files from HDFS via textFile(). I looks like a Spark executor doesn't prefer among input files the files which are 'local' to it. Doesn't it contradict to "Spark brings code to data, not data to code"?

// Usage (after packaging a jar with mainClass set to 'com.qbeats.cortex.CommoncrawlArchiveLinesCounter' in spark.pom)
//   ./bin/spark-submit --master spark://LV-WS10.lviv:7077 spark-cortex-fat.jar spark://LV-WS10.lviv:7077 hdfs://LV-WS10.lviv:9000/commoncrawl 9

package com.qbeats.cortex

import org.apache.spark.{SparkContext, SparkConf}

object CommoncrawlArchiveLinesCounter extends App {
  override def main(args: Array[String]): Unit = {
    if  (args.length >= 3) {
      val appName = "CommoncrawlArchiveLinesCounter"
      val conf = new SparkConf().setAppName(appName).setMaster(args(0))
      conf.set("spark.executor.memory", "6g")
      conf.set("spark.shuffle.service.enabled", "true")
      conf.set("spark.dynamicAllocation.enabled", "true")
      conf.set("spark.dynamicAllocation.initialExecutors", args(2))
      val sc = new SparkContext(conf)

      val helper = new Helper
      val nLines = sc.
        textFile(args(1) + "/*").
        mapPartitionsWithIndex( (index, it) => {
          println("Processing partition %s".format(index))
          it
        }).
        count
      println(nLines)
    }
  }
}

SAIF C, could you explain in more detail please?

1
Spark on Hadoop probably cannot see your local versions of the file. You would probably need to add them using the --files option in spark submit. When using this the local files will be accessible by spark at the root of the executors.Saif Charaniya

1 Answers

0
votes

I've solved the problem by switching from Spark’s standalone mode to YARN.

Related topic: How does Apache Spark know about HDFS data nodes?