I am converting a DataFrame into a Dataset using a case class that contains a sequence of another case class:
case class IdMonitor(id: String, ipLocation: Seq[IpLocation])
case class IpLocation(
  ip: String,
  ipVersion: Byte,
  ipType: String,
  city: String,
  state: String,
  country: String)
Now I have another dataset of strings that contains just IPs. My requirement is to get all records from IpLocation where ipType == "home" or where the IP dataset contains the IP from ipLocation. I am trying to use a Bloom filter built from the IP dataset to do the lookup, but it is inefficient and not working well in general. I would like to join the IP dataset with IpLocation instead, but I'm having trouble because the locations are nested in a Seq. I'm very new to Spark and Scala, so I'm probably missing something. Right now my code looks like this:
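// Assuming Breeze's BloomFilter, whose API matches the calls below
import breeze.util.BloomFilter
import org.apache.spark.sql.Dataset

def buildBloomFilter(ips: Dataset[String]): BloomFilter[String] = {
  // Size every per-partition filter for the full count so they can be merged
  val count = ips.count
  ips.rdd
    .mapPartitions { iter =>
      val b = BloomFilter.optimallySized[String](count, FP_PROBABILITY)
      iter.foreach(i => b += i)
      Iterator(b)
    }
    .treeReduce(_ | _) // union of the per-partition filters
}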
val ipBf = buildBloomFilter(Ips)
val ipBfBroadcast = spark.sparkContext.broadcast(ipBf)
idMonitor.map { x =>
  x.ipLocation.filter(
    // keep a location if it is "home" or its IP is (probably) in the IP dataset
    loc => loc.ipType == "home" || ipBfBroadcast.value.contains(loc.ip)
  )
}
I just want to figure out how to join IpLocation and Ips.
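For illustration, here is a minimal sketch of one way such a join could look. It has not been run against the real data. It assumes the nested Seq can be flattened with explode, left-joined against the IP dataset, filtered, and then regrouped by id. The object name JoinSketch, the function filterLocations, and the column names loc and known_ip are hypothetical. The broadcast hint further assumes that the IP dataset is small enough to fit on each executor; if it is not, the hint should be dropped.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{broadcast, col, collect_list, explode}

case class IpLocation(ip: String, ipVersion: Byte, ipType: String,
                      city: String, state: String, country: String)
case class IdMonitor(id: String, ipLocation: Seq[IpLocation])

object JoinSketch {
  // Keeps, per id, the locations that are "home" OR whose ip appears in `ips`.
  def filterLocations(spark: SparkSession,
                      idMonitor: Dataset[IdMonitor],
                      ips: Dataset[String]): Dataset[IdMonitor] = {
    import spark.implicits._

    // One row per (id, location): the Seq becomes joinable once exploded.
    val exploded = idMonitor.select(col("id"), explode(col("ipLocation")).as("loc"))

    // Deduplicate so the join cannot multiply rows; `known_ip` is a hypothetical name.
    val knownIps = ips.distinct().toDF("known_ip")

    exploded
      // Left join: locations without a matching IP get a null `known_ip`.
      .join(broadcast(knownIps), col("loc.ip") === col("known_ip"), "left")
      .where(col("loc.ipType") === "home" || col("known_ip").isNotNull)
      // Regroup the surviving locations back into one Seq per id.
      .groupBy("id")
      .agg(collect_list(col("loc")).as("ipLocation"))
      .as[IdMonitor]
  }
}
```

Unlike the Bloom filter, the join is an exact match, so there are no false positives. Two differences from the map-based code are worth noting: ids whose locations are all filtered out disappear from the result instead of yielding an empty Seq, and collect_list does not guarantee the original order of the locations. If a Dataset[Seq[IpLocation]] is wanted, the result can be mapped with `_.ipLocation`.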
IpLocation or a dataset of Seq[IpLocation]? – Vincent Doba
Seq[IpLocation] – Utkarsh Roy