1 vote

I'm following the "Learning PySpark" tutorial (at this link). When I run:

import pyspark.mllib.feature as ft  # assumed import, as used in the tutorial's earlier cells

selector = ft.ChiSqSelector(4).fit(births_train)

s1 = births_train.map(lambda row: row.label)
s2 = selector.transform(births_train.map(lambda row: row.features))

print(s1.take(1))
print(s2.take(1))
print(type(s1))
print(type(s2))

I get this output:

[0.0]

[DenseVector([0.0, 99.0, 99.0, 999.0])]

<class 'pyspark.rdd.PipelinedRDD'>

<class 'pyspark.rdd.RDD'>

When I try to merge the results with zip, as the tutorial suggests:

s3=s1.zip(s2)
print(type(s3))
print(s3.collect())

I get this error:

<class 'pyspark.rdd.RDD'>

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input> in <module>()
      1 s3=s1.zip(s2)
      2 print(type(s3))
----> 3 print(s3.collect())

/content/spark-2.3.1-bin-hadoop2.7/python/pyspark/rdd.py in collect(self)
    832         """
    833         with SCCallSiteSync(self.context) as css:
--> 834             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    835         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    836

/content/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/content/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/content/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 308.0 failed 1 times, most recent failure: Lost task 0.0 in stage 308.0 (TID 8596, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 324, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in dump_stream
    for obj in iterator:
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 313, in _batched
    for item in iterator:
  File "<ipython-input>", line 1, in <lambda>
  File "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 75, in <lambda>
    return lambda *a: f(*a)
  File "/content/spark-2.3.1-bin-hadoop2.7/python/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input>", line 9, in recode
KeyError: '1'

Why?

A KeyError usually means the key doesn't exist. - Agile_Eagle
I reproduced all the code from the link: they don't set keys anywhere. The zip method documentation doesn't set a key either: >>> x = sc.parallelize(range(0, 5)) >>> y = sc.parallelize(range(1000, 1005)) >>> x.zip(y).collect() gives no error. Does Spark set a key automatically in this case? Or is something wrong with my RDD objects? - dax90
If you do this in one call, do you get an error as well? I mean, if you were to do this: topFeatures_train = (births_train.map(lambda row: row.label).zip(selector.transform(births_train.map(lambda row: row.features)))).map(lambda row: reg.LabeledPoint(row[0], row[1]))? - TDrabas
When I run your command, it doesn't give me any error. However, it does error when I run topFeatures_train.take(1). - dax90
I'm starting to wonder if this is a Spark 2.3.1-specific issue. I tested my code on 2.3.0 and it works fine. Would you have the means to run it on 2.3.0? What is the output if you run births_train.take(1)? - TDrabas
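
On the key question raised in the comments: RDD.zip doesn't use keys at all. It pairs elements purely by position, partition by partition, which is why the documentation example needs no key. A minimal demonstration (this is the documented doctest from the comment above, with its output shown):

x = sc.parallelize(range(0, 5))
y = sc.parallelize(range(1000, 1005))
# zip pairs elements by position within matching partitions; no keys are involved
print(x.zip(y).collect())
# [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

Because the pairing is positional, zip requires both RDDs to have the same number of partitions and the same number of elements in each partition.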

1 Answer

0 votes

The following code works for me, although I don't know why:

# Collect both RDDs to the driver, then re-parallelize them before zipping
truth = sc.parallelize(births_test.map(lambda row: row.label).collect())
prediction = sc.parallelize(
    LR_Model.predict(births_test.map(lambda row: row.features))
            .map(lambda x: x * 1.0)  # cast predictions to float
            .collect())
LR_results = truth.zip(prediction)
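
Two hedged observations on why this might help. First, the traceback above fails inside recode (the record-cleaning function from the tutorial), not inside zip itself: Spark is lazy, so an error in an upstream transformation only surfaces when an action such as collect() forces evaluation. Second, RDD.zip requires both RDDs to have the same number of partitions and the same number of elements per partition; collecting both sides and re-parallelizing them gives them identical default partitioning, which satisfies that contract. If collecting to the driver is too expensive, pairing by an explicit index is a common alternative. The sketch below reuses births_test and LR_Model from the answer above and is untested against the tutorial's data:

# Index-based pairing that avoids collecting to the driver.
# births_test and LR_Model are the objects from the answer above.
truth = (births_test
         .map(lambda row: row.label)
         .zipWithIndex()                 # (label, index)
         .map(lambda x: (x[1], x[0])))   # (index, label)

prediction = (LR_Model
              .predict(births_test.map(lambda row: row.features))
              .map(lambda x: x * 1.0)    # cast predictions to float
              .zipWithIndex()
              .map(lambda x: (x[1], x[0])))

# join on the index, then drop it, leaving (label, prediction) pairs
LR_results = truth.join(prediction).map(lambda x: x[1])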