
I have a file with some records.

1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
5,4,897,2,49.98,24.99
6,4,365,5,299.95,59.99
7,4,502,3,150.0,50.0
8,4,1014,4,199.92,49.98
9,5,957,1,299.98,299.98
10,5,365,5,299.95,59.99

I created a paired RDD like below.

val orderItemsRDD = sc.textFile("/shiva/spark/input/data/retail_db/order_items")

val orderItemsPairedRDD = orderItemsRDD.map(order => {val x = order.split(","); (x(1).toInt,x(4).toFloat)})

The resulting paired RDD has type RDD[(Int, Float)], as shown below.

orderItemsPairedRDD: org.apache.spark.rdd.RDD[(Int, Float)] = MapPartitionsRDD[120] at map at <console>:26
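
For example, with the sample records above, the map takes x(1) (the order id) as the key and x(4) (the order item subtotal) as the value, so, assuming the file starts with those sample records, the first pairs are:

scala> orderItemsPairedRDD.take(3).foreach(println)
(1,299.98)
(2,199.99)
(2,250.0)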

Now I am using reduceByKey to get the sum.

val orderItemsRBKRDD = orderItemsPairedRDD.reduceByKey((total, rev) => total + rev)

But surprisingly, the output is of type RDD[(Int, String)]:

orderItemsRBKRDD: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[121] at reduceByKey at <console>:34

If I check the output data, it looks like this:

scala> orderItemsRBKRDD.take(10).foreach(println)
(41234,102921,41234,249,2,109.94,54.97)
(65722,164249,65722,365,2,119.98,59.99164250,65722,730,5,400.0,80.0164251,65722,1004,1,399.98,399.98164252,65722,627,5,199.95,39.99164253,65722,191,2,199.98,99.99)
(28730,71921,28730,365,5,299.95,59.9971922,28730,502,1,50.0,50.0)
(68522,171323,68522,127,1,329.99,329.99)
(23776,59498,23776,1073,1,199.99,199.9959499,23776,403,1,129.99,129.99)
(32676,81749,32676,365,1,59.99,59.9981750,32676,627,4,159.96,39.9981751,32676,191,3,299.97,99.9981752,32676,1073,1,199.99,199.99)
(53926,134834,53926,365,2,119.98,59.99134835,53926,191,1,99.99,99.99)
(4926,12324,4926,1014,4,199.92,49.9812325,4926,1073,1,199.99,199.9912326,4926,365,4,239.96,59.9912327,4926,957,1,299.98,299.98)
(38926,97183,38926,191,5,499.95,99.9997184,38926,502,5,250.0,50.097185,38926,365,5,299.95,59.99)
(29270,73214,29270,365,5,299.95,59.9973215,29270,365,2,119.98,59.9973216,29270,1004,1,399.98,399.9873217,29270,627,4,159.96,39.9973218,29270,1004,1,399.98,399.98)

I changed my code as shown below.

scala> val orderItemsRBKRDD = orderItemsPairedRDD.reduceByKey((total, revenue) => total + revenue)

Now the output is as below, which is what I expected:

orderItemsRBKRDD: org.apache.spark.rdd.RDD[(Int, Float)] = ShuffledRDD[123] at reduceByKey at <console>:28

and the data is:

scala> orderItemsRBKRDD.take(10).foreach(println)
(41234,109.94)
(65722,1319.8899)
(28730,349.95)
(68522,329.99)
(23776,329.98)
(32676,719.91003)
(53926,219.97)
(4926,939.85)
(38926,1049.9)
(29270,1379.8501)
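
As a sanity check against the raw records: order 23776 has two items with subtotals 199.99 and 129.99, and 199.99 + 129.99 = 329.98, which matches the (23776,329.98) pair above. (The long trailing digits in values like 1319.8899 are ordinary Float precision artifacts.)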

The only change is the variable name, from "rev" to "revenue".

Is "rev" reserved word in spark or scala?

rev is not a reserved word. The issue is somewhere else in the code, I suppose. – Balaji Reddy

1 Answer


As Balaji said in the comments, "rev" is not a reserved keyword in Spark or Scala. If you can reproduce this consistently in a fresh shell, you could raise a ticket in the Spark JIRA.
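
For what it is worth, parameter names never affect a Scala lambda's type, so renaming rev to revenue cannot change the result type by itself. Both of these are (Float, Float) => Float:

val sum1: (Float, Float) => Float = (total, rev) => total + rev
val sum2: (Float, Float) => Float = (total, revenue) => total + revenue

The concatenated records in your first output look like String values being joined with + (on Strings, + is concatenation). In spark-shell that usually means an older definition of orderItemsPairedRDD was still in scope when reduceByKey was run. Here is a minimal sketch of how that symptom can arise, assuming a hypothetical earlier definition that kept the whole line as the value:

// Hypothetical earlier definition (not your posted code): the value is the
// full line, a String, so the pair RDD is RDD[(Int, String)]
val stalePairedRDD = orderItemsRDD.map(order => (order.split(",")(1).toInt, order))

// On Strings, + concatenates, so reduceByKey glues whole records together,
// producing output shaped exactly like the one in the question
val glued = stalePairedRDD.reduceByKey((total, rev) => total + rev)

Re-running your original val orderItemsPairedRDD = ... line immediately before the reduceByKey should make the "rev" version behave exactly like the "revenue" one.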