
I'm just getting started with Spark, and with PySpark in my case.

I have a small school project that does not seem difficult, but I've been working on it for several days and I still cannot get it to work.

I have to load images from a folder and extract their descriptors in order to do a dimensionality reduction.

I created a PySpark DataFrame with the image paths, and now I would like to add a column with the descriptors.

Here's how I did it.

List of image paths:

    import os
    from pyspark.sql import Row

    # Collect the paths of all images in the first sub-folder
    lst_path = []
    sub_folders = os.listdir(folder)
    print(sub_folders)

    for f in sub_folders[:1]:
        lst_categ = os.listdir(folder + f)
        for file in lst_categ:
            lst_path.append(folder + f + "/" + file)

    print("Number of images loaded:", len(lst_path))

    # Build a one-column DataFrame from the list of paths
    rdd = sc.parallelize(lst_path)
    row_rdd = rdd.map(lambda x: Row(x))
    df = spark.createDataFrame(row_rdd, ["path_img"])
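To double-check the DataFrame before going further, something like this can be used (a minimal sketch):

    df.printSchema()
    df.show(5, truncate=False)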

Function to extract the descriptors:

    def get_desc(img):
        img = cv2.imread(file)
        orb = cv2.ORB_create(nfeatures=50)
        keypoints_orb, desc = orb.detectAndCompute(img, None)
        desc = desc.flatten()
        return desc
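For reference, detectAndCompute returns the descriptors as a NumPy array of shape (number of keypoints, 32) with dtype uint8, or None when no keypoints are found; a minimal check outside Spark (using any path from the list) looks like this:

    import cv2

    test_img = cv2.imread(lst_path[0])
    orb = cv2.ORB_create(nfeatures=50)
    kp, desc = orb.detectAndCompute(test_img, None)
    print(type(desc), desc.shape if desc is not None else None)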

The UDF:

    udf_image = udf(lambda img: get_desc(img), ArrayType(FloatType()))

Creation of the new column:

    df2 = df.withColumn("img_vectorized", udf_image("path_img"))

The result with printSchema():

    root
     |-- path_img: string (nullable = true)
     |-- img_vectorized: array (nullable = true)
     |    |-- element: float (containsNull = true)

And when I call df2.show(), I get the following error messages:

    Py4JJavaError: An error occurred while calling o773.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18.0 (TID 93, localhost, executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)

    AttributeError: 'NoneType' object has no attribute 'flatten'

I can see that the descriptors come back null. Note that when I run this extraction on a single image outside of Spark, it works.

I don't understand why it doesn't work on my DataFrame. Can you help me, please?

Thanks.


1 Answer


I found the solution last night, after many days of research...

My corrected code is below. The key changes: the function now reads the path it actually receives (img instead of the stray file variable), handles the case where detectAndCompute finds no keypoints, and converts the NumPy array to a plain Python list with tolist() so Spark can serialize it (returning a raw NumPy array is what produces the ClassDict pickle error).

    import cv2
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, IntegerType

    def get_desc(img):
        image = cv2.imread(img)
        orb = cv2.ORB_create(nfeatures=50)
        keypoints_orb, desc = orb.detectAndCompute(image, None)

        # Some images yield no keypoints: return None so the row can be filtered out later
        if desc is None:
            return None

        # Flatten and convert the uint8 NumPy array to a plain Python list
        return desc.flatten().tolist()

    udf_image = udf(get_desc, ArrayType(IntegerType()))

    df_desc = df.withColumn("descriptors", udf_image("path_img"))

    # Drop the rows for which no descriptors could be extracted
    df_desc = df_desc.filter(df_desc.descriptors.isNotNull())

    df_desc.show()
    +--------------------+--------------------+
    |            path_img|         descriptors|
    +--------------------+--------------------+
    |Training/Apple-Br...|[69, 113, 253, 10...|
    |Training/Apple-Br...|[212, 236, 159, 2...|
    |Training/Apple-Br...|[60, 53, 123, 239...|
    |Training/Apple-Br...|[255, 189, 252, 1...|
    |Training/Apple-Br...|[204, 244, 149, 1...|
    +--------------------+--------------------+
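From here, the dimensionality reduction mentioned in the question can be done with Spark ML's PCA, which expects fixed-length vectors. A minimal sketch, assuming the descriptor lists are padded or truncated to a fixed length first (the 1600-value padding, k=20, and the features/pca_features column names are illustrative choices, not from the answer above):

    from pyspark.ml.feature import PCA
    from pyspark.ml.linalg import Vectors, VectorUDT
    from pyspark.sql.functions import udf

    FIXED_LEN = 50 * 32  # 50 ORB keypoints x 32 bytes per descriptor

    def to_fixed_vector(arr):
        # Pad with zeros (or truncate) so every row has the same dimensionality
        padded = (arr + [0] * FIXED_LEN)[:FIXED_LEN]
        return Vectors.dense(padded)

    to_vec = udf(to_fixed_vector, VectorUDT())
    df_vec = df_desc.withColumn("features", to_vec("descriptors"))

    pca = PCA(k=20, inputCol="features", outputCol="pca_features")
    df_reduced = pca.fit(df_vec).transform(df_vec)
    df_reduced.select("path_img", "pca_features").show(5)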