0 votes

I am trying to write a simple Java Spark application which does the following.

Input data CSV format: key1,key2,data1,data2

Basically, what I am trying to do here is:

First I map each line by key1, and then do a groupByKey operation on that RDD.

JavaRDD<String> viewRdd = sc.textFile("testfile.csv", 1);
JavaPairRDD<String, String> customerIdToRecordRDD = viewRdd
    .mapToPair(w -> new Tuple2<String, String>(w.split(",")[0], w));
JavaPairRDD<String, Iterable<String>> groupedByKey1RDD = customerIdToRecordRDD.groupByKey();
System.out.println(groupedByKey1RDD.count());

Now my problem is that I need to do an aggregateByKey with key2 on each group from groupedByKey1RDD. Is there any way to convert an Iterable into an RDD, or am I missing something here? I am new to this; any help will be appreciated.

Example input and expected output :

id_1,time0,10,10
id_2,time1,0,10
id_1,time1,11,10
id_1,time0,1,10
id_2,time1,10,10

The output is grouped by the 1st column and then aggregated by the 2nd column (the aggregation logic is simply to add column 3 and column 4):

id_1 : time0 : { sum1 : 11, sum2 : 20} ,
       time1 : { sum1 : 11, sum2 : 10}

id_2 : time1 : { sum1 : 10, sum2 : 20} 
Can you please provide a sample of the CSV data and the expected output? - abaghel
@abaghel Added sample input and output. - Rijo Joseph
What is the Spark version you are using? And do you want to use RDDs? Because this can be easily solved, and will be more manageable, using DataFrames. - abaghel

1 Answer

2 votes

Here is a solution using Spark 2.0 and DataFrames. Please let me know if you still want to use RDDs.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SparkGroupBySample {
    public static void main(String[] args) {
        //SparkSession
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkGroupBySample")
                .master("local")
                .getOrCreate();
        //Schema
        StructType schema = new StructType(new StructField[] {
                new StructField("key1", DataTypes.StringType, true, Metadata.empty()),
                new StructField("key2", DataTypes.StringType, true, Metadata.empty()),
                new StructField("data1", DataTypes.IntegerType, true, Metadata.empty()),
                new StructField("data2", DataTypes.IntegerType, true, Metadata.empty()) });
        //Read csv
        Dataset<Row> dataSet = spark.read()
                .format("csv")
                .schema(schema)
                .option("header", "true")
                .option("delimiter", ",")
                .load("c:\\temp\\sample.csv");
        dataSet.show();
        //groupBy and aggregate
        Dataset<Row> dataSet1 = dataSet.groupBy("key1", "key2")
                .sum("data1", "data2")
                .toDF("key1", "key2", "sum1", "sum2");
        dataSet1.show();
        //stop
        spark.stop();
    }
}

Here is the output.

+----+-----+----+----+
|key1| key2|sum1|sum2|
+----+-----+----+----+
|id_1|time1|  11|  10|
|id_2|time1|  10|  20|
|id_1|time0|  11|  20|
+----+-----+----+----+
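
If you still want to stay with RDDs, here is a minimal sketch (assuming the same c:\temp\sample.csv path and no header row; the class name SparkAggregateByKeySample is just for illustration). Instead of grouping by key1 first and then trying to turn each Iterable into an RDD, it keys every record by the (key1, key2) pair and uses aggregateByKey to add data1 and data2 in one pass.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SparkAggregateByKeySample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkAggregateByKeySample").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the csv (path is a placeholder; assumes no header row).
        JavaRDD<String> lines = sc.textFile("c:\\temp\\sample.csv");

        // Key each record by the (key1, key2) pair; the value holds (data1, data2).
        JavaPairRDD<Tuple2<String, String>, Tuple2<Integer, Integer>> keyed = lines.mapToPair(line -> {
            String[] f = line.split(",");
            return new Tuple2<Tuple2<String, String>, Tuple2<Integer, Integer>>(
                    new Tuple2<String, String>(f[0], f[1]),
                    new Tuple2<Integer, Integer>(Integer.parseInt(f[2]), Integer.parseInt(f[3])));
        });

        // aggregateByKey: start from (0, 0) and add data1 and data2 per (key1, key2).
        JavaPairRDD<Tuple2<String, String>, Tuple2<Integer, Integer>> sums = keyed.aggregateByKey(
                new Tuple2<Integer, Integer>(0, 0),
                (acc, v) -> new Tuple2<Integer, Integer>(acc._1() + v._1(), acc._2() + v._2()),
                (a, b) -> new Tuple2<Integer, Integer>(a._1() + b._1(), a._2() + b._2()));

        // Print the result in the requested "key1 : key2 : { sum1, sum2 }" form.
        for (Tuple2<Tuple2<String, String>, Tuple2<Integer, Integer>> t : sums.collect()) {
            System.out.println(t._1()._1() + " : " + t._1()._2()
                    + " : { sum1 : " + t._2()._1() + ", sum2 : " + t._2()._2() + " }");
        }

        sc.stop();
    }
}

The same thing could be done with reduceByKey, since the sequence and combine functions are identical here; aggregateByKey is shown only because that is the operation you asked about.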