I have an input file of the form
(id | column_name | value)
...
column_name can take on some 50 distinct names, and the list of ids can be huge.
I want to build a tall-and-skinny matrix whose (i, j) coefficient is the value found at (id, column_name), with each id mapped to a row index i and each column_name mapped to a column index j.
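For instance (entirely made-up values), a few input lines might look like

1 | age | 25.0
1 | height | 180.0
2 | age | 31.0

so the matrix coefficient at (row for id 1, column for age) should end up being 25.0.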
So far, here's my approach.
First, I load the file:
val f = sc.textFile("example.txt")
val data = f.map(_.split('|') match {
  case Array(id, column_name, score) =>
    (id.trim.toInt, column_name.trim, score.trim.toDouble)
})
Then I will build the column_name and ids lists
val column_name_list = data.map(_._2).distinct.collect.zipWithIndex
val ids_list = data.map(_._1).distinct.collect.zipWithIndex
val nCols = column_name_list.length
val nRows = ids_list.length
and then I build a CoordinateMatrix, defining the entries with the mappings I just created:
val broadcastcolumn_name = sc.broadcast(column_name_list.toMap)
val broadcastIds = sc.broadcast(ids_list.toMap)
val matrix_entries_tmp = data.map {
  case (id, column_name, score) =>
    (broadcastIds.value.getOrElse(id, 0), broadcastcolumn_name.value.getOrElse(column_name, 0), score)
}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val matrix_entries = matrix_entries_tmp.map {
  e => MatrixEntry(e._1, e._2, e._3)
}
val coo_matrix = new CoordinateMatrix(matrix_entries)
This works fine on small examples. However, I get a memory error when the id list gets huge. The problem seems to be this line:
val ids_list = data.map(_._1).distinct.collect.zipWithIndex
which collects all the distinct ids onto the driver and causes the memory error.
What would be a workaround? I don't actually need the id mapping: what matters is the column names, and that each row corresponds to some (possibly lost) id. I was thinking about using an IndexedRowMatrix, but I am stuck on how to build it.
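One direction I was considering, sketched below (I'm not at all sure it is correct or efficient), is to keep the id-to-row-index mapping distributed with RDD.zipWithIndex and join on it, instead of collecting the ids to the driver; the column mapping stays broadcast since there are only ~50 names. This reuses the same imports and broadcast variables as above:

// rough sketch, not tested at scale: row indices stay in an RDD instead of on the driver
val row_index = data.map(_._1).distinct.zipWithIndex()   // RDD[(Int, Long)]

val entries = data
  .map { case (id, column_name, score) => (id, (column_name, score)) }
  .join(row_index)                                        // pairs each record with its row index
  .map { case (_, ((column_name, score), i)) =>
    MatrixEntry(i, broadcastcolumn_name.value.getOrElse(column_name, 0), score)
  }

val coo_matrix_alt = new CoordinateMatrix(entries)

But I don't know whether the extra join is the right approach here, or how I would go from something like this to an IndexedRowMatrix (grouping each row's entries into a vector), so any pointer would be appreciated.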
Thanks for the help!!