I am new to PySpark and am trying to create a UDF to speed up a DBSCAN run on a huge dataset. I go through two functions in a pipeline to find the best values of eps and min_samples. Since the search is computationally expensive, I am trying to see whether it can be implemented as a UDF.
import numpy as np
from sklearn.cluster import DBSCAN
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

def dbscan(data, X, eps_val, min_samples_val):
    # Fit DBSCAN on the full feature matrix X
    db = DBSCAN(eps=eps_val, min_samples=min_samples_val).fit(X)
    labels = db.labels_
    # Noise points get the label -1, so don't count it as a cluster
    no_clusters = len(np.unique(labels)) - (1 if -1 in labels else 0)
    no_noise = int(np.sum(labels == -1))
    return no_clusters, no_noise
def run_dbscan(data, X):
    # Grid search: grow eps in 0.1 steps; every 20 iterations reset eps
    # and raise min_samples; stop once there is no noise (or after 1000 tries)
    eps_val = 0.3
    min_samples_val = 5
    i = 1
    n_clusters, n_noise = dbscan(data, X, eps_val, min_samples_val)
    while n_noise > 0:
        eps_val += 0.1
        if i % 20 == 0:
            eps_val = 0.3
            min_samples_val += 1
        n_clusters, n_noise = dbscan(data, X, eps_val, min_samples_val)
        if i > 1000:
            break
        i += 1
    print("eps =", eps_val, "| min_samples =", min_samples_val,
          "| n_clusters =", n_clusters, "| n_noise =", n_noise)
    return eps_val, min_samples_val
spark.udf.register("dbscan",dbscan)
spark.udf.register("run_dbscan",run_dbscan)
dbscanudf = udf(dbscan, ArrayType())
rundbscanudf = udf(run_dbscan, DoubleType())
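One caveat: a row-at-a-time UDF never sees the whole feature matrix, so wrapping the search this way will not make the fit itself faster. If the dataset splits into independent groups, the search can instead run in parallel per group with groupBy().applyInPandas() (Spark 3.0+). A minimal sketch, assuming a DataFrame df with a hypothetical grouping column group_id and feature columns x and y:

import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

result_schema = StructType([
    StructField("group_id", StringType()),     # hypothetical grouping key
    StructField("eps_val", DoubleType()),
    StructField("min_samples", IntegerType()),
])

def tune_dbscan(pdf: pd.DataFrame) -> pd.DataFrame:
    # All rows of one group arrive here as a single pandas DataFrame,
    # so the scikit-learn search runs locally on one executor.
    X = pdf[["x", "y"]].to_numpy()              # hypothetical feature columns
    eps_val, min_samples_val = run_dbscan(None, X)
    return pd.DataFrame([{"group_id": pdf["group_id"].iloc[0],
                          "eps_val": float(eps_val),
                          "min_samples": int(min_samples_val)}])

best_params = df.groupBy("group_id").applyInPandas(tune_dbscan, schema=result_schema)

Each group's rows are handed to tune_dbscan as one local pandas DataFrame, so the expensive part stays in scikit-learn while the groups themselves are distributed across the cluster.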
Any help would be highly appreciated.