
I have two large DataFrames. Each row has lat/lon data. My goal is to join the two DataFrames and find all pairs of points that are within a given distance of each other, e.g. 100 m.

df1: (id, lat, lon, geohash7)
df2: (id, lat, lon, geohash7)

I want to partition df1 and df2 on geohash7 and then join only within each partition, avoiding joins across partitions to cut down the computation.

df1 = df1.repartition(200, "geohash7")
df2 = df2.repartition(200, "geohash7")

df_merged = df1.join(
    df2,
    (df1["geohash7"] == df2["geohash7"]) &
    (dist(df1["lat"], df1["lon"], df2["lat"], df2["lon"]) < 100)
)
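Here dist is a UDF that computes the haversine distance between two points in meters; roughly along these lines (a minimal sketch, the exact implementation isn't the issue):

from math import radians, sin, cos, asin, sqrt
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Sketch of the haversine UDF used above: great-circle distance in meters.
@udf(returnType=DoubleType())
def dist(lat1, lon1, lat2, lon2):
    r = 6371000.0  # mean Earth radius in meters
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = radians(lat2 - lat1)
    dlmb = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlmb / 2) ** 2
    return 2 * r * asin(sqrt(a))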

So basically join on geohash7 and then make sure the distance between the points is less than 100 m. The problem is that Spark will actually cross join all the data. How can I make it join only within partitions (intra-partition) and not across partitions?

Have you checked the plan with .explain()? I don't see why this code would do a cross join. - Lamanus
What is dist doing? Am I correct that it is a UDF calculating the Euclidean distance between the two points? That would explain the cross join. - werner
dist calculates the haversine distance between two points. Yeah, it seems that what I want is not supported by Spark. After playing with it some more, I think Spark is not doing a cross join, because the geohash7 equality condition is in the join statement. If I remove the geohash7 matching condition, the query runs much slower. - solora

1 Answer


After much playing with the data, it seems that Spark is smart enough to evaluate the equality condition ("geohash7") first when joining, so if there is no match on geohash7 it won't evaluate the "dist" function at all. It also appears that with the equality condition present it no longer does a cross join. So I didn't have to do anything else; the join above works fine.
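To double-check this (as suggested in the comments), you can inspect the physical plan; assuming the same dist UDF and join as in the question, something like:

df_merged = df1.join(
    df2,
    (df1["geohash7"] == df2["geohash7"]) &
    (dist(df1["lat"], df1["lon"], df2["lat"], df2["lon"]) < 100)
)

# With the equality condition, the plan should show an equi-join
# (SortMergeJoin or BroadcastHashJoin) keyed on geohash7, with the distance
# predicate applied as a filter on the matched rows; without it you would
# see a CartesianProduct / BroadcastNestedLoopJoin instead.
df_merged.explain()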