0
votes

I am studying Spark on VirtualBox. I use ./bin/spark-shell to open Spark and use Scala. Now I got confused about key-value format using Scala.

I have a txt file in home/feng/spark/data, which looks like:

panda 0
pink 3
pirate 3
panda 1
pink 4

I use sc.textFile to get this txt file. If I do

val rdd = sc.textFile("/home/feng/spark/data/rdd4.7")

Then I can use rdd.collect() to show rdd on the screen:

scala> rdd.collect()
res26: Array[String] = Array(panda 0, pink 3, pirate 3, panda 1, pink 4)

However, if I do

val rdd = sc.textFile("/home/feng/spark/data/rdd4.7.txt")

which no ".txt" here. Then when I use rdd.collect(), I got a mistake:

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/feng/spark/A.txt
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
......

But I saw other examples. All of them have ".txt" at the end. Is there something wrong with my code or my system?

Another thing is when I tried to do:

scala> val rddd = rdd.map(x => (x.split(" ")(0),x))
rddd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:29
scala> rddd.collect()
res0: Array[(String, String)] = Array((panda,panda 0), (pink,pink 3), (pirate,pirate 3), (panda,panda 1), (pink,pink 4))

I intended to select the first column of the data and use it as the key. But rddd.collect() looks like not that way as the words occur twice, which is not right. I cannot keep doing the rest operations like mapbykey, reducebykey or others. Where did I do wrong?

1
Your question seems a little inconsistent with your use of ".txt". Can you check your text -- and your code inserts -- to be sure it's all correct. If it is, then your system seems truly messed up.Phasmid

1 Answers

1
votes

Just for example I create a String with your dataset, after this I split the record by line, and use SparkContext's parallelize method to create an RDD. Notice that after I create the RDD I use its map method to split the String stored in each record and convert it to a Row.

import org.apache.spark.sql.Row
val text = "panda 0\npink 3\npirate 3\npanda 1\npink 4"

val rdd = sc.parallelize(text.split("\n")).map(x => Row(x.split(" "):_*))
rdd.take(3)

The output from the take method is:

res4: Array[org.apache.spark.sql.Row] = Array([panda,0], [pink,3], [pirate,3])

About your first question, there is no need for files to have any extension. Because, in this case files are seen as plain text.