
I am new to PySpark and trying to find a way to create a UDF to speed up my DBSCAN operation on a huge dataset. I run two functions in a pipeline to find the best values of eps and min_samples. Since this is computationally expensive, I am trying to see whether it can be implemented as a UDF.

from sklearn.cluster import DBSCAN
import numpy as np

def dbscan(data, X, eps_val, min_samples_val):
    # Fit DBSCAN once and report how many clusters it found and how many points it labelled as noise
    db = DBSCAN(eps=eps_val, min_samples=min_samples_val).fit(X)
    labels = db.labels_
    no_clusters = len(np.unique(labels))
    no_noise = int(np.sum(labels == -1))   # points labelled -1 are noise
    components = db.components_            # core samples (currently unused)
    no_datapoints = len(components)
    return no_clusters, no_noise

def run_dbscan(data, X):
    # Search eps (in steps of 0.1) and min_samples until no points are labelled as noise
    eps_val = 0.3
    min_samples_val = 5
    i = 1
    n_clusters, n_noise = dbscan(data, X, eps_val, min_samples_val)
    while n_noise > 0:
        eps_val += 0.1
        if i % 20 == 0:          # after 20 eps steps, reset eps and try a larger min_samples
            eps_val = 0.3
            min_samples_val += 1
        n_clusters, n_noise = dbscan(data, X, eps_val, min_samples_val)
        if i > 1000:             # safety cap on the number of iterations
            break
        i += 1
    print("eps =", eps_val, "| min_samples =", min_samples_val,
          "| n_clusters =", n_clusters, "| n_noise =", n_noise)
    return eps_val, min_samples_val

spark.udf.register("dbscan",dbscan)
spark.udf.register("run_dbscan",run_dbscan)
dbscanudf = udf(dbscan, ArrayType())
rundbscanudf = udf(run_dbscan, DoubleType())
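I am not sure this is the right approach, but one idea I had is to run the search per group with applyInPandas instead of a plain column UDF, so Spark can parallelise across groups while each group is still searched with the functions above. In this sketch, df, the key column "group_id" and the feature columns "f1", "f2" are just placeholders for my actual DataFrame and schema:

import pandas as pd

def run_dbscan_per_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf is the pandas DataFrame for one group; reuse the search defined above on its features
    X = pdf[["f1", "f2"]].values
    eps_val, min_samples_val = run_dbscan(pdf, X)
    return pd.DataFrame({
        "group_id": [pdf["group_id"].iloc[0]],
        "eps": [float(eps_val)],
        "min_samples": [int(min_samples_val)],
    })

result = (
    df.groupBy("group_id")
      .applyInPandas(run_dbscan_per_group,
                     schema="group_id string, eps double, min_samples int")
)
result.show()

My understanding is that sklearn's DBSCAN would still run serially inside each group, so this only helps if the data can be split into independent groups.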

Any help would be highly appreciated.