
I have 20 TB of data. I tried to convert a Spark DataFrame to a distributed matrix, following a solution I found here. My DataFrame looks like this:

+-------+---------------+---------------------+
|goodsID| customer_group|customer_phone_number|
+-------+---------------+---------------------+
|    123|          XXXXX|             XXXXXXXX|
|    432|          YYYYY|             XXXXXXXX|
+-------+---------------+---------------------+

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

mat = IndexedRowMatrix(mydataframe.map(lambda row: IndexedRow(*row)))
mat.numRows()
mat.numCols()

but it gives me the following error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/test/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/home/test/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/test/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/test/spark-1.6.0-bin-hadoop2.6/python/pyspark/rdd.py", line 1293, in takeUpToNumLeft
    yield next(iterator)
  File "<stdin>", line 1, in <lambda>
TypeError: __init__() takes exactly 3 arguments (4 given)

So my questions are:

  1. How can I achieve this in Spark?
  2. Also, how can I convert my DataFrame to a NumPy array?
  3. Is using Pandas with Spark really bad?

1 Answer

  • The types of the input data are probably wrong. Vector values have to be Double (Python float).

  • You aren't using IndexedRow correctly. It takes two arguments: an index and a vector. Assuming the data is of the correct type:

    from pyspark.mllib.linalg import Vectors

    mat = IndexedRowMatrix(mydataframe.map(
        lambda row: IndexedRow(row[0], Vectors.dense(row[1:]))))

  • Is Pandas bad? For 20 TB of data? It's not the best choice, since Pandas requires the whole dataset to fit in memory on a single machine, but there are distributed Python libraries with a similar API.
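The semantics of the fix above, and the answer to the NumPy question, can be illustrated without a cluster. The sketch below uses plain NumPy with made-up sample rows in the same (index, features...) shape as the DataFrame: each `IndexedRow(index, vector)` pairs an integer row index with a dense vector of floats, and collecting those pairs lets you assemble an ordinary dense matrix. On a real (and small enough) DataFrame the rows would come from `mydataframe.collect()` instead.

```python
import numpy as np

# Hypothetical rows standing in for the collected DataFrame: each tuple is
# (row index, list of float features), mirroring IndexedRow(index, vector).
rows = [(0, [1.0, 2.0]), (1, [3.0, 4.0])]

# Assemble a dense matrix: row i of the matrix is the vector stored at
# index i, which is exactly the layout an IndexedRowMatrix represents.
n_rows = max(i for i, _ in rows) + 1
n_cols = len(rows[0][1])
mat = np.zeros((n_rows, n_cols))
for i, vec in rows:
    mat[i] = vec

print(mat.shape)  # (2, 2)
```

Note that this only works once the data fits on the driver, which 20 TB will not; for data at that scale, stay with the distributed `IndexedRowMatrix` operations and avoid collecting.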