There's already a similar question here, but it uses Maven, and I'm using sbt. Moreover, none of the solutions there worked for me.
I'm using Spark 2.4.0, Scala 2.11.12 and IntelliJ IDEA 2019.1.
My build.sbt looks like:
libraryDependencies ++= Seq(
  "com.groupon.sparklint" %% "sparklint-spark212" % "1.0.12" excludeAll ExclusionRule(organization = "org.apache.spark"),
  "org.apache.spark" %% "spark-core" % "2.4.0",
  "org.apache.spark" %% "spark-sql" % "2.4.0",
  "org.apache.spark" %% "spark-streaming" % "2.4.0",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.2",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.4.0",
  "com.typesafe.slick" %% "slick" % "3.3.0",
  "org.slf4j" % "slf4j-nop" % "1.6.4",
  "com.typesafe.slick" %% "slick-hikaricp" % "3.3.0",
  "com.typesafe.slick" %% "slick-extensions" % "3.0.0"
)
Edit all over:
I will be receiving a stream of data from Kafka, which will be sent to the Spark Streaming context using:
val rawWeatherStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
From this, I want to create a stream of RawWeatherData objects. A sample output from the stream would look like:
(null,725030:14732,2008,12,31,11,0.6,-6.7,1001.7,80,6.2,8,0.0,0.0)
Everything looks fine, except that I need to drop the leading null to create the stream of RawWeatherData objects: the constructor cannot accept that null, but it can accept all the other values from the stream.
For clarity, here is what RawWeatherData looks like (I cannot edit this):
case class RawWeatherData(
  wsid: String,
  year: Int,
  month: Int,
  day: Int,
  hour: Int,
  temperature: Double,
  dewpoint: Double,
  pressure: Double,
  windDirection: Int,
  windSpeed: Double,
  skyCondition: Int,
  skyConditionText: String,
  oneHourPrecip: Double,
  sixHourPrecip: Double) extends WeatherModel
To do that, I pass my stream to a function that returns the desired stream of RawWeatherData objects:
def ingestStream(rawWeatherStream: InputDStream[(String, String)]): DStream[RawWeatherData] = {
  rawWeatherStream.map(_._2.split(",")).map(RawWeatherData(_))
}
Now I want to insert this stream into a MySQL/DB2 database. In this RawWeatherData object (725030:14732,2008,12,31,11,0.6,-6.7,1001.7,80,6.2,8,0.0,0.0), the leading part (725030:14732,2008,12,31) is the primary key, and the precipitation value near the right end is what has to be reduced/aggregated.
So essentially, I want my DStream to hold key-value pairs of the form ([725030:14732,2008,12,31], <summed-up values for the key>).
So after ingestStream, I try to perform this:
parsedWeatherStream.map { weather =>
  (weather.wsid, weather.year, weather.month, weather.day, weather.oneHourPrecip)
}.saveToCassandra(CassandraKeyspace, CassandraTableDailyPrecip)
After the end of the map, I try to call .reduceByKey(), but when I do, the error says "Cannot resolve symbol reduceByKey". I'm not sure why this is happening, as the function is listed in the Spark documentation.
PS. Right now weather.oneHourPrecip is set to a counter column in Cassandra, so Cassandra automatically aggregates the value for me. That will not be possible in other databases such as DB2, so I wanted an apt replacement, like reduceByKey in Spark. Is there any way to proceed in such a case?
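A minimal sketch of one possible approach, assuming the goal is a per-batch sum of oneHourPrecip for each (wsid, year, month, day). reduceByKey only resolves on a DStream whose elements are 2-tuples, so the four key fields are nested into a single tuple and the value is kept separate. The trimmed RawWeatherData stand-in, the DailyPrecip object, and the DayKey alias are hypothetical names introduced for illustration; in the real project the existing case class would be used instead.

```scala
import org.apache.spark.streaming.dstream.DStream

// Stand-in for the question's RawWeatherData, trimmed to the fields used here.
// Assumption: in the real project the existing class is used and this is dropped.
case class RawWeatherData(wsid: String, year: Int, month: Int, day: Int, oneHourPrecip: Double)

object DailyPrecip {
  // Composite key: (wsid, year, month, day).
  type DayKey = (String, Int, Int, Int)

  // Turn one reading into a (key, value) pair; the key is itself a tuple.
  def toPair(w: RawWeatherData): (DayKey, Double) =
    ((w.wsid, w.year, w.month, w.day), w.oneHourPrecip)

  // Because the element type is now a Tuple2, the pair operations
  // (including reduceByKey) become available on the DStream.
  def sumPerDay(parsed: DStream[RawWeatherData]): DStream[(DayKey, Double)] =
    parsed.map(toPair).reduceByKey(_ + _)
}
```

Note that reduceByKey on a DStream sums within each batch interval only; totals across batches would still need to be accumulated, for example in the database itself or with a stateful operation such as updateStateByKey.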
[...] map operation, you have an RDD[RawWeatherData], which is not a pair (key-value) RDD. reduceByKey is only available for PairRDDs. - sachav
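For the relational-database side, a rough sketch of writing a keyed stream over plain JDBC could look like the following. It assumes a DStream of ((wsid, year, month, day), total) pairs, a hypothetical table daily_precip with columns wsid, yr, mon, dy, precip and a primary key on the first four, and a JDBC driver on the executor classpath. The upsert uses MySQL's ON DUPLICATE KEY UPDATE syntax; DB2 would need a MERGE statement instead. JdbcSink, save, upsertSql, and the connection parameters are illustrative names, not part of any library.

```scala
import java.sql.DriverManager
import org.apache.spark.streaming.dstream.DStream

object JdbcSink {
  // Composite key: (wsid, year, month, day).
  type DayKey = (String, Int, Int, Int)

  // MySQL-specific upsert (assumption): adds the batch total to any stored total.
  val upsertSql: String =
    "INSERT INTO daily_precip (wsid, yr, mon, dy, precip) VALUES (?, ?, ?, ?, ?) " +
      "ON DUPLICATE KEY UPDATE precip = precip + VALUES(precip)"

  def save(sums: DStream[(DayKey, Double)], jdbcUrl: String, user: String, password: String): Unit =
    sums.foreachRDD { rdd =>
      rdd.foreachPartition { rows =>
        // One connection per partition, opened on the executor.
        val conn = DriverManager.getConnection(jdbcUrl, user, password)
        try {
          val stmt = conn.prepareStatement(upsertSql)
          try {
            rows.foreach { case ((wsid, year, month, day), precip) =>
              stmt.setString(1, wsid)
              stmt.setInt(2, year)
              stmt.setInt(3, month)
              stmt.setInt(4, day)
              stmt.setDouble(5, precip)
              stmt.addBatch()
            }
            // Send all rows of this partition in one round trip.
            stmt.executeBatch()
          } finally stmt.close()
        } finally conn.close()
      }
    }
}
```

Opening the connection inside foreachPartition keeps it on the executor, so it never has to be serialized from the driver.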