I use sklearn's DBSCAN in my PySpark job; see the code snippet below. I also zipped all the dependency modules into a deps.zip file, which is added to the SparkContext.

from sklearn.cluster import DBSCAN
import numpy as np
import pandas as pd
#import pyspark
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql.types import DoubleType
from pyspark.sql import Row

def dbscan_latlng(lat_lngs, min_distance_km, min_points=10):
    # Convert (lat, lng) pairs to radians and cluster with haversine DBSCAN
    coords = np.asmatrix(lat_lngs)
    kms_per_radian = 6371.0088
    epsilon = min_distance_km / kms_per_radian
    db = DBSCAN(eps=epsilon, min_samples=min_points, algorithm='ball_tree', metric='haversine').fit(np.radians(coords))
    cluster_labels = db.labels_
    num_clusters = len(set(cluster_labels))
    clusters = pd.Series([coords[cluster_labels == n] for n in range(num_clusters)])
    maxClusters = clusters.map(len).max()
    # Return the largest cluster only if it has more than 3 points
    if (maxClusters > 3):
        dfClusters = clusters.to_frame('coords')
        dfClusters['length'] = dfClusters.apply(lambda x: len(x['coords']), axis=1)
        custCluster = dfClusters[dfClusters['length'] == maxClusters].reset_index()
        return custCluster['coords'][0].tolist()

sc = SparkContext()
sc.addPyFile('/content/airflow/dags/deps.zip')
sqlContext = SQLContext(sc)
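A simplified sketch of how the function ends up running on the executors (the coordinates here are made up for illustration; the actual invocation is longer):

# Hypothetical usage sketch: the lambda (and the sklearn import it drags in)
# is pickled on the driver and unpickled on the workers, which is where
# sklearn gets imported from deps.zip.
lat_lngs = [(40.7128, -74.0060), (40.7130, -74.0055), (34.0522, -118.2437)]
rdd = sc.parallelize([lat_lngs])
largest = rdd.map(lambda pts: dbscan_latlng(pts, 0.5, min_points=2)).collect()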

However, after I submit the job via spark-submit --master local[4] FindOutliers.py, I get the following Python error saying sklearn/__check_build is not a directory. Can anyone help me with this? Thanks a lot!

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/.virtualenvs/jacob/local/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 166, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/root/.virtualenvs/jacob/local/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 55, in read_command
    command = serializer._read_with_length(file)
  File "/root/.virtualenvs/jacob/local/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/root/.virtualenvs/jacob/local/lib/python2.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 454, in loads
    return pickle.loads(obj)
  File "/tmp/pip-build-0qnWWw/scikit-learn/sklearn/__init__.py", line 133, in <module>
  File "/tmp/pip-build-0qnWWw/scikit-learn/sklearn/__check_build/__init__.py", line 46, in <module>
  File "/tmp/pip-build-0qnWWw/scikit-learn/sklearn/__check_build/__init__.py", line 26, in raise_build_error
OSError: [Errno 20] Not a directory: '/tmp/spark-beb8777f-b7d5-40be-a72b-c16e10264a50/userFiles-3762d9c0-6674-467a-949b-33968420bae1/deps.zip/sklearn/__check_build'


1 Answer


Try:

import pyspark as ps

sc = ps.SparkContext()
sc.addPyFile('/content/airflow/dags/deps.zip')
sqlContext = ps.SQLContext(sc)
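Note that SQLContext needs the SparkContext passed to it (the original snippet was missing the (sc) argument). If the error persists, keep in mind what the traceback is pointing at: sklearn's __check_build package contains compiled C extension modules, and Python cannot import C extensions directly out of a zip archive. A common workaround is to install scikit-learn and numpy on the worker nodes themselves (e.g. via pip in each worker's environment) and ship only pure-Python modules in deps.zip.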