I’ve got a pipe-delimited textfile without a header, and the rows have different numbers of columns (some rows are type A with 400 columns, others type B with 200, so I need to separate them first):

val textFileRaw = sc.textFile("./data.txt")
val textFile = textFileRaw.map(line => line.split("\\|", -1))
val dataA = textFile.filter(line => line(0) == "A")
val dataB = textFile.filter(line => line(0) == "B")
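
As a side note on the split call above, here is a minimal plain-Scala sketch of why the `-1` limit matters for rows that end in empty fields. The sample line is made up for illustration and is not taken from the real data file.

```scala
// Hypothetical sample row whose last two fields are empty
val line = "A|1||"

// Without a limit, trailing empty strings are dropped
val dropped = line.split("\\|")

// With limit -1, trailing empty strings are kept, so the field count is stable
val kept = line.split("\\|", -1)

println(dropped.length) // 2
println(kept.length)    // 4
```

Keeping the trailing empty fields means every type A row yields the same number of values, which matters once each value has to line up with a column name.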

Now I'd like to convert these RDDs into Spark DataFrames, but the split returns a single array per row rather than 400 or 200 distinct values, so toDF sees only one column. This results in the following error:

// ANames are my column names, length=400
val ANames = Array("Row ID", "City", "State", ...)
val dataADF = dataA.toDF(ANames: _*)

Name: java.lang.IllegalArgumentException
Message: requirement failed: The number of columns doesn't match.
Old column names (1): value
New column names (400): Row ID, City, State ...

This question faces the same problem, but all the answers suggest manually specifying a mapping from array to Tuple, which isn't practical with hundreds of columns.

I think I could get it to work if I used Spark's CSV loader, but that doesn't work for my data because the rows have different numbers of fields (it's not a true CSV file). A workaround would be to first split the file, write new files that are well-formed CSVs, and then use the CSV loader, but I'd like to avoid this if possible. How can I convert these RDDs into DataFrames with named columns?

All you have to do is put a 400-column row at the top of the file at least once, then use the sqlContext API to read the data into a DataFrame and change the header names and data types later on. That's all, I guess - Ramesh Maharjan
Some rows in the data file have 200 different columns, however - Kyle Heuton
If you can modify the text file so that the 400-column data comes first, then null will be placed wherever the 200-column rows have too few fields - Ramesh Maharjan

1 Answer

You should create a schema and use the SQLContext.createDataFrame API, as follows:

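import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

val dataA = textFile.filter(line => line(0) == "A")
val ANames = Array("Row ID", "City", "State", "kjl")
// One nullable string column per name
val schemaA = StructType(ANames.map(StructField(_, StringType, true)))
val dataADF = sqlContext.createDataFrame(dataA.map(Row.fromSeq(_)), schemaA)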

This should work. Note that I have used StringType for every column; you can change the types to suit your data.
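
For completeness, here is a minimal end-to-end sketch of the same idea applied to both row types. It is an illustration under assumptions, not a drop-in solution: it uses `SparkSession` (Spark 2.x and later) in local mode instead of the `sqlContext` above, the helper `toNamedDF` and the object name are hypothetical, the sample lines and column names are made up to stand in for `./data.txt`, and it assumes every row has exactly as many fields as there are names.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object SplitByRowType {
  // Hypothetical helper: one nullable string column per name.
  // Assumes every row has exactly names.length fields.
  def toNamedDF(spark: SparkSession, rows: RDD[Array[String]], names: Seq[String]): DataFrame = {
    val schema = StructType(names.map(name => StructField(name, StringType, nullable = true)))
    spark.createDataFrame(rows.map(fields => Row.fromSeq(fields.toSeq)), schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("split-by-row-type")
      .getOrCreate()

    // Made-up sample lines standing in for ./data.txt
    val lines = spark.sparkContext.parallelize(Seq("A|1|Boston|MA", "B|7|x", "A|2|Austin|TX"))
    val split = lines.map(_.split("\\|", -1))

    // Separate the row types first, then attach the matching column names
    val dfA = toNamedDF(spark, split.filter(_(0) == "A"), Seq("Type", "Row ID", "City", "State"))
    val dfB = toNamedDF(spark, split.filter(_(0) == "B"), Seq("Type", "Row ID", "Code"))

    dfA.show()
    dfB.show()
    spark.stop()
  }
}
```

Building the schema from the list of names keeps the code the same size whether there are 4 columns or 400, which avoids the manual array-to-Tuple mapping.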