I'm following the "Learning PySpark" tutorial (at this link). When I run:
import pyspark.mllib.feature as ft  # the alias the tutorial uses for pyspark.mllib.feature

selector = ft.ChiSqSelector(4).fit(births_train)
s1 = births_train.map(lambda row: row.label)
s2 = selector.transform(births_train.map(lambda row: row.features))
print(s1.take(1))
print(s2.take(1))
print(type(s1))
print(type(s2))
I get this output:
[0.0]
[DenseVector([0.0, 99.0, 99.0, 999.0])]
<class 'pyspark.rdd.PipelinedRDD'>
<class 'pyspark.rdd.RDD'>
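As I understand it, RDD transformations are lazy: take(1) only computes the first element, so a bad record in an upstream map can stay invisible until an action actually reaches it. Here is a minimal sketch of that behaviour (not the tutorial's code; the lookup table is made up, and it assumes a live SparkContext sc):

rdd = sc.parallelize(['0', '1', '2'], 1)
mapped = rdd.map(lambda v: {'0': 0, '2': 2}[v])  # made-up dict with no entry for '1'
print(mapped.take(1))    # [0] -- only the first element is evaluated
print(mapped.collect())  # fails with KeyError: '1' once Spark reaches '1'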
When I try to merge the results with zip, as the tutorial suggests:
s3 = s1.zip(s2)
print(type(s3))
print(s3.collect())
I get this error:
<class 'pyspark.rdd.RDD'>
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input> in <module>()
      1 s3 = s1.zip(s2)
      2 print(type(s3))
----> 3 print(s3.collect())

/content/spark-2.3.1-bin-hadoop2.7/python/pyspark/rdd.py in collect(self)
    832         """
    833         with SCCallSiteSync(self.context) as css:
--> 834             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    835         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    836

/content/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/content/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/content/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 308.0 failed 1 times, most recent failure: Lost task 0.0 in stage 308.0 (TID 8596, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 324, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in dump_stream
    for obj in iterator:
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 313, in _batched
    for item in iterator:
  File "<ipython-input>", line 1, in <lambda>
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 75, in <lambda>
    return lambda *a: f(*a)
  File "/content/spark-2.3.1-bin-hadoop2.7/python/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input>", line 9, in recode
KeyError: '1'
Why?
KeyError usually means the key doesn't exist. - Agile_Eagle

In the zip method documentation, they don't set a key:
>>> x = sc.parallelize(range(0, 5))
>>> y = sc.parallelize(range(1000, 1005))
>>> x.zip(y).collect()
and it gives no error. Does Spark set a key automatically in this case? Or is it my RDD objects that have something wrong? - dax90

topFeatures_train = (births_train.map(lambda row: row.label).zip(selector.transform(births_train.map(lambda row: row.features)))).map(lambda row: reg.LabeledPoint(row[0], row[1])) ? - TDrabas

births_train.take(1) ? - TDrabas
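For context on the zip question raised in the comments: RDD.zip pairs elements purely by position, so no key is involved at all; the KeyError here comes from a Python dict lookup in the tutorial's upstream recode call (see the last frame of the traceback), which only runs when collect() forces full evaluation. The one constraint zip does impose is that both RDDs have the same number of partitions and the same number of elements per partition. A minimal sketch, again assuming a live SparkContext sc:

a = sc.parallelize(range(5), 2)         # 2 partitions
b = sc.parallelize(range(100, 105), 2)  # same element count, same partitioning
print(a.zip(b).collect())               # [(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)]
# Mismatched partition or element counts would make zip itself fail;
# matched RDDs zip cleanly with no key ever being set.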