The following code works:
@throws(classOf[IKodaMLException])
def soMergeTarget1(oldTargetIdx: Double, newTargetIdx: Double): RDDLabeledPoint =
{
  try
  {
    logger.trace("\n\n--sparseOperationRenameTargetsInNumeriOrder--\n\n")
    val oldTargetIdxb = spark.sparkContext.broadcast(oldTargetIdx)
    val newTargetIdxb = spark.sparkContext.broadcast(newTargetIdx)
    val newdata: RDD[(LabeledPoint, Int, String)] = sparseData.map
    {
      r =>
        val currentLabel: Double = r._1.label
        currentLabel match
        {
          // rows whose label matches the old target index get relabelled
          case x if x == oldTargetIdxb.value =>
            val newtrgt = newTargetIdxb.value
            (new LabeledPoint(newtrgt, r._1.features), r._2, r._3)
          case _ => r
        }
    }
    val newtargetmap = ilp.targetMap.filter(e => !(e._2 == oldTargetIdx))
    oldTargetIdxb.destroy()
    newTargetIdxb.destroy()
    new RDDLabeledPoint(newdata, copyColumnMap, newtargetmap, ilp.name)
  }
  catch
  {
    // catch block elided from the original snippet; assuming IKodaMLException
    // wraps a cause, rethrow to match the @throws annotation
    case e: Exception => throw new IKodaMLException(e.getMessage, e)
  }
}
But having destroyed the broadcast variables at the end of the method, the newtrgt value inside the returned RDD is also destroyed: the map is evaluated lazily, so by the time the RDD is actually computed, the broadcast variables its closure references no longer exist.
The trouble is that once the RDD is returned from this method, it could be used by any analyst in any code, so I seem to have lost all control over the broadcast variables.
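For what it's worth, the only workaround I can see is not to broadcast these two Doubles at all, so the closure captures plain values and there is nothing to destroy. A rough, untested sketch of that idea (hypothetical method name, same surrounding members assumed as above):

// Sketch: capture the plain Double parameters in the closure instead of
// broadcast handles, so no cleanup is needed afterwards. Assumes the same
// class members (sparseData, ilp, copyColumnMap) as soMergeTarget1.
def soMergeTarget1NoBroadcast(oldTargetIdx: Double, newTargetIdx: Double): RDDLabeledPoint =
{
  val newdata: RDD[(LabeledPoint, Int, String)] = sparseData.map
  {
    r =>
      if (r._1.label == oldTargetIdx)
        (new LabeledPoint(newTargetIdx, r._1.features), r._2, r._3)
      else r
  }
  val newtargetmap = ilp.targetMap.filter(e => !(e._2 == oldTargetIdx))
  new RDDLabeledPoint(newdata, copyColumnMap, newtargetmap, ilp.name)
}

But that just sidesteps the broadcast lifecycle issue rather than answering it, hence: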
Questions:
If I don't destroy the variables, will Spark destroy them itself when the last reference to the RDD disappears?
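To put that first question another way: do I ever need explicit cleanup along these lines, or can I simply let the references go out of scope? (unpersist and destroy are the only two cleanup calls on Broadcast that I know of; a sketch, not from my actual code:)

// The two explicit cleanup calls available on a Broadcast:
oldTargetIdxb.unpersist(blocking = false) // drop executor copies; the value can be re-sent if used again
oldTargetIdxb.destroy()                   // remove it everywhere; any later use of the broadcast fails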
(Perhaps a naive question, but...) I tried a little hack, val newtrgt = oldTargetIdxb.value + 1 - 1, thinking that might create a new value distinct from the broadcast variable. It didn't work, which I must admit surprised me. Can someone explain why the hack didn't work? (I'm not suggesting it was a good idea, but I am curious.) For clarity, the hack sat inside the map closure, roughly like this:
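// Roughly what I tried: derive a "fresh" Double from the broadcast value,
// hoping the result would be independent of the broadcast itself.
case x if x == oldTargetIdxb.value =>
  val newtrgt = oldTargetIdxb.value + 1 - 1 // note: this line only runs when the RDD is eventually evaluated
  (new LabeledPoint(newtrgt, r._1.features), r._2, r._3)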