
This question is similar to this old question, which is not for PySpark: similar

I have a dataframe and want to apply an ML decision tree to it. This dataframe contains two categorical columns, so I follow these steps to prepare them:

  • Define a StringIndexer on the two categorical columns
  • Define a OneHotEncoder on the result of the previous step
  • Define a VectorAssembler on the two new columns plus all the numerical columns (except the label column) to build the features vector
  • Define a StandardScaler on the features column
  • Apply all the defined transformers in a pipeline (a sketch of these stage definitions follows the snippet below):
mem_pipiline = Pipeline(stages = [indexer, encoder, assembler, scaler])
pipelineModel = mem_pipiline.fit(data)
transformed = pipelineModel.transform(data)
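
For reference, a minimal sketch of how these stages could be defined. The categorical columns CMD and type and the numeric column names are inferred from the error message below; the exact names and output-column suffixes are assumptions:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Categorical and numeric columns (names inferred from the error message)
categorical_cols = ["CMD", "type"]
numeric_cols = ["ts", "PID", "MINFLT", "MAJFLT", "VSTEXT",
                "VSIZE", "RSIZE", "VGROW", "RGROW", "MEM"]

# Index the categorical columns, then one-hot encode the indices
indexer = StringIndexer(inputCols=categorical_cols,
                        outputCols=[c + "_index" for c in categorical_cols])
encoder = OneHotEncoder(inputCols=[c + "_index" for c in categorical_cols],
                        outputCols=[c + "_index_encode" for c in categorical_cols])

# Assemble encoded + numeric columns into a single features vector
assembler = VectorAssembler(
    inputCols=[c + "_index_encode" for c in categorical_cols] + numeric_cols,
    outputCol="features")

# Scale the assembled features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")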

However, I get a very long error, and I don't understand exactly what it means:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-58-fd210f3dee7e> in <module>
      2 
      3 mem_pipiline = Pipeline(stages = [indexer, encoder, assembler, scaler])
----> 4 pipelineModel = mem_pipiline.fit(data)
      5 transformed = pipelineModel.transform(data)

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params)
    127                 return self.copy(params)._fit(dataset)
    128             else:
--> 129                 return self._fit(dataset)
    130         else:
    131             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/pipeline.py in _fit(self, dataset)
    107                     dataset = stage.transform(dataset)
    108                 else:  # must be an Estimator
--> 109                     model = stage.fit(dataset)
    110                     transformers.append(model)
    111                     if i < indexOfLastEstimator:

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params)
    127                 return self.copy(params)._fit(dataset)
    128             else:
--> 129                 return self._fit(dataset)
    130         else:
    131             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit(self, dataset)
    319 
    320     def _fit(self, dataset):
--> 321         java_model = self._fit_java(dataset)
    322         model = self._create_model(java_model)
    323         return self._copyValues(model)

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    316         """
    317         self._transfer_params_to_java()
--> 318         return self._java_obj.fit(dataset._jdf)
    319 
    320     def _fit(self, dataset):

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
    129     def deco(*a, **kw):
    130         try:
--> 131             return f(*a, **kw)
    132         except py4j.protocol.Py4JJavaError as e:
    133             converted = convert_exception(e.java_exception)

/mnt/d/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o866.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 81 in stage 126.0 failed 1 times, most recent failure: Lost task 81.0 in stage 126.0 (TID 7338, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$3599/0x000000084145a040: (struct<CMD_index_encode:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,type_index_encode:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,ts_double_VectorAssembler_439ad1dce146:double,PID_double_VectorAssembler_439ad1dce146:double,MINFLT_double_VectorAssembler_439ad1dce146:double,MAJFLT_double_VectorAssembler_439ad1dce146:double,VSTEXT_double_VectorAssembler_439ad1dce146:double,VSIZE:double,RSIZE:double,VGROW:double,RGROW:double,MEM:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:109)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:107)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:859)

I have checked the features column before applying the StandardScaler, and it looks like it is not a vector. Maybe that is causing the problem?

(430,[211,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.55421904E9,2770.0,2810.0,193.0,332.4,12180.0,332.4,12180.0])
(430,[254,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.554219055E9,2697.0,3324.0,47.0,508.4,11752.0,508.4,11752.0])
(430,[35,415,420,421,422,423,424,425,426,427,428],[1.0,1.0,1.55421918E9,2708.0,1595.0,1.0,35.0,352.1,5044.0,352.1,5044.0])
(430,[316,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.55421933E9,2932.0,947.0,24.0,264.0,2988.0,264.0,2988.0])
(430,[214,415,420,421,422,424,425,426,427,428],[1.0,1.0,1.55421949E9,1245.0,404.0,25.0,20012.0,960.0,20012.0,960.0])   
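
As a side note, (size,[indices],[values]) is also how a SparseVector prints, so inspecting the column's schema type is more reliable than eyeballing the output. A quick check (a sketch; assembled stands for the DataFrame produced by the assembler stage):

from pyspark.ml.linalg import VectorUDT

# If the column holds proper ML vectors, its dataType is a VectorUDT
print(assembled.schema["features"].dataType)
print(isinstance(assembled.schema["features"].dataType, VectorUDT))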

Any idea how I should do these steps correctly? If you need more info, please let me know. Thanks.

that is exactly the cause; try converting the struct type (array type in the DataFrame API) to vector type. – E.ZY.
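
A sketch of the conversion this comment suggests, assuming the offending column is an array<double> named features_array (both the column name and the target name are hypothetical):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# UDF that turns an array<double> column into an ML DenseVector
# (note: rows with null arrays would still need to be handled first)
to_vector = udf(lambda a: Vectors.dense(a), VectorUDT())
data = data.withColumn("features_vec", to_vector("features_array"))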

1 Answer


I actually found the problem: it was related to the existence of some null values in the original dataframe.
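
For completeness, a minimal sketch of handling the nulls before fitting (column lists as assumed earlier; whether to drop or impute depends on the data):

# Option 1: drop rows that have nulls in any feature column
clean = data.na.drop(subset=categorical_cols + numeric_cols)

# Option 2: impute instead of dropping (0.0 / "missing" are arbitrary defaults)
# clean = data.na.fill(0.0, subset=numeric_cols) \
#             .na.fill("missing", subset=categorical_cols)

pipelineModel = mem_pipiline.fit(clean)
transformed = pipelineModel.transform(clean)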