1 vote

I have "a.txt" which is in csv format and is separated by tabs:

16777216    16777471        -33.4940    143.2104
16777472    16778239    Fuzhou  26.0614 119.3061

Then I run:

sc.textFile("path/to/a.txt").map(line => line.split("\t")).toDF("startIP", "endIP", "City", "Longitude", "Latitude")

Then I get:

java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (1): value
New column names (5): startIP, endIP, City, Longitude, Latitude
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.sql.Dataset.toDF(Dataset.scala:376)
  at org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:40)
  ... 47 elided

If I just run:

res.map(line => line.split("\t")).take(2)

I get:

rdd: Array[Array[String]] = Array(Array(16777216, 16777471, "", -33.4940, 143.2104), Array(16777472, 16778239, Fuzhou, 26.0614, 119.3061))

What is wrong here?


3 Answers

3 votes

As @user7881163 notes, the error occurs because your split produces a single column, named value by Spark, whose contents are the arrays of tokens produced by the split.
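
You can see this directly by calling printSchema without passing any column names (just a quick sketch, reusing the question's path and assuming spark.implicits._ is in scope, as in the spark-shell):

sc.textFile("path/to/a.txt")
  .map(_.split("\t"))
  .toDF()
  .printSchema()
// shows a single column named "value" whose type is array<string>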

However, per comments from @zero323, if you are operating at scale, make sure you use the version of collect that @user7881163 uses (the one that takes a partial function and returns an RDD). The other, far more commonly used collect is an action that moves all of your data to the driver and can overwhelm that machine. And if you aren't operating at scale, why use Spark at all?
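
For clarity, the two collects look like this (a sketch only, again reusing the question's path):

val tokens = sc.textFile("path/to/a.txt").map(_.split("\t"))

// collect(PartialFunction): a transformation; rows that don't match are dropped
// and the result stays distributed as an RDD
val rows = tokens.collect { case Array(a, b, c, d, e) => (a, b, c, d, e) }

// collect(): an action; every element is copied into a local Array on the driver
val local = tokens.collect()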

This is a slightly different approach that also allows for missing city data:

sc.textFile("path/to/a.txt")
  .map(_.split("\t"))
  .map {
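      // Some(city) / None give the third element type Option[String],
      // so rows without a city end up with a null City value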
      case Array(startIP, endIP, city, longitude, latitude) => (startIP, endIP, Some(city), longitude, latitude)
      case Array(startIP, endIP, longitude, latitude) => (startIP, endIP, None, longitude, latitude)
  }.toDF("startIP", "endIP", "City", "Longitude", "Latitude")
1 vote

Try:

sc
  .textFile("path/to/a.txt")
  .map(line => line.split("\t"))
  // pattern variables must start with a lowercase letter,
  // otherwise Scala treats them as constants to match against
  .collect { case Array(startIP, endIP, city, longitude, latitude) =>
    (startIP, endIP, city, longitude, latitude)
  }.toDF("startIP", "endIP", "City", "Longitude", "Latitude")

or just use the csv source:

spark.read.option("delimiter", "\t").csv("path/to/a.txt")
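
If you also want the column names, you can rename the csv reader's default _c0 ... _c4 columns (a sketch; it assumes every line really has five fields):

spark.read
  .option("delimiter", "\t")
  .csv("path/to/a.txt")
  .toDF("startIP", "endIP", "City", "Longitude", "Latitude")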

Your current code creates a DataFrame with a single column of type array<string>. This is why it fails when you pass 5 names.
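
If you prefer to keep your textFile-based code, another option is to leave that array<string> column in place and pull the fields out by index (again just a sketch, assuming spark.implicits._ is in scope for the $ syntax):

val arr = sc.textFile("path/to/a.txt").map(_.split("\t")).toDF("value")

arr.select(
  $"value".getItem(0).as("startIP"),
  $"value".getItem(1).as("endIP"),
  $"value".getItem(2).as("City"),
  $"value".getItem(3).as("Longitude"),
  $"value".getItem(4).as("Latitude")
)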

0 votes

You can try this example:

val dataDF = sc.textFile("filepath").map(x => x.split("\t"))
  .map { case Array(a, b, c, d, e) => (a, b, c, d, e) }  // tuple, so the columns come out as _1 ... _5
  .toDF()

val data = dataDF.selectExpr("_1 as startIP", "_2 as endIP", "_3 as City", "_4 as Longitude", "_5 as Latitude")