The scenario is very similar to the one in this post, with some variations: Pyspark Unsupported literal type class java.util.ArrayList
I have data of this format:
data.show()
+---------------+--------------------+--------------------+
| features| meta| telemetry|
+---------------+--------------------+--------------------+
| [seattle, 3]|[seattle, 3, 5344...|[[47, 1, 27, 92, ...|
| [miami, 1]|[miami, 1, 236881...|[[31, 84, 24, 67,...|
| [miami, 3]|[miami, 3, 02f4ca...|[[84, 5, 4, 93, 2...|
| [seattle, 3]|[seattle, 3, ec48...|[[43, 16, 94, 93,...|
| [seattle, 1]|[seattle, 1, 7d19...|[[70, 22, 45, 74,...|
|[kitty hawk, 3]|[kitty hawk, 3, d...|[[46, 15, 56, 94,...|
+---------------+--------------------+--------------------+
You can download a generated .json sample from this link: https://aiaccqualitytelcapture.blob.core.windows.net/streamanalytics/2019/08/21/10/0_43cbc7b0c9e845a187ce182b46eb4a3a_1.json?st=2019-08-22T15%3A20%3A20Z&se=2026-08-23T15%3A20%3A00Z&sp=rl&sv=2018-03-28&sr=b&sig=tsYh4oTNZXWbLnEgYypNqIsXH3BXOG8XyAH5ODi8iQg%3D
In particular, you can see that the data in each of these columns is actually a dictionary. The "features" column, which is the one of interest to us, is of this form: {"factory_id":"seattle","line_id":"3"}
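In case it helps with reproduction, this is roughly how the sample is loaded; a minimal sketch, assuming an active SparkSession named spark and the sample saved locally as sample.json (the path is illustrative):

data = spark.read.json("sample.json")
data.printSchema()  # features should appear as a struct with factory_id and line_id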
I'm attempting to one-hot encode the data in the features column via classical functional means.
See below:
import numpy as np

def one_hot(value, categories_list):
    # The one-hot vector is the row of the identity matrix that
    # corresponds to the value's index in the category list.
    num_cats = len(categories_list)
    return np.eye(num_cats)[categories_list.index(value)]
def one_hot_features(row, feature_keys, u_features):
    """
    Concatenate the one-hot vectors for every feature key.
    feature_keys must be sorted.
    """
    cur_key = feature_keys[0]
    vector = one_hot(row["features"][cur_key], u_features[cur_key])
    for i in range(1, len(feature_keys)):
        cur_key = feature_keys[i]
        n_vector = one_hot(row["features"][cur_key], u_features[cur_key])
        vector = np.concatenate((vector, n_vector), axis=None)
    return vector
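As a sanity check, one_hot behaves as expected on its own (the category list here is just the factory_id list, for illustration):

one_hot('miami', ['kitty hawk', 'miami', 'nags head', 'seattle'])
# -> array([0., 1., 0., 0.])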
The feature_keys and u_features in this case contain the following data:
feature_keys = ['factory_id', 'line_id']
u_features = {'factory_id': ['kitty hawk', 'miami', 'nags head', 'seattle'], 'line_id': ['1', '2', '3']}
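So for a row whose features are {"factory_id": "seattle", "line_id": "3"}, the expected result is [0, 0, 0, 1] for factory_id followed by [0, 0, 1] for line_id, and calling the function on a plain dict works fine locally:

one_hot_features({"features": {"factory_id": "seattle", "line_id": "3"}},
                 feature_keys, u_features)
# -> array([0., 0., 0., 1., 0., 0., 1.])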
I have created a UDF and am attempting to build a new dataframe with a new column added via this UDF. Code below:
from pyspark.sql.functions import udf, col

def calc_onehot_udf(feature_keys, u_features):
    return udf(lambda x: one_hot_features(x, feature_keys, u_features))

n_data = data.withColumn(
    "hot_feature",
    calc_onehot_udf(feature_keys, u_features)(col("features"))
)
n_data.show()
This results in the following set of errors:
Py4JJavaError: An error occurred while calling o148257.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 91.0 failed 4 times, most recent failure: Lost task 0.3 in stage 91.0 (TID 1404, 10.139.64.5, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/types.py", line 1514, in __getitem__
    idx = self.__fields__.index(item)
ValueError: 'features' is not in list
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 480, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 472, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 456, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 149, in dump_stream
    for obj in iterator:
  File "/databricks/spark/python/pyspark/serializers.py", line 445, in _batched
    for item in iterator:
  File "", line 1, in
  File "/databricks/spark/python/pyspark/worker.py", line 87, in
    return lambda *a: f(*a)
  File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "", line 4, in
  File "", line 11, in one_hot_features
  File "/databricks/spark/python/pyspark/sql/types.py", line 1519, in __getitem__
    raise ValueError(item)
ValueError: features
Any assistance is greatly appreciated. I am actively investigating this.
The ideal output would be a new dataframe with a column "hot_feature" containing the one-dimensional one-hot encoded array derived from the features column.
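For reference, my current best guess (unverified) is that because the UDF is invoked with col("features"), the lambda already receives the features struct itself, so indexing row["features"] inside one_hot_features goes one level too deep; in addition, udf defaults to StringType, so the numpy array return value probably needs to become a plain list with an explicit return type. A sketch of what that fix might look like (the one_hot_features_struct helper and the ArrayType(DoubleType()) return type are my assumptions, not verified):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

def one_hot_features_struct(features_struct, feature_keys, u_features):
    # Hypothetical variant of one_hot_features: col("features") hands the
    # UDF the struct itself, so index its fields directly instead of
    # looking up row["features"] first.
    vecs = [one_hot(features_struct[k], u_features[k]) for k in feature_keys]
    # Spark cannot serialize numpy types, so return a plain Python list.
    return np.concatenate(vecs, axis=None).tolist()

def calc_onehot_udf(feature_keys, u_features):
    return udf(lambda x: one_hot_features_struct(x, feature_keys, u_features),
               ArrayType(DoubleType()))

n_data = data.withColumn(
    "hot_feature",
    calc_onehot_udf(feature_keys, u_features)(col("features"))
)

With that in place, n_data.show() should display the desired hot_feature column, but again, this is untested, so any pointers are still welcome.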