I have a .csv file that I am trying to analyse using Spark. Among other things, the .csv file contains a list of topics and their counts. Each topic and its count are separated by a ',', and all of these topic/count pairs sit in the same string, separated by ';', like so:

"topic_1,10;topic_2,12;topic_1,3"

As you can see, some topics appear in the string multiple times.

I have an RDD containing key-value pairs of some date and the topic string: [date, topicstring].

What I want to do is split the string at the ';' to get all the separate topics, then split those topics at the ',' and create key-value pairs of topic name and count, which can be reduced by key. For the example above this would be:

[date, ((topic_1, 13), (topic_2, 12))]

I have been playing around in Spark a lot, as I am new to Scala. What I tried is:

val separateTopicsByDate = topicsByDate
  .mapValues(_.split(";").map({case(str) => (str)}))
  .mapValues({case(arr) => arr
    .filter(str => str.split(",").length > 1)
    .map({case(str) => (str.split(",")(0), str.split(",")(1).toInt)})
  })

The problem is that this returns an Array of tuples, which I cannot call reduceByKey on. Splitting the string at ';' returns an array. I tried mapping it to a tuple (as you can see from the map operation), but this does not work.

The complete code I used is

val rdd = sc.textFile("./data/segment/*.csv")

val topicsByDate = rdd
  .filter(line => line.split("\t").length > 23)                    // keep only lines with enough tab-separated fields
  .map({case(str) => (str.split("\t")(1), str.split("\t")(23))})   // (date, topicstring)
  .reduceByKey(_ + _)                                              // concatenate topic strings per date

val separateTopicsByDate = topicsByDate
  .mapValues(_.split(";").map({case(str) => (str)}))
  .mapValues({case(arr) => arr
    .filter(str => str.split(",").length > 1)
    .map({case(str) => (str.split(",")(0), str.split(",")(1).toInt)})
  })

separateTopicsByDate.take(2)

This returns

res42: Array[(String, Array[(String, Int)])] = Array((20150219001500,Array((Cecilia Pedraza,91), (Mexico City,110), (Soviet Union,1019), (Dutch Warmbloods,1236), (Jose Luis Vaquero,1413), (National Equestrian Club,1636), (Lenin Park,1776), (Royal Dutch Sport Horse,2075), (North American,2104), (Western Hemisphere,2246), (Maydet Vega,2800), (Mirach Capital Group,110), (Subrata Roy,403), (New York,820), (Saransh Sharma,945), (Federal Bureau,1440), (San Francisco,1482), (Gregory Wuthrich,1635), (San Francisco,1652), (Dan Levine,2309), (Emily Flitter,2327), (K...

As you can see, this is an array of tuples, which I cannot use .reduceByKey(_ + _) on.

Is there a way to split the string in such a way that it can be reduced by key?

1 Answer

If your RDD has rows like:

[date, "topic1,10;topic2,12;topic1,3"]  

you can split the value and explode each row using flatMap into:

[date, ["topic1,10", "topic2,12", "topic1,3"]] ->

[date, "topic1,10"]  
[date, "topic2,12"]  
[date, "topic1,3"]

Then convert each row into a [String,Integer] tuple, prefixing the topic with its date (rdd1 in the code):

["date_topic1",10]  
["date_topic2",12]  
["date_topic1",3]

and reduce by key using addition (rdd2 in the code):

["date_topic1",13]  
["date_topic2",12]  

Then you separate the dates from the topics and recombine each topic with its value, getting [String,String] tuples like:

["date", "topic1,13"]  
["date", "topic2,12"]  

Finally, you split the values into [topic,count] tuples, prepare ["date", [(topic,count)]] pairs (rdd3 in the code) and reduce by key (rdd4 in the code), getting:

["date", [(topic1, 13), (topic2, 12)]]

===
Below is a Java implementation as a sequence of four intermediate RDDs; you may refer to it for Scala development (a Scala sketch follows the Java code):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.api.java.JavaPairRDD;

    import scala.Tuple2;

    JavaPairRDD<String, String> rdd;     // original data: [date, "topic1,10;topic2,12;topic1,3"]

    JavaPairRDD<String, Integer> rdd1 =  // contains:
                                         // ["date_topic1",10]
                                         // ["date_topic2",12]
                                         // ["date_topic1",3]
            rdd.flatMapToPair(

                pair -> // pair = [date, "topic1,10;topic2,12;topic1,3"]
                {
                    List<Tuple2<String, Integer>> list = new ArrayList<>();

                    String k = pair._1(); // date
                    String v = pair._2(); // "topic,count;topic,count;topic,count"

                    for (String entry : v.split(";"))
                    {
                        String[] topicCount = entry.split(","); // ["topic", "count"]

                        list.add(new Tuple2<>(k + "_" + topicCount[0], Integer.parseInt(topicCount[1]))); // ("date_topic", count)
                    }

                    return list.iterator();
                }

            );

    JavaPairRDD<String, Integer> rdd2 = // contains:
                                        // ["date_topic1",13]
                                        // ["date_topic2",12]
            rdd1.reduceByKey((i1, i2) -> i1 + i2);

    // Note: the values of rdd3 and rdd4 are Lists rather than Iterators; an Iterator
    // is single-use and not serializable, so it cannot safely be used as an RDD value.
    JavaPairRDD<String, List<Tuple2<String, Integer>>> rdd3 = // contains:
                                                              // ["date", [(topic1,13)]]
                                                              // ["date", [(topic2,12)]]
            rdd2.mapToPair(

                pair -> // pair = ["date_topic1", 13]
                {
                    String[] dateTopic = pair._1().split("_"); // ["date", "topic1"]

                    List<Tuple2<String, Integer>> list = new ArrayList<>();
                    list.add(new Tuple2<>(dateTopic[1], pair._2())); // (topic1, 13)

                    return new Tuple2<>(dateTopic[0], list); // ("date", [(topic1, 13)])
                }

            );

    JavaPairRDD<String, List<Tuple2<String, Integer>>> rdd4 = // contains:
                                                              // ["date", [(topic1, 13), (topic2, 12)]]
            rdd3.reduceByKey(

                (list1, list2) ->
                {
                    List<Tuple2<String, Integer>> merged = new ArrayList<>(list1);
                    merged.addAll(list2);
                    return merged;
                }

            );
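
Since the question uses Scala, here is a minimal sketch of the same four-step pipeline in Scala, assuming topicsByDate is the RDD[(String, String)] of (date, topicstring) pairs from the question, and that the dates themselves contain no '_':

val rdd1 = topicsByDate.flatMap { case (date, topics) =>  // explode and key by "date_topic"
  topics.split(";").iterator
    .map(_.split(","))
    .filter(_.length > 1)                                 // skip malformed entries
    .map(parts => (date + "_" + parts(0), parts(1).toInt))
}

val rdd2 = rdd1.reduceByKey(_ + _)                        // sum the counts per (date, topic)

val rdd3 = rdd2.map { case (dateTopic, count) =>
  val Array(date, topic) = dateTopic.split("_", 2)        // split at the first '_' only
  (date, List((topic, count)))
}

val rdd4 = rdd3.reduceByKey(_ ++ _)                       // (date, List((topic1, 13), (topic2, 12)))

Keying by the pair ((date, topic), count) instead of the concatenated "date_topic" string would avoid the split/concatenate round trip entirely; the string key is kept here only to mirror the Java version above.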

UPD. This problem can actually be solved with a single map: you split the row value (i.e. the topicstring) by ';', which gives you [topic,count] pairs, and you populate a hashmap from those pairs, adding up the counts. Finally, you output the date key together with all the distinct topics accumulated in the hashmap and their summed counts.
This way also seems more efficient, because the hashmap can never be larger than the original row, so the memory consumed by the mapper is bounded by the size of the largest row, whereas in the flatMap solution memory has to be large enough to fit all the expanded rows.
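
In Scala, this single-map variant might look like the following sketch (again assuming the topicsByDate RDD of (date, topicstring) pairs from the question):

val separateTopicsByDate = topicsByDate.mapValues { topics =>
  val counts = scala.collection.mutable.HashMap.empty[String, Int]
  for (entry <- topics.split(";"); parts = entry.split(",") if parts.length > 1)
    counts(parts(0)) = counts.getOrElse(parts(0), 0) + parts(1).toInt
  counts.toList                                           // List((topic, totalCount))
}

This yields (date, List((topic, count))) pairs directly, with the counts already summed per topic within each row.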