1
votes

There's already a similar question here, but it is using Maven, and I'm using sbt. Moreover none of the solutions there worked for me

I'm using Spark 2.4.0, Scala 2.11.12 and IntelliJ IDEA 2019.1

My build.sbt looks like:

libraryDependencies ++= Seq(
    "com.groupon.sparklint" %% "sparklint-spark212" % "1.0.12" excludeAll ExclusionRule(organization = "org.apache.spark"),
    "org.apache.spark" %% "spark-core" % "2.4.0",
    "org.apache.spark" %% "spark-sql" % "2.4.0",
    "org.apache.spark" %% "spark-streaming" % "2.4.0",
    "org.apache.spark" %% "spark-streaming-kafka" % "1.6.2",
    "com.datastax.spark" %% "spark-cassandra-connector" % "2.4.0",
    "com.typesafe.slick" %% "slick" % "3.3.0",
    "org.slf4j" % "slf4j-nop" % "1.6.4",
    "com.typesafe.slick" %% "slick-hikaricp" % "3.3.0",
    "com.typesafe.slick" %% "slick-extensions" % "3.0.0"
)

Edit all over:

I will be receiving a stream of data from Kafka, which will be sent to the Spark Streaming context using:

val rawWeatherStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

From this, I want to create a stream of RawWeatherData objects. A sample output from the stream would look like:

(null,725030:14732,2008,12,31,11,0.6,-6.7,1001.7,80,6.2,8,0.0,0.0)

Things look all good, except that I need to remove the first null value to create the stream of RawWeatherData objects as the constructor cannot accept the first null value, but can accept all other values from the stream.

Just for clarity sakes, here's what RawWeatherData looks like (I cannot edit this):

case class RawWeatherData(
                           wsid: String,
                           year: Int,
                           month: Int,
                           day: Int,
                           hour: Int,
                           temperature: Double,
                           dewpoint: Double,
                           pressure: Double,
                           windDirection: Int,
                           windSpeed: Double,
                           skyCondition: Int,
                           skyConditionText: String,
                           oneHourPrecip: Double,
                           sixHourPrecip: Double) extends WeatherModel

To achieve that purpose, I send my stream into a function, which returns me the desired stream of RawWeatherData objects:

def ingestStream(rawWeatherStream: InputDStream[(String, String)]): DStream[RawWeatherData] = {
    rawWeatherStream.map(_._2.split(",")).map(RawWeatherData(_))
}

Now I am looking to insert this stream into a MySQL/DB2 database. From this RawWeatherData object (725030:14732,2008,12,31,11,0.6,-6.7,1001.7,80,6.2,8,0.0,0.0), the left highlighted bold part is the primary key, and the right bold part is the value that has to be reduced/aggregated.

So essentially I want my DStream to have key-value pairs of ([725030:14732,2008,12,31] , <summed up values for the key>)

So after ingestStream, I try to perform this:

parsedWeatherStream.map { weather =>
        (weather.wsid, weather.year, weather.month, weather.day, weather.oneHourPrecip)
    }.saveToCassandra(CassandraKeyspace, CassandraTableDailyPrecip)

After the end of map, I try to write .reduceByKey(), but when I try that, the error says Cannot resolve symbolreduceByKey`. I'm not sure why this is happening as the function is available in the spark documentation.

PS. Right now weather.oneHourPrecip is set to counter in cassandra, so cassandra will automatically aggregate the value for me. But this will not be possible in other databases like DB2, hence I wanted an apt replacement, like reduceByKey in spark. Is there any way to proceed with such a case?

1
After the map operation, you have an RDD[RawWeatherData] which is not a pair (key-value) RDD. reduceByKey is only available for PairRDDs. - sachav

1 Answers

0
votes

Type of your stream is DStream[RawWeatherData] and reduceByKey is available only on streams of type DStream[(K,V)], which is a stream of tuples consisting of key and value.

What you wanted to do is probably to use mapValues instead of map:

 val parsedWeatherStream: DStream[(String, RawWeatherData)] = rawWeatherStream
     .mapValues(_.split(","))
     .mapValues(RawWeatherData) 

As you can see by type of parsedWeatherStream from the snippet above, if you'd use mapValues, you'd not discard your keys and you could use reduceByKey.