1
votes

I'm tring to migrate a scala spark job from hadoop cluster to GCP, I have this snippest of code that read a file and create an ArrayBuffer[String]

    import java.io._
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.fs.FSDataInputStream

    val filename="it.txt.1604607878987"
    val fs = FileSystem.get(new Configuration())
    val dataInputStream: FSDataInputStream = fs.open(new Path(filename))
    val sourceEDR=new BufferedReader(new InputStreamReader(dataInputStream, "UTF-8")); }
    val outputEDRFile = ArrayBuffer[String]()
    buffer = new Array[Char](300)
    var num_of_chars = 0
    while (sourceEDR.read(buffer) > -1) {
        val str = new String(buffer)
        num_of_chars += str.length
        outputEDRFile += (str + "\n");}
        println(num_of_chars)
        

This code runs in the cluster and gives me 3025000 chars, I tried to run this code in dataproc:

       val path_gs = new Path("gs://my-bucket")
       val filename="it.txt.1604607878987"

       val fs = path_gs.getFileSystem(new Configuration())        
       val dataInputStream: FSDataInputStream = fs.open(new Path(filename))
       val sourceEDR =new BufferedReader(new InputStreamReader(dataInputStream, "UTF-8")); }

       val outputEDRFile = ArrayBuffer[String]()
       buffer = new Array[Char](300)
       var num_of_chars = 0
       while (sourceEDR.read(buffer) > -1) {
       val str = new String(buffer)
       num_of_chars += str.length
       outputEDRFile += (str + "\n");}
       println(num_of_chars)

it gives 3175025 chars, I think there is whitespaces added to file contents or I must use another interface to read the file from google storage in dataproc ? Also I tried other encoding option but it give same results. Any Help ?

1
You probably need to first read Scala tutorial to understand how to work with (and not just mimic how it would be done in java, which lead to ugly and erroneous Scala) - cchantep
May you diff content of both files printed by Spark instead of comparing length? Is difference in encoding or there some missing characters at the end? - Igor Dvorzhak

1 Answers

0
votes

I don't found a solution using buffer so I tried to read char by char and it's work for me:

var i = 0
var r=0
val response = new StringBuilder
while ( ({r=sourceEDR.read(); r} != -1)) {
  val ch= r.asInstanceOf[Char]
  if(response.length < 300) { response.append(ch)}
  else {  val str = response.toString().replaceAll("[\\r\\n]", " ")
    i += str.length
    outputEDRFile += (str + "\n");
    response.setLength(0)
    response.append(ch)
  }
}
val str = response.toString().replaceAll("[\\r\\n]", " ")
i += str.length
outputEDRFile += (str + "\n");