I am converting a DataFrame into a Dataset using a case class that contains a sequence of another case class:

case class IdMonitor(id: String, ipLocation: Seq[IpLocation])
case class IpLocation(
    ip: String,
    ipVersion: Byte,
    ipType: String,
    city: String,
    state: String,
    country: String)

Now I have another dataset of strings that contains only IPs. My requirement is to get all records from IpLocation where ipType == "home" or where the IP dataset contains the record's IP. I am trying to use a Bloom filter built from the IP dataset to do the lookup, but it is inefficient and does not work well in general. I want to join the IP dataset with IpLocation, but I'm having trouble because IpLocation is inside a Seq. I'm very new to Spark and Scala, so I'm probably missing something. Right now my code looks like this:


def buildBloomFilter(Ips: Dataset[String]): BloomFilter[String] = {
    val count = Ips.count
    val bloomFilter = Ips.rdd
      .mapPartitions { iter =>
        val b = BloomFilter.optimallySized[String](count, FP_PROBABILITY)
        iter.foreach(i => b += i)
        Iterator(b)
      }
      .treeReduce(_|_)
    bloomFilter
  }

val ipBf = buildBloomFilter(Ips)
val ipBfBroadcast = spark.sparkContext.broadcast(ipBf)

idMonitor.map { x => 
    x.ipLocation.filter(
       x => x.ipType == "home" && ipBfBroadcast.value.contains(x.ip)
    )
}

I just want to figure out how to join IpLocation and Ips.

Comments:
At the end, do you want a dataset of IpLocation or a dataset of Seq[IpLocation]? - Vincent Doba
At the end I want Seq[IpLocation]. - Utkarsh Roy
I updated my answer to match your desired output. - Vincent Doba

2 Answers

0
votes

You can explode the ipLocation sequence of your IdMonitor objects with the explode function, use an inner join against your Ips dataset to keep only the matching IPs, and finally rebuild the IpLocation sequence by grouping by id and applying collect_list.

Complete code is as follows:

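import org.apache.spark.sql.functions.{col, collect_list, explode}
import spark.implicits._ // provides the encoder used by .as[Seq[IpLocation]]

// explode yields one row per IpLocation, in a struct column named "col"
val result = idMonitor.select(col("id"), explode(col("ipLocation")))
  .filter(col("col.ipType") === "home")
  // Ips is a Dataset[String], so its single column is named "value"
  .join(Ips, col("col.ip") === col("value"))
  .groupBy("id")
  .agg(collect_list("col").as("value"))
  .drop("id")
  .as[Seq[IpLocation]]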
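
The filter followed by an inner join keeps locations that are "home" and also present in Ips. If the "or" wording of the question is what is wanted, one possible variant is a left join followed by a filter. The following is only a sketch under assumptions: the object name HomeOrKnownIps, the helper homeOrKnown, the column names loc and knownIp, and the sample rows are made up for illustration, and the broadcast hint assumes the distinct IP list fits in executor memory (drop the hint if it does not).

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{broadcast, col, collect_list, explode}

case class IpLocation(ip: String, ipVersion: Byte, ipType: String,
                      city: String, state: String, country: String)
case class IdMonitor(id: String, ipLocation: Seq[IpLocation])

object HomeOrKnownIps {
  // Hypothetical helper: keeps locations that are "home" OR whose ip is in `ips`.
  def homeOrKnown(spark: SparkSession,
                  idMonitor: Dataset[IdMonitor],
                  ips: Dataset[String]): Dataset[Seq[IpLocation]] = {
    import spark.implicits._
    // Deduplicate so each exploded row matches at most one lookup row.
    val knownIps = ips.distinct().toDF("knownIp")

    idMonitor
      .select(col("id"), explode(col("ipLocation")).as("loc"))
      // Left join keeps every location; knownIp is null when there is no match.
      .join(broadcast(knownIps), col("loc.ip") === col("knownIp"), "left")
      .filter(col("loc.ipType") === "home" || col("knownIp").isNotNull)
      .groupBy("id")
      .agg(collect_list("loc").as("value"))
      .select("value")
      .as[Seq[IpLocation]]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // local mode, for illustration only
      .appName("home-or-known-ips")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows.
    val idMonitor = Seq(
      IdMonitor("1", Seq(
        IpLocation("123.123.123.123", 4.toByte, "home", "test", "test", "test"),
        IpLocation("123.123.123.124", 4.toByte, "otherwise", "test", "test", "test"))),
      IdMonitor("2", Seq(
        IpLocation("123.123.123.125", 4.toByte, "company", "test", "test", "test")))
    ).toDS()
    val ips = Seq("123.123.123.125").toDS()

    homeOrKnown(spark, idMonitor, ips).show(false)
    spark.stop()
  }
}
```

As in the code above, an id whose locations are all filtered out does not appear in the result at all, because the grouping only sees the rows that survived the filter.
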
0
votes

Sample:

Starting from your case classes,

case class IpLocation(
    ip: String,
    ipVersion: Byte,
    ipType: String,
    city: String,
    state: String,
    country: String
)
case class IdMonitor(id: String, ipLocation: Seq[IpLocation])

I have defined the sample data as follows:

val ip_locations1 = Seq(IpLocation("123.123.123.123", 12.toByte, "home", "test", "test", "test"), IpLocation("123.123.123.124", 12.toByte, "otherwise", "test", "test", "test"))
val ip_locations2 = Seq(IpLocation("123.123.123.125", 13.toByte, "company", "test", "test", "test"), IpLocation("123.123.123.124", 13.toByte, "otherwise", "test", "test", "test"))

val id_monitor = Seq(IdMonitor("1", ip_locations1), IdMonitor("2", ip_locations2))
val df = id_monitor.toDF()
df.show(false)

+---+------------------------------------------------------------------------------------------------------+
|id |ipLocation                                                                                            |
+---+------------------------------------------------------------------------------------------------------+
|1  |[{123.123.123.123, 12, home, test, test, test}, {123.123.123.124, 12, otherwise, test, test, test}]   |
|2  |[{123.123.123.125, 13, company, test, test, test}, {123.123.123.124, 13, otherwise, test, test, test}]|
+---+------------------------------------------------------------------------------------------------------+

and the IPs:

val ips = Seq("123.123.123.125")
val df_ips = ips.toDF("ips")
df_ips.show()

+---------------+
|            ips|
+---------------+
|123.123.123.125|
+---------------+

Join:

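Using the example data above, explode the ipLocation array of each IdMonitor and join the result with the IPs. Note that with an inner join on this "or" condition, a "home" row matches every row of df_ips, so it is repeated once per IP when df_ips holds more than one row.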

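import org.apache.spark.sql.functions.{col, explode, lit}
import spark.implicits._ // provides the encoder used by .as[IpLocation]

df.withColumn("ipLocation", explode(col("ipLocation"))).alias("a")
  .join(df_ips.alias("b"), col("a.ipLocation.ipType") === lit("home") || col("a.ipLocation.ip") === col("b.ips"), "inner")
  .select("ipLocation.*")
  .as[IpLocation].collect()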

Finally, the collected result is given as follows:

res32: Array[IpLocation] = Array(IpLocation(123.123.123.123,12,home,test,test,test), IpLocation(123.123.123.125,13,company,test,test,test))
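
Since the comments ask for the locations regrouped per id, here is one way the join above might be adapted. It is a sketch, not a tested production approach: it swaps the inner join for a "left_semi" join, which returns each exploded row at most once even when several IP rows satisfy the condition, and then rebuilds the sequence with collect_list. The object name SemiJoinSketch, the value names matchCondition and regrouped, and the second IP "10.0.0.1" are assumptions added for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, explode}

case class IpLocation(ip: String, ipVersion: Byte, ipType: String,
                      city: String, state: String, country: String)
case class IdMonitor(id: String, ipLocation: Seq[IpLocation])

object SemiJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // local mode, for illustration only
      .appName("semi-join-sketch")
      .getOrCreate()
    import spark.implicits._

    // Same shape as the sample data above.
    val df = Seq(
      IdMonitor("1", Seq(
        IpLocation("123.123.123.123", 12.toByte, "home", "test", "test", "test"),
        IpLocation("123.123.123.124", 12.toByte, "otherwise", "test", "test", "test"))),
      IdMonitor("2", Seq(
        IpLocation("123.123.123.125", 13.toByte, "company", "test", "test", "test"),
        IpLocation("123.123.123.124", 13.toByte, "otherwise", "test", "test", "test")))
    ).toDF()
    // Two IP rows, to show that "home" rows are not duplicated.
    val df_ips = Seq("123.123.123.125", "10.0.0.1").toDF("ips")

    // Keep a location if it is "home" or if its ip equals some row of df_ips.
    val matchCondition =
      col("ipLocation.ipType") === "home" || col("ipLocation.ip") === col("ips")

    val regrouped = df
      .withColumn("ipLocation", explode(col("ipLocation")))
      // left_semi keeps only left-side columns and never multiplies rows.
      .join(df_ips, matchCondition, "left_semi")
      .groupBy("id")
      .agg(collect_list("ipLocation").as("ipLocation"))
      .as[IdMonitor]

    regrouped.show(false)
    spark.stop()
  }
}
```

One caveat of this design: a semi join needs at least one row on the right side, so if df_ips is empty no rows come back, not even the "home" ones. Because the condition is not a plain equality, Spark also cannot use a hash join here, which may matter for a large IP dataset.
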