I have a .csv file that I am trying to analyse using Spark. Among other things, the .csv file contains a list of topics and their counts. Each topic and its count are separated by a ',', and all of these topic,count pairs are joined into a single string, separated by ';', like so
"topic_1,10;topic_2,12;topic_1,3"
As you can see, some topics are in the string multiple times.
I have an RDD containing key-value pairs of a date and the topic string: [date, topicstring]
What I want to do is split the string at the ';' to get all the separate topics, then split each of those at the ',' and create key-value pairs of topic name and count, which can then be reduced by key. For the example above this would be
[date, ((topic_1, 13), (topic_2, 12))]
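To make the intended transformation concrete, here is a minimal sketch in plain Scala (outside Spark) of the per-string splitting I have in mind, using the example string above (the names raw and pairs are just for illustration):

// Minimal sketch of the splitting I have in mind, on the example string above
val raw = "topic_1,10;topic_2,12;topic_1,3"

val pairs = raw
  .split(";")                        // Array("topic_1,10", "topic_2,12", "topic_1,3")
  .map { entry =>
    val Array(topic, count) = entry.split(",")
    (topic, count.toInt)             // (topic name, count)
  }
// pairs: Array((topic_1,10), (topic_2,12), (topic_1,3))
// which I then want summed per topic, giving (topic_1,13) and (topic_2,12)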
I have been playing around in Spark a lot, as I am new to Scala. What I tried is:
val separateTopicsByDate = topicsByDate
  .mapValues(_.split(";").map({ case (str) => (str) }))
  .mapValues({ case (arr) =>
    arr
      .filter(str => str.split(",").length > 1)
      .map({ case (str) => (str.split(",")(0), str.split(",")(1).toInt) })
  })
The problem is that this leaves me with an Array of tuples per date, which I cannot call reduceByKey on. Splitting the string at ';' returns an array, and I tried mapping each element to a tuple (as you can see from the map operation), but this does not work.
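To make the type mismatch concrete, the value I end up with for each date is a plain Scala Array, roughly like the sketch below (names are just for illustration). I could presumably sum it with ordinary collection operations, but I would like a proper Spark way of doing this.

// Roughly the value I get per date after the mapValues above: a plain Array, not an RDD,
// so reduceByKey is not available on it
val perDate: Array[(String, Int)] = Array(("topic_1", 10), ("topic_2", 12), ("topic_1", 3))

// Inside mapValues only ordinary collection operations are available, e.g. something like:
val summed = perDate.groupBy(_._1).map { case (topic, entries) => (topic, entries.map(_._2).sum) }
// summed: Map(topic_1 -> 13, topic_2 -> 12)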
The complete code I used is:
val rdd = sc.textFile("./data/segment/*.csv")

val topicsByDate = rdd
  .filter(line => line.split("\t").length > 23)
  .map({ case (str) => (str.split("\t")(1), str.split("\t")(23)) })
  .reduceByKey(_ + _)

val separateTopicsByDate = topicsByDate
  .mapValues(_.split(";").map({ case (str) => (str) }))
  .mapValues({ case (arr) =>
    arr
      .filter(str => str.split(",").length > 1)
      .map({ case (str) => (str.split(",")(0), str.split(",")(1).toInt) })
  })
separateTopicsByDate.take(2)
This returns:
res42: Array[(String, Array[(String, Int)])] = Array((20150219001500,Array((Cecilia Pedraza,91), (Mexico City,110), (Soviet Union,1019), (Dutch Warmbloods,1236), (Jose Luis Vaquero,1413), (National Equestrian Club,1636), (Lenin Park,1776), (Royal Dutch Sport Horse,2075), (North American,2104), (Western Hemisphere,2246), (Maydet Vega,2800), (Mirach Capital Group,110), (Subrata Roy,403), (New York,820), (Saransh Sharma,945), (Federal Bureau,1440), (San Francisco,1482), (Gregory Wuthrich,1635), (San Francisco,1652), (Dan Levine,2309), (Emily Flitter,2327), (K...
As you can see, the value for each date is an array of tuples, which I cannot use .reduceByKey(_ + _) on.
Is there a way to split the string in such a way that it can be reduced by key?
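One direction I was considering, but am not sure about, is flattening the values so that there is one record per topic entry and letting reduceByKey sum the counts, along the lines of this sketch (reusing topicsByDate from above):

// Sketch of what I was considering: one ((date, topic), count) record per entry,
// sum the counts with reduceByKey, then regroup per date
val topicCounts = topicsByDate
  .flatMapValues(_.split(";"))
  .filter { case (_, entry) => entry.split(",").length > 1 }
  .map { case (date, entry) =>
    val Array(topic, count) = entry.split(",")
    ((date, topic), count.toInt)
  }
  .reduceByKey(_ + _)

// regroup per date if needed
val regrouped = topicCounts
  .map { case ((date, topic), count) => (date, (topic, count)) }
  .groupByKey()

Is something like this the right direction, or is there a cleaner way?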