1 vote

I have a product information file with millions of records. The CSV file looks like:

    Product    CategoryName    SalesUnit    Other Columns...
    p1         a12             41
    p2         x5              72
    p3         x5              69
    p4         c21             80
    p5         b16             59
    p6         x5              75
    ..         ..              ..
And I have a mapping file (CategoryCode <-> CategoryName) as follows. The mapping file has around 200 records:
    CategoryCode    CategoryName
    1.0             a12
    2.0             b13
    3.0             b16
    4.0             c12
    5.0             c21
    6.0             x5
    ..              ..
Eventually, I want to replace the value of CategoryName with the corresponding CategoryCode:
    Product    Category    SalesUnit    Other Columns...
    p1         1.0         41
    p2         6.0         72
    p3         6.0         69
    p4         5.0         80
    p5         3.0         59
    p6         6.0         75
    ..         ..          ..
My approach is to use a UDF on the Spark DataFrame:
    import org.apache.spark.sql.functions.udf

    val categoryCodeUdf = udf { (categoryName: String) =>
      val name = categoryName.trim()
      if (name == "a12") 1.0
      else if (name == "b13") 2.0
      else if (name == "b16") 3.0
      else if (name == "c12") 4.0
      else if (name == "c21") 5.0
      else if (name == "x5") 6.0
      else if (name == "z12") 7.0
      else if (...) ...
      // ...around 200 branches in total, one per category...
      else 999.0
    }
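
Applied to the product DataFrame, it would look something like this (assuming the DataFrame is called productDF, which is just an illustrative name, and the UDF above is bound to categoryCodeUdf):

    import org.apache.spark.sql.functions.col

    // Add the numeric Category column and drop the original text column
    val result = productDF
      .withColumn("Category", categoryCodeUdf(col("CategoryName")))
      .drop("CategoryName")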
    
Is there a more elegant approach to achieve the replacement without coding so many if...else clauses? Thanks.

2 Answers

4 votes

Join the mapping file with the CSV on the trimmed category name, then select only the fields you need. A minimal sketch of that idea is below.
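
Assuming the product and mapping DataFrames are called productDF and mappingDF (hypothetical names), it could look like this:

    import org.apache.spark.sql.functions.trim

    // Join on the trimmed category name, then keep only the columns of interest
    val result = productDF
      .join(mappingDF, trim(productDF("CategoryName")) === trim(mappingDF("CategoryName")))
      .select(productDF("Product"), mappingDF("CategoryCode").as("Category"), productDF("SalesUnit"))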

3 votes

You can join both DataFrames on CategoryName and then drop CategoryName itself, since you don't need it afterwards.

You can do something like this:

scala> // Could have more columns; taking just these to demonstrate

scala> val df1=sc.parallelize(Seq(("p1","a12",41),("p2","x5",72),("p3","x5",69))).toDF("Product","CategoryName","SalesUnit")
df1: org.apache.spark.sql.DataFrame = [Product: string, CategoryName: string ... 1 more field]

scala> //Category code dataFrame

scala>  val df2=sc.parallelize(Seq((1.0,"a12"),(4.0,"c12"),(5.0,"c21"),(6.0,"x5"))).toDF("CategoryCode","CategoryName")
df2: org.apache.spark.sql.DataFrame = [CategoryCode: double, CategoryName: string]

scala> val resultDF=df1.join(df2,"CategoryName").withColumnRenamed("CategoryCode","Category").drop("CategoryName")
resultDF: org.apache.spark.sql.DataFrame = [Product: string, SalesUnit: int ... 1 more field]

scala> resultDF.show()
+-------+---------+--------+                                                    
|Product|SalesUnit|Category|
+-------+---------+--------+
|     p1|       41|     1.0|
|     p2|       72|     6.0|
|     p3|       69|     6.0|
+-------+---------+--------+

P.S.: This is just a small demonstration.
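
Since the mapping data has only around 200 rows, you could also hint Spark to broadcast it so the join avoids shuffling the large product DataFrame; a minimal sketch, reusing the df1/df2 names from above:

    import org.apache.spark.sql.functions.broadcast

    // Broadcast the small mapping DataFrame to every executor before joining
    val resultDF = df1
      .join(broadcast(df2), "CategoryName")
      .withColumnRenamed("CategoryCode", "Category")
      .drop("CategoryName")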