I'm getting started with Spark and, in my case, PySpark.
I have a small school project that doesn't seem difficult, but I've been working on it for many days and still can't get it to work.
I have to load images from a folder and extract their descriptors in order to perform dimensionality reduction.
I created a PySpark DataFrame with the image paths, and now I would like to add a column containing the descriptors.
Here's how I did it.
List of image paths:
import os
from pyspark.sql import Row

lst_path = []
sub_folders = os.listdir(folder)
print(sub_folders)
# Only the first category subfolder is read here
for f in sub_folders[:1]:
    lst_categ = os.listdir(folder + f)
    for file in lst_categ:
        lst_path.append(folder + f + "/" + file)
print("Number of images loaded:", len(lst_path))
rdd = sc.parallelize(lst_path)
row_rdd = rdd.map(lambda x: Row(x))
df = spark.createDataFrame(row_rdd, ["path_img"])
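As a side note, the loop above only works if `folder` ends with a slash. Here is a minimal sketch of the same listing with `os.path.join`, assuming one subfolder per category. The function name `list_image_paths` and the `max_categories` parameter are hypothetical and only mirror the `[:1]` slice used above:

```python
import os

def list_image_paths(folder, max_categories=None):
    """Collect file paths from the category subfolders of `folder`.

    `max_categories` is a hypothetical parameter mirroring the `[:1]`
    slice in the original loop; None means "all subfolders".
    """
    # Keep only directories, sorted because os.listdir guarantees no order
    sub_folders = sorted(
        d for d in os.listdir(folder)
        if os.path.isdir(os.path.join(folder, d))
    )[:max_categories]

    paths = []
    for sub in sub_folders:
        sub_path = os.path.join(folder, sub)
        for name in sorted(os.listdir(sub_path)):
            paths.append(os.path.join(sub_path, name))
    return paths
```

The resulting list could then be passed to `sc.parallelize` in the same way as `lst_path`.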
Function to extract descriptors:
def get_desc(img):
    img = cv2.imread(file)
    orb = cv2.ORB_create(nfeatures=50)
    keypoints_orb, desc = orb.detectAndCompute(img, None)
    desc = desc.flatten()
    return desc
UDF definition:
udf_image = udf(lambda img: get_desc(img), ArrayType(FloatType()))
Creation of the new column:
df2 = df.withColumn("img_vectorized", udf_image("path_img"))
The result with printSchema():
root
 |-- path_img: string (nullable = true)
 |-- img_vectorized: array (nullable = true)
 |    |-- element: float (containsNull = true)
When I call df2.show(), I get the following error message:
Py4JJavaError: An error occurred while calling o773.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18.0 (TID 93, localhost, executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)
AttributeError: 'NoneType' object has no attribute 'flatten'
I notice that the descriptors come back as None. To be clear, when I run this extraction for a single image, it works.
I don't understand why it doesn't work on my DataFrame. Can you help me, please?
Thanks.
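Two details in the code above may explain both errors. This is offered as a sketch, not a confirmed diagnosis. First, `get_desc` reads the global `file` instead of its `img` argument, so `cv2.imread` may fail to load anything on the executors and `desc` can end up as None. Second, a PickleException mentioning `numpy.core.multiarray._reconstruct` is typical of a UDF that returns a NumPy array, whereas `ArrayType(FloatType())` expects a plain Python list of floats. The helper name `descriptors_to_floats` is hypothetical, and `cv2` is imported inside the function so that the conversion helper can be tried without OpenCV installed:

```python
from typing import List, Optional

def descriptors_to_floats(desc) -> Optional[List[float]]:
    """Turn an ORB descriptor array (or None) into a plain list of floats."""
    if desc is None:
        # No keypoints found, or the image could not be read
        return None
    # float() converts NumPy scalars into native Python floats
    return [float(v) for v in desc.flatten()]

def get_desc(path: str) -> Optional[List[float]]:
    """Read the image at `path` and return its flattened ORB descriptors."""
    import cv2  # assumption: OpenCV is installed on every executor

    img = cv2.imread(path)  # use the argument, not a global variable
    if img is None:
        return None
    orb = cv2.ORB_create(nfeatures=50)
    _, desc = orb.detectAndCompute(img, None)
    return descriptors_to_floats(desc)
```

With this in place, the UDF could be declared as `udf(get_desc, ArrayType(FloatType()))`. Rows whose image cannot be read, or which yield no keypoints, would then hold a null instead of failing the job.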