
I need to read a 10GB fixed-width file into a data frame. How can I do it using Spark in R?

Suppose my text data is the following:

text <- c("0001BRAjonh   ",
"0002USAmarina ",
"0003GBPcharles")

I want the first 4 characters to go into the column "ID" of a data frame, characters 5-7 into a column "Country", and characters 8-14 into a column "Name".
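To make the layout concrete, here is a minimal local sketch in Scala (no Spark involved) that writes each column as a (name, 1-based start, length) triple and cuts the three sample lines with it. The `FixedWidthLayout` object and its `parse` helper are hypothetical names for illustration only; the point is that "characters 5-7" means start 5 and length 3.

```scala
// Hypothetical sketch: the question's layout as (column name, 1-based start, length),
// applied locally to the sample lines to double-check the positions.
object FixedWidthLayout {
  val layout: Seq[(String, Int, Int)] = Seq(
    ("ID", 1, 4),      // characters 1-4
    ("Country", 5, 3), // characters 5-7
    ("Name", 8, 7)     // characters 8-14
  )

  val text: Seq[String] = Seq(
    "0001BRAjonh   ",
    "0002USAmarina ",
    "0003GBPcharles"
  )

  // Cut one line into named fields. The start is converted to a 0-based index
  // for String.substring, and trim removes the right-padding spaces.
  def parse(line: String): Map[String, String] =
    layout.map { case (name, start, length) =>
      name -> line.substring(start - 1, start - 1 + length).trim
    }.toMap

  def main(args: Array[String]): Unit =
    text.map(parse).foreach(println)
}
```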

I would use the function read.fwf if the dataset were small, but it is not.

I can read the file as a text file using the sparklyr::spark_read_text function, but I don't know how to split each line's values into the columns of a data frame properly.

I built a class to do this for me in Scala based on substring and selectExpr. To start, I had all my schemas in text files under an external Hive table with five columns: table name, column number, column name, column start, column end. Each column was turned into its own array, which I iterated over while building each column's parsing statement. Even though it will be in Scala, do you want me to draft a proxy Spark answer for it in the answer section? - afeldman
Definitely yes, @afeldman. It will certainly help me figure out how to do it in R. Thanks for this. - Charles Santana



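EDIT: I forgot to mention that Spark SQL's substring starts counting at 1 while Scala arrays start at 0, which is why the loop below starts its iterator at 1 and reads each array with columnAllocationIterator-1.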

Here is the code I talked about in the comment above.

The process is dynamic and is driven by a Hive table called Input_Table. The table has 5 columns: Table_Name, Column_Name, Column_Ordinal_Position, Column_Start, and Column_Length. It is an external table, so any user can add, change, or remove schema files in its folder location. I rewrote this from scratch rather than copying my actual code; does everything make sense?
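As a hedged illustration of what Input_Table holds, the sketch below models its five columns as a Scala case class, fills it with the question's layout under a made-up table name (`people_fwf`), and reproduces locally what the Hive query and `.sort` in the code do: keep one table's rows and order them by Column_Ordinal_Position. The class, object, and table names are hypothetical; the real version reads from Hive.

```scala
// Hypothetical local stand-in for one row of Input_Table (the five columns described above).
final case class InputTableRow(
  tableName: String,
  columnName: String,
  columnOrdinalPosition: Int,
  columnStart: Int,
  columnLength: Int
)

object InputTableSketch {
  // Rows deliberately out of order and mixed with another table's schema.
  val inputTable: Seq[InputTableRow] = Seq(
    InputTableRow("people_fwf", "Name", 3, 8, 7),
    InputTableRow("people_fwf", "ID", 1, 1, 4),
    InputTableRow("other_file", "Code", 1, 1, 2),
    InputTableRow("people_fwf", "Country", 2, 5, 3)
  )

  // Local equivalent of "select * from Input_Table where Table_Name = '...'"
  // followed by sorting on Column_Ordinal_Position.
  def schemaFor(tableName: String): Seq[InputTableRow] =
    inputTable.filter(_.tableName == tableName).sortBy(_.columnOrdinalPosition)

  def main(args: Array[String]): Unit = {
    val schema = schemaFor("people_fwf")
    // Same three string arrays the Spark code collects to the driver.
    val columnNameArray   = schema.map(_.columnName).toArray
    val columnStartArray  = schema.map(_.columnStart.toString).toArray
    val columnLengthArray = schema.map(_.columnLength.toString).toArray
    println(columnNameArray.mkString(", "))
    println(columnStartArray.mkString(", "))
    println(columnLengthArray.mkString(", "))
  }
}
```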

// Load the input file and this table's schema rows from the Hive table, keeping only the matching columns in ordinal order.
// recordFormat, folderLocation, tableName and tableFormat are parameters supplied by the caller.
import spark.implicits._  // needed for the $"..." column syntax
val inputDF       = spark.read.format(recordFormat).option("header", "false").load(folderLocation + "/" + tableName + "." + tableFormat).toDF("Odd_Long_Name")
val inputSchemaDF = spark.sql("select * from Input_Table where Table_Name = '" + tableName + "'").sort($"Column_Ordinal_Position")

// Build one array per schema column. rdd.map(mkString).collect turns a DataFrame column into an Array[String] on the driver, so it can be iterated.
val columnNameArray    = inputSchemaDF.selectExpr("Column_Name").rdd.map(x => x.mkString).collect
val columnStartArray   = inputSchemaDF.selectExpr("Column_Start").rdd.map(x => x.mkString).collect
val columnLengthArray  = inputSchemaDF.selectExpr("Column_Length").rdd.map(x => x.mkString).collect

// The iterator and the other variables that get overwritten in the loop
var columnAllocationIterator = 1
var localCommand             = ""
var commandArray             = Array("")

// Loop once per column in the input table
while (columnAllocationIterator <= columnNameArray.length) {
  // Build the expression for this column; I thought Odd_Long_Name was too accurate not to use in the code
  localCommand = "substring(Odd_Long_Name, " + columnStartArray(columnAllocationIterator-1) + ", " + columnLengthArray(columnAllocationIterator-1) + ") as " + columnNameArray(columnAllocationIterator-1)

  // On the first pass overwrite the placeholder array, afterwards just append
  if (columnAllocationIterator == 1) {
    commandArray = Array(localCommand)
  } else {
    commandArray = commandArray ++ Array(localCommand)
  }

  // I really like incrementing my iterators like this
  columnAllocationIterator = columnAllocationIterator + 1
}

// Run every expression in the string array against the input table, one output column each
val finalDF = inputDF.selectExpr(commandArray: _*)
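If the mutable loop feels heavy, the same expression strings can be built in one pass over the array indices. This is a sketch of an alternative, not the code above; `SelectExprSketch` and `buildCommands` are hypothetical names, and it assumes the three arrays line up one-to-one as collected. The result can be passed to `inputDF.selectExpr(commandArray: _*)` exactly as before.

```scala
// Hypothetical alternative to the while loop: same "substring(...) as Name" strings, no vars.
object SelectExprSketch {
  def buildCommands(
      names: Array[String],
      starts: Array[String],
      lengths: Array[String],
      sourceColumn: String = "Odd_Long_Name"
  ): Array[String] = {
    require(names.length == starts.length && names.length == lengths.length,
      "the three schema arrays must have the same length")
    // Indices are 0-based here; the start values themselves stay 1-based for Spark SQL.
    names.indices.map { i =>
      s"substring($sourceColumn, ${starts(i)}, ${lengths(i)}) as ${names(i)}"
    }.toArray
  }

  def main(args: Array[String]): Unit =
    buildCommands(Array("ID", "Country", "Name"), Array("1", "5", "8"), Array("4", "3", "7"))
      .foreach(println)
}
```

For the question's layout this yields one expression per column, such as `substring(Odd_Long_Name, 5, 3) as Country`.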