
I have an input file of the form

(id | column_name | value)
...

column_name can take about 50 distinct values, and the list of ids can be huge.

I want to build a tall-and-skinny matrix whose (i,j) entry is the value found at (id, column_name), with id mapped to i and column_name mapped to j.
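To make the target layout concrete, here is a small local sketch in plain Scala (no Spark) that applies this definition to a few hypothetical (id, column_name, value) triples. Each distinct id and each distinct column name gets a dense index, and each triple lands at (i, j). Ordering the indices by sorted key is an assumption made only so the result is deterministic; LayoutSketch and toDense are illustrative names.

```scala
// Plain-Scala sketch of the target matrix layout (no Spark involved).
// LayoutSketch/toDense are hypothetical names used only for illustration.
object LayoutSketch {
  // Build a dense nRows x nCols matrix from (id, column_name, value) triples.
  // Row i is the i-th distinct id and column j the j-th distinct column name,
  // both taken in sorted order so the result is reproducible.
  def toDense(triples: Seq[(Int, String, Double)]): Array[Array[Double]] = {
    val rowIdx: Map[Int, Int] = triples.map(_._1).distinct.sorted.zipWithIndex.toMap
    val colIdx: Map[String, Int] = triples.map(_._2).distinct.sorted.zipWithIndex.toMap
    val m = Array.fill(rowIdx.size, colIdx.size)(0.0) // missing (id, column) pairs stay 0.0
    for ((id, col, v) <- triples) m(rowIdx(id))(colIdx(col)) = v
    m
  }
}
```

On a cluster the dense array is exactly what you want to avoid; the point here is only the (id, column_name) to (i, j) mapping.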

So far, here's my approach.

First, I load the file:

    val f = sc.textFile("example.txt")
    val data = f.map(_.split('|') match {
        case Array(id, column_name, score) =>
            (id.toInt, column_name, score.toDouble)
    })
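The pattern match above throws a MatchError on any line that does not split into exactly three fields, and toInt/toDouble throw on a header line. A hedged alternative is to parse each line into an Option and drop failures. The sketch below is plain Scala so it can be tried without a cluster; the ParseSketch name and the trimming of spaces around the pipe are assumptions, not part of the original code.

```scala
import scala.util.Try

// Local, Spark-free sketch of a tolerant version of the per-line parser.
// Assumption: fields are separated by '|' and may be padded with spaces.
object ParseSketch {
  // split('|') (a Char) splits on the literal pipe; split("|") would be read
  // as a regex that matches the empty string, splitting between every character.
  def parseLine(line: String): Option[(Int, String, Double)] =
    line.split('|').map(_.trim) match {
      case Array(id, col, score) =>
        // Try turns a NumberFormatException (e.g. on a header line) into None.
        Try((id.toInt, col, score.toDouble)).toOption
      case _ =>
        None // wrong number of fields: skip instead of throwing MatchError
    }
}
```

On the cluster this could stand in for the match, for example val data = f.flatMap(line => ParseSketch.parseLine(line).toList).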

Then I build the column_name and id lists:

    val column_name_list = data.map(x => x._2).distinct.collect.zipWithIndex
    val ids_list = data.map(x=>x._1).distinct.collect.zipWithIndex
    val nCols = column_name_list.length
    val nRows = ids_list.length

and then I build a CoordinateMatrix, defining the entries with the mappings I just created:

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

    val broadcastcolumn_name = sc.broadcast(column_name_list.toMap)
    val broadcastIds = sc.broadcast(ids_list.toMap)

    val matrix_entries_tmp = data.map {
        case (id, column_name, score) =>
            (broadcastIds.value.getOrElse(id, 0), broadcastcolumn_name.value.getOrElse(column_name, 0), score)
    }

    val matrix_entries = matrix_entries_tmp.map {
        e => MatrixEntry(e._1, e._2, e._3)
    }

    val coo_matrix = new CoordinateMatrix(matrix_entries)

This works fine on small examples. However, I get a memory error when the id list gets huge. The problem seems to be this line:

    val ids_list = data.map(x=>x._1).distinct.collect.zipWithIndex

which induces a memory error (collect pulls every distinct id into the driver).

What would be a workaround? I don't really need the id mapping. What matters are the column names and that each row corresponds to some (lost) id. I was thinking about using an IndexedRowMatrix, but I am stuck on how to do it.

Thanks for the help!!


Answer


CoordinateMatrix

It is too ugly to be a decent solution, but it should give you a place to start.

First, let's create a mapping between column names and indices:

    val colIdxMap = sc.broadcast(data.
        map({ case (row, col, value) => col }).
        distinct.
        zipWithIndex.
        collectAsMap)
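This map only needs the roughly 50 distinct column names on the driver, which is why collecting it is safe, unlike the id list. Note that zipWithIndex after distinct assigns indices in whatever order the partitions come out, so column j is not guaranteed to mean the same name across runs. The plain-Scala sketch below mirrors the same distinct / zipWithIndex / map steps locally and sorts the names first so the assignment is reproducible. The sorting is an addition, not part of the steps above; on an RDD the analogous step would be sortBy(c => c) before zipWithIndex.

```scala
// Local mirror of: data.map(col).distinct.zipWithIndex.collectAsMap
// Sorting before indexing is an added step so that indices are reproducible.
object ColumnIndexSketch {
  def columnIndex(data: Seq[(Int, String, Double)]): Map[String, Long] =
    data.map { case (_, col, _) => col }
      .distinct
      .sorted
      .zipWithIndex
      .map { case (col, idx) => (col, idx.toLong) } // RDD.zipWithIndex yields Long indices
      .toMap
}
```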

Group the records by row id and map each record to a (colIdx, value) pair:

    val values = data.
        groupBy({ case (row, col, value) => row }).
        mapValues({ _.map { case (_, col, value) => (colIdxMap.value(col), value) } }).
        values

Generate entries:

    import org.apache.spark.mllib.linalg.distributed.MatrixEntry

    val entries = values.
        zipWithIndex.
        flatMap { case (vals, row) =>
            vals.map { case (col, value) => MatrixEntry(row, col, value) }
        }
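Locally, the groupBy / zipWithIndex / flatMap chain behaves like the plain-Scala sketch below: every group of records sharing an id gets one fresh row index, so the original ids never have to be collected on the driver. Entry is a hypothetical stand-in for MatrixEntry, and the sorting of groups by id is added only to make the output testable; on an RDD the row numbering follows partition order instead.

```scala
// Hypothetical local stand-in for org.apache.spark.mllib.linalg.distributed.MatrixEntry.
case class Entry(i: Long, j: Long, value: Double)

object EntriesSketch {
  def entries(data: Seq[(Int, String, Double)], colIdx: Map[String, Long]): Seq[Entry] = {
    // One group per id; each group becomes a sequence of (colIdx, value) pairs.
    val grouped: Seq[Seq[(Long, Double)]] = data
      .groupBy { case (row, _, _) => row }
      .toSeq
      .sortBy(_._1) // added only for deterministic row numbering in this sketch
      .map { case (_, recs) => recs.map { case (_, col, v) => (colIdx(col), v) } }
    // zipWithIndex gives each group a new row number that replaces the original id.
    grouped.zipWithIndex.flatMap { case (vals, row) =>
      vals.map { case (col, v) => Entry(row.toLong, col, v) }
    }
  }
}
```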

Create the final matrix:

    import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix

    val mat: CoordinateMatrix = new CoordinateMatrix(entries)

RowMatrix

If row ids are not important at all, you can use a RowMatrix as follows.

First, let's group the data by row:

    val dataByRow = data.groupBy { case (row, col, value) => row }

Generate a sparse vector for each row:

    import org.apache.spark.mllib.linalg.Vectors

    val rows = dataByRow.mapValues(vals => {
        val cols = vals.map {
            case (_, col, value) => (colIdxMap.value(col).toInt, value)
        }
        Vectors.sparse(colIdxMap.value.size, cols.toSeq)
    }).values
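Vectors.sparse(size, elements) takes (index, value) pairs; as far as I know it sorts them by index and rejects duplicate indices, so an id that has the same column twice in the input would fail at this step. The plain-Scala sketch below shows the same conversion into the sorted parallel index/value arrays a sparse vector stores, and sums duplicates. SparseSketch is a hypothetical name, and summing is an assumption about what duplicates should mean; you may prefer to keep the last value or to fail.

```scala
// Local sketch of the (indices, values) arrays behind a sparse vector.
// Duplicate column indices are summed here; that policy is an assumption.
object SparseSketch {
  def toSparse(size: Int, pairs: Seq[(Int, Double)]): (Array[Int], Array[Double]) = {
    require(pairs.forall { case (i, _) => i >= 0 && i < size }, "column index out of range")
    val merged = pairs
      .groupBy(_._1)                                 // collect values per column index
      .map { case (i, vs) => (i, vs.map(_._2).sum) } // sum duplicates
      .toSeq
      .sortBy(_._1)                                  // sparse vectors keep indices increasing
    (merged.map(_._1).toArray, merged.map(_._2).toArray)
  }
}
```

With Spark on the classpath, the two arrays could then be passed to Vectors.sparse(size, indices, values).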

Create a matrix:

    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    val mat: RowMatrix = new RowMatrix(rows)

You can also use zipWithIndex on rows to create an RDD[IndexedRow] and, from it, an IndexedRowMatrix.
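The zipWithIndex route can be pictured locally as follows: each row vector is paired with a Long position, which plays the role of IndexedRow(index, vector). LocalIndexedRow and the Map[Int, Double] row representation are hypothetical stand-ins so the sketch runs without Spark. With Spark, the analogous line would be new IndexedRowMatrix(rows.zipWithIndex.map { case (v, i) => IndexedRow(i, v) }).

```scala
// Hypothetical local stand-in for IndexedRow(index: Long, vector: Vector).
case class LocalIndexedRow(index: Long, vector: Map[Int, Double])

object IndexedRowsSketch {
  // Pair each row with its position, as rows.zipWithIndex does on an RDD.
  def index(rows: Seq[Map[Int, Double]]): Seq[LocalIndexedRow] =
    rows.zipWithIndex.map { case (v, i) => LocalIndexedRow(i.toLong, v) }
}
```

On an RDD with more than one partition, zipWithIndex triggers an extra Spark job to compute partition offsets; zipWithUniqueId avoids that job but leaves gaps in the indices.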