This question is similar to this old question, which is not for PySpark: similar
I have a dataframe and want to train an ML decision tree on it. The dataframe contains two categorical columns, so I follow these steps to prepare them:
- Define a StringIndexer on the two categorical columns
- Define a OneHotEncoder on the output of the previous step
- Define a VectorAssembler on the two new columns plus all numerical columns (except the label column) to build the features vector
- Define a StandardScaler on the features column
- Apply all the defined stages in a pipeline (the stage definitions are sketched after the snippet below):
mem_pipiline = Pipeline(stages = [indexer, encoder, assembler, scaler])
pipelineModel = mem_pipiline.fit(data)
transformed = pipelineModel.transform(data)
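For completeness, the stages are defined roughly like this. This is a minimal sketch, not my exact code: the categorical columns CMD and type and the numeric column names are taken from the schema shown in the error below, and the handleInvalid settings may differ in my actual code:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Index both categorical columns (multi-column StringIndexer is available in Spark 3.0).
indexer = StringIndexer(inputCols=["CMD", "type"],
                        outputCols=["CMD_index", "type_index"])
# One-hot encode the indexed columns.
encoder = OneHotEncoder(inputCols=["CMD_index", "type_index"],
                        outputCols=["CMD_index_encode", "type_index_encode"])
# Assemble the encoded columns plus the numeric columns into one features vector.
numeric_cols = ["ts", "PID", "MINFLT", "MAJFLT", "VSTEXT",
                "VSIZE", "RSIZE", "VGROW", "RGROW", "MEM"]
assembler = VectorAssembler(inputCols=["CMD_index_encode", "type_index_encode"] + numeric_cols,
                            outputCol="features",
                            handleInvalid="keep")
# Standardize the assembled vector.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")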
However, I get a very long error whose meaning I don't fully understand:
Py4JJavaError Traceback (most recent call last)
<ipython-input-58-fd210f3dee7e> in <module>
2
3 mem_pipiline = Pipeline(stages = [indexer, encoder, assembler, scaler])
----> 4 pipelineModel = mem_pipiline.fit(data)
5 transformed = pipelineModel.transform(data)
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params)
127 return self.copy(params)._fit(dataset)
128 else:
--> 129 return self._fit(dataset)
130 else:
131 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/pipeline.py in _fit(self, dataset)
107 dataset = stage.transform(dataset)
108 else: # must be an Estimator
--> 109 model = stage.fit(dataset)
110 transformers.append(model)
111 if i < indexOfLastEstimator:
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params)
127 return self.copy(params)._fit(dataset)
128 else:
--> 129 return self._fit(dataset)
130 else:
131 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit(self, dataset)
319
320 def _fit(self, dataset):
--> 321 java_model = self._fit_java(dataset)
322 model = self._create_model(java_model)
323 return self._copyValues(model)
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
316 """
317 self._transfer_params_to_java()
--> 318 return self._java_obj.fit(dataset._jdf)
319
320 def _fit(self, dataset):
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
129 def deco(*a, **kw):
130 try:
--> 131 return f(*a, **kw)
132 except py4j.protocol.Py4JJavaError as e:
133 converted = convert_exception(e.java_exception)
/mnt/d/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o866.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 81 in stage 126.0 failed 1 times, most recent failure: Lost task 81.0 in stage 126.0 (TID 7338, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$3599/0x000000084145a040: (struct<CMD_index_encode:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,type_index_encode:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,ts_double_VectorAssembler_439ad1dce146:double,PID_double_VectorAssembler_439ad1dce146:double,MINFLT_double_VectorAssembler_439ad1dce146:double,MAJFLT_double_VectorAssembler_439ad1dce146:double,VSTEXT_double_VectorAssembler_439ad1dce146:double,VSIZE:double,RSIZE:double,VGROW:double,RGROW:double,MEM:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:109)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:107)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:859)
I have checked the features column before applying the StandardScaler (a sketch of how I checked follows the sample rows below), and it does not look like a vector to me. Maybe that is causing the problem?
(430,[211,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.55421904E9,2770.0,2810.0,193.0,332.4,12180.0,332.4,12180.0])
(430,[254,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.554219055E9,2697.0,3324.0,47.0,508.4,11752.0,508.4,11752.0])
(430,[35,415,420,421,422,423,424,425,426,427,428],[1.0,1.0,1.55421918E9,2708.0,1595.0,1.0,35.0,352.1,5044.0,352.1,5044.0])
(430,[316,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.55421933E9,2932.0,947.0,24.0,264.0,2988.0,264.0,2988.0])
(430,[214,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.55421949E9,1245.0,404.0,25.0,20012.0,960.0,20012.0,960.0])
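For reference, this is roughly how I produced the rows above: a sketch that reuses the stage definitions from earlier and fits the pipeline without the scaler stage:
# Fit and apply everything up to (but not including) the StandardScaler.
partial_model = Pipeline(stages=[indexer, encoder, assembler]).fit(data)
partial = partial_model.transform(data)
partial.select("features").show(5, truncate=False)
# The column type can also be checked explicitly; a vector column reports VectorUDT.
print(partial.schema["features"].dataType)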
Any idea how to do these steps correctly? If you need more info, please let me know. Thanks.