
I get an error when using PySpark's saveAsHadoopFile(), and the same error when using saveAsSequenceFile(). I need to save an RDD of (key, val) pairs where the key is a string and val is a LabeledPoint (label, SparseVector). The error is shown below. From several sources I found, it seems I should be able to do this from within an IPython notebook. I need to serialize this large RDD so that I can process it in Java, because some of Spark's MLlib functionality is not yet available in Python. According to this post, this should be doable.

Looking at this page I see:

_picklable_classes = [
    'LinkedList',
    'SparseVector',
    'DenseVector',
    'DenseMatrix',
    'Rating',
    'LabeledPoint',
]

so I really have no clue why I'm getting this error.

Code: labeledDataRDD.saveAsSequenceFile('/tmp/pysequencefile/')

Error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 527.0 failed 1 times, most recent failure: Lost task 0.0 in stage 527.0 (TID 1454, localhost): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype) at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)

Edit: I found this:

public class ClassDictConstructor implements IObjectConstructor {

    String module;
    String name;

    public ClassDictConstructor(String module, String name) {
        this.module = module;
        this.name = name;
    }

    public Object construct(Object[] args) {
        if (args.length > 0)
            throw new PickleException("expected zero arguments for construction of ClassDict (for "+module+"."+name+")");
        return new ClassDict(module, name);
    }
}

I'm NOT calling the construct() method above directly, so I don't know why the saveAs... methods I tried pass it arguments when it wants none.

Edit 2: Following zero323's suggestion (thank you) worked, with a small glitch. I get an error (see below) when I try exactly what zero323 wrote. However, when I derive a simpler RDD, it works and saves that simpler RDD to a directory of several .parquet files. The simpler RDD is as follows:

simplerRDD = labeledDataRDD.map(lambda kv: (kv[1].label, kv[1].features))
sqlContext.createDataFrame(simplerRDD, ("k", "v")).write.parquet("labeledData_parquet_file")

Error when trying to save labeledDataRDD:

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_schema(row)
    831         raise TypeError("Can not infer schema for type: %s" % type(row))
    832 
--> 833     fields = [StructField(k, _infer_type(v), True) for k, v in items]
    834     return StructType(fields)
    835 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_type(obj)
    808             return _infer_schema(obj)
    809         except TypeError:
--> 810             raise TypeError("not supported type: %s" % type(obj))
    811 
    812 

TypeError: not supported type: <type 'numpy.unicode_'>
What functionality isn't provided to Python yet? - Alberto Bonsanto
Take a look here. I think they will add this in Spark 1.6. I'm using the very recent Spark 1.5.1. - Kai

1 Answer


The source of the problem is not pickling itself. If it were, you wouldn't see net.razorvine.pickle.PickleException, which is raised by Pyrolite on the JVM side while it reads the already pickled data. If you take a look at the saveAsSequenceFile documentation, you'll see that it requires two steps:

  1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
  2. Keys and values of this Java RDD are converted to Writables and written out.

Your program fails at the first step, but even if it didn't, I am not exactly sure what the expected Java object would be or how to read it back.
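
To see why the Java side is handed constructor arguments at all, it can help to look at what a pickle stream actually contains. The sketch below is plain Python with no Spark involved, and it uses fractions.Fraction purely as a stand-in for any class that pickles itself as "call this constructor with these arguments". My understanding, which you should treat as an assumption, is that numpy.dtype pickles the same way and that Pyrolite falls back to ClassDictConstructor for classes it has no converter for, which would explain the "expected zero arguments" message.

```python
import pickle
import pickletools
from fractions import Fraction

# Fraction is only a stand-in here: like many classes, it pickles itself as
# "call this constructor with these arguments".
value = Fraction(1, 2)
constructor, args = value.__reduce__()[:2]
print(constructor, args)

# The same instruction shows up in the pickle stream as a REDUCE opcode,
# preceded by a reference to the class and the argument tuple.
payload = pickle.dumps(value, protocol=2)
opcodes = [opcode.name for opcode, _arg, _pos in pickletools.genops(payload)]
print(opcodes)

# With NumPy installed, numpy.dtype("float64").__reduce__() can be inspected
# the same way to see which arguments a dtype asks to be rebuilt with.
```

An unpickler that substitutes a zero-argument placeholder for an unknown class cannot honour that REDUCE call, so the failure happens while reading the data, not while writing it.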

Instead of playing with sequence files I would simply write data as a Parquet file:

from pyspark.mllib.regression import LabeledPoint

rdd = sc.parallelize([
   ("foo", LabeledPoint(1.0, [1.0, 2.0, 3.0])),
   ("bar", LabeledPoint(2.0, [4.0, 5.0, 6.0]))])

sqlContext.createDataFrame(rdd, ("k", "v")).write.parquet("a_parquet_file")

read it back and convert:

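import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
// Enables the $"column" syntax; spark-shell imports this automatically.
import sqlContext.implicits._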

val rdd: RDD[(String, LabeledPoint)] = sqlContext.read.parquet("a_parquet_file")
  .select($"k", $"v.label", $"v.features")
  .map{case Row(k: String, label: Double, features: Vector) =>
    (k, LabeledPoint(label, features))}

rdd.sortBy(_._1, false).take(2)

// Array[(String, org.apache.spark.mllib.regression.LabeledPoint)] = 
//  Array((foo,(1.0,[1.0,2.0,3.0])), (bar,(2.0,[4.0,5.0,6.0])))

or, if you prefer a more Java-like approach:

def rowToKeyLabeledPointPair(r: Row): Tuple2[String, LabeledPoint] = {
  // Vector -> org.apache.spark.mllib.linalg.Vector
  Tuple2(r.getString(0), LabeledPoint(r.getDouble(1), r.getAs[Vector](2)))
}

sqlContext.read.parquet("a_parquet_file")
  .select($"k", $"v.label", $"v.features")
  .map(rowToKeyLabeledPointPair)

Edit

Generally speaking, NumPy types are not supported as standalone values in Spark SQL. If you have NumPy types in an RDD, you have to convert them to standard Python types first:

tmp = rdd.map(lambda kv: (str(kv[0]), kv[1]))
sqlContext.createDataFrame(tmp, ("k", "v")).write.parquet("a_parquet_file")
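
If the keys are not always strings, a small helper can do the conversion more generally. This is a minimal sketch, not part of Spark or NumPy: to_native and FakeNumpyStr are hypothetical names, and the helper relies on the fact that NumPy scalars expose an item() method that returns the equivalent plain Python object. It is duck-typed so that it runs without NumPy; with NumPy imported, isinstance(value, numpy.generic) would be a more explicit check.

```python
# Plain Python types that need no conversion.
NATIVE_TYPES = (bool, int, float, str, bytes, type(None))


def to_native(value):
    """Return a plain Python object for NumPy-style scalars; pass others through."""
    if type(value) in NATIVE_TYPES:
        return value  # already a plain Python value
    item = getattr(value, "item", None)
    if callable(item):
        return item()  # NumPy scalars return their native equivalent here
    return value  # anything else (e.g. a LabeledPoint) is left untouched


class FakeNumpyStr(str):
    """Hypothetical stand-in for a NumPy string scalar such as numpy.unicode_."""

    def item(self):
        return str(self)


key = to_native(FakeNumpyStr("foo"))
print(type(key).__name__, key)
```

In the snippet above it would be used as rdd.map(lambda kv: (to_native(kv[0]), kv[1])). It is only meant for scalars; multi-element arrays would need tolist() or a similar conversion instead.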