I'm getting an error when using PySpark's saveAsHadoopFile(), and I get the same error when using saveAsSequenceFile(). I need to save an RDD of (key, val) pairs where the key is a string and the val is a LabeledPoint (label, SparseVector). The error is shown below. From googling several sources, it seems I should be able to do this from within an IPython notebook. I need to serialize this large RDD so I can process it in Java, because some of Spark's MLlib functionality is not available for Python yet. According to this post, this should be doable.
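For concreteness, the RDD I'm trying to save looks roughly like this (hypothetical sample data, just to show the shape; sc is the SparkContext the notebook provides):

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import SparseVector

# Hypothetical sample: (string key, LabeledPoint(label, SparseVector)) pairs
labeledDataRDD = sc.parallelize([
    ("doc-1", LabeledPoint(1.0, SparseVector(4, {0: 1.0, 3: 5.5}))),
    ("doc-2", LabeledPoint(0.0, SparseVector(4, {1: 2.0}))),
])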
Looking at this page I see:
_picklable_classes = [
'LinkedList',
'SparseVector',
'DenseVector',
'DenseMatrix',
'Rating',
'LabeledPoint',
]
so I really have no clue why I'm getting this error.
Code: labeledDataRDD.saveAsSequenceFile('/tmp/pysequencefile/')
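The saveAsHadoopFile() attempt was along these lines (reconstructing from memory, so the exact output format class may not be verbatim), and it fails the same way:

labeledDataRDD.saveAsHadoopFile('/tmp/pyhadoopfile/',
                                'org.apache.hadoop.mapred.SequenceFileOutputFormat')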
Error:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 527.0 failed 1 times, most recent failure: Lost task 0.0 in stage 527.0 (TID 1454, localhost): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype) at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
Edit: I found this:
public class ClassDictConstructor implements IObjectConstructor {

    String module;
    String name;

    public ClassDictConstructor(String module, String name) {
        this.module = module;
        this.name = name;
    }

    public Object construct(Object[] args) {
        if (args.length > 0)
            throw new PickleException("expected zero arguments for construction of ClassDict (for "+module+"."+name+")");
        return new ClassDict(module, name);
    }
}
I'm NOT using the construct() method above directly, so I don't know why the saveAs... methods that I tried pass it arguments when it wants none.
Edit 2: following zero323's suggestion (thank you) worked, with a small glitch. I get an error (see below) when I try what zero323 wrote. However, when I derive a simpler RDD it works, and it saves this simpler one to a directory of .parquet files (breaking it up into several .parquet files). The simpler RDD is as follows:
simplerRDD = labeledDataRDD.map(lambda (k,v): (v.label, v.features))
sqlContext.createDataFrame(simplerRDD, ("k", "v")).write.parquet("labeledData_parquet_file")
Error when trying to save labeledDataRDD:
/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_schema(row)
831 raise TypeError("Can not infer schema for type: %s" % type(row))
832
--> 833 fields = [StructField(k, _infer_type(v), True) for k, v in items]
834 return StructType(fields)
835
/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_type(obj)
808 return _infer_schema(obj)
809 except TypeError:
--> 810 raise TypeError("not supported type: %s" % type(obj))
811
812
TypeError: not supported type: <type 'numpy.unicode_'>
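If I'm reading the last line right, the key is coming through as numpy.unicode_ rather than a plain Python str, so maybe something like this (an untested guess on my part) would sidestep it while keeping the key:

keyedRDD = labeledDataRDD.map(lambda (k, v): (str(k), v.label, v.features))
sqlContext.createDataFrame(keyedRDD, ("key", "label", "features")).write.parquet("labeledData_with_key_parquet")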