2 votes

I have two dataframes, df1 and ip2Country. df1 contains IP addresses, and I am trying to map them to geolocation information such as longitude and latitude, which are columns in ip2Country.

I am running it as a spark-submit job, but the operations take a very long time even though df1 has fewer than 2,500 rows.

My code:

val agg = df1.join(ip2Country, ip2Country("network_start_int") === df1("sint"), "inner")
  .select($"src_ip",
    $"country_name".alias("scountry"),
    $"iso_3".alias("scode"),
    $"longitude".alias("slong"),
    $"latitude".alias("slat"),
    $"dst_ip", $"dint", $"count")
  .filter($"slong".isNotNull)

val agg1 = agg.join(ip2Country, ip2Country("network_start_int") === agg("dint"), "inner")
  .select($"src_ip", $"scountry",
    $"scode", $"slong",
    $"slat", $"dst_ip",
    $"country_name".alias("dcountry"),
    $"iso_3".alias("dcode"),
    $"longitude".alias("dlong"),
    $"latitude".alias("dlat"), $"count")
  .filter($"dlong".isNotNull)

Is there any other way to join the two tables? Or am I doing it the wrong way?

1
Which one is taking more time, agg or agg1? - Ram Ghadiyaram
Actually both are taking a long time. When I printed sys.time, agg1 was taking longer - ELI
Largedf.join(broadcast(smalldf)) will work, where broadcast is a hint to the framework - Ram Ghadiyaram
So it's like ip2Country.join(broadcast(df1), ...)? - ELI
Yes, see my answer here; it explains in more detail why this works better. If you like it, please vote up. Thanks! - Ram Ghadiyaram

1 Answer

10 votes

If you have a big dataframe that needs to be joined with a small one, broadcast joins are very effective. Read here: Broadcast Joins (aka Map-Side Joins)

bigdf.join(broadcast(smalldf))
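
Applied to the dataframes in the question, the broadcast hint would go on df1, since it has fewer than 2,500 rows while ip2Country is the large lookup table. A minimal sketch of the first join (column names taken from the question) might look like this:

import org.apache.spark.sql.functions.broadcast

// Broadcast the small dataframe (df1) so the join runs map-side and the
// large ip2Country table does not have to be shuffled across the cluster.
val agg = ip2Country.join(broadcast(df1),
    ip2Country("network_start_int") === df1("sint"), "inner")
  .select($"src_ip",
    $"country_name".alias("scountry"),
    $"iso_3".alias("scode"),
    $"longitude".alias("slong"),
    $"latitude".alias("slat"),
    $"dst_ip", $"dint", $"count")
  .filter($"slong".isNotNull)

The same pattern applies to the second join on dint: keep ip2Country on the large side and broadcast the small result if it stays small enough to fit in memory on each executor.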