3
votes

I have the following map:

 val pairs = lines.map( l =>
     (if (l.split(",")(1).toInt < 60) { "rest" }
      else if (l.split(",")(1).toInt > 110) { "sport" }, 10))
   .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(12))

Basically, when someone's HR is below 60, it's classified as rest; above 110, it's classified as sport. The second element of the tuple indicates that the person has been doing that activity for 10 minutes.

Right now, this maps an empty key for values between 60 and 110. What I want is to discard those entirely. How can that be achieved?

So from

("rest", 30)
("sport", 120)
((),10)

I'm trying to filter out ((),10). I've tried

 pairs.filter{case (key, value) => key.length < 3} // error: value length is not a member of Any
 pairs.filter(_._1 != "")  // no error, but it still keeps the empty keys

Neither seems to work.

2
Flatmap is your friend. – maasg
Would you be so kind as to elaborate? :) – lte__
Sorry, I'm on the phone. Search for Spark Scala flatMap here. This has been discussed before. – maasg

2 Answers

1
votes

Your problem is that your if expression returns either a String when a branch matches or Unit when neither does. You can fix your filter easily:

val pairs = lines.map(
  l => (if (l.split(",")(1).toInt < 60) {"rest"} else if (l.split(",")(1).toInt > 110) {"sport"}, 10))
    .filter(_._1 != ())

() in Scala is the only value of type Unit.

But this is not really the right way. You still get tuples of (Unit, Int) in the result, and you lose type safety with this if expression.

The correct way is either to filter your data first, so the if is exhaustive:

val pairs =
  lines.map(_.split(",")(1).toInt)
    .filter(hr => hr < 60 || hr > 110)
    .map(hr => (if (hr < 60) "rest" else "sport", 10))

Or to use collect, which in Spark is a shortcut for .filter followed by .map:

val pairs =
  lines.map(_.split(",")(1).toInt)
    .collect{
      case hr if hr < 60 => "rest" -> 10
      case hr if hr > 110 => "sport" -> 10
    }

This variant is probably the most readable.

Also, note that I moved the split into a separate step. This avoids calling split a second time for the second if branch.

Update: another approach is to use flatMap, as suggested in the comments:

val pairs =
  lines.flatMap(_.split(",")(1).toInt match{
      case hr if hr < 60 => Some("rest" -> 10)
      case hr if hr > 110 => Some("sport" -> 10)
      case _ => None
    })

It may or may not be more efficient: it avoids the separate filter step, but it wraps and unwraps every element in an Option. You can benchmark the different approaches and share the results.

0
votes

Note: this is not a direct answer to the question, but it might be useful for users who work with DataFrames.

With a DataFrame, we can use the na.drop function to drop rows that have no value (null) in the specified columns.

In this case, you can use sc.parallelize and toDF to construct the DataFrame, and then call df.na.drop(). (Plain df.drop() removes columns, not rows.)