
I have two tables in Hive -

 emp(empid int,empname string,deptid string)
 dept(deptid string, deptname string)

Sample Data

Emp table in Hive has the schema empid int,empname string,deptid string

 1,Monami Sen,D01
 2,Tarun Sen,D02
 3,Shovik Sen,D03
 4,Rita Roy,D02
 5,Farhan,D01

Dept table in Hive has the schema deptid string,deptname string

 D01,Finance
 D02,IT
 D03,Accounts
 D04,Admin

I need to create another hive table which should have the following schema -

deptid string, deptname string, emp_details array<struct<emp_id:string,emp_name:string>>

The array-of-struct column should contain the details (empid and empname) of all employees belonging to a particular department, and the final DataFrame should be converted to JSON format.

Desired output :

{"deptid":"D01","deptname":"IT","empdetails":[{"empid":1,"empname":"Monami Sen"}]}
{"deptid":"D02","deptname":"Accounts","empdetails":[{"empid":2,"empname":"Rita Roy"}, 
{"empid":5,"empname":"Rijul Shah"}]}
{"deptid":"D03","deptname":"Finance","empdetails":[{"empid":3,"empname":"Shovik Sen"},{"empid":4,"empname":"Arghya Ghosh"}]}
{"deptid":"D04","deptname":"Adminstration","empdetails":[]}

I need to use Spark 1.6 and Scala 2.10 for coding. The datasets are huge, so the code needs to handle them efficiently for good performance.

Can you please help me with any suggestions for the code?


1 Answer


I would suggest performing a left_outer join, followed by a groupBy/collect_list aggregation and a toJSON conversion, as shown below (Spark 2.x API; a Spark 1.6 variant follows):

import org.apache.spark.sql.functions._  // collect_list, struct
import spark.implicits._                 // toDF and $-notation (Spark 2.x shell)

val empDF = Seq(
  (1, "Monami Sen", "D01"),
  (2, "Tarun Sen", "D02"),
  (3, "Shovik Sen", "D03"),
  (4, "Rita Roy", "D02"),
  (5, "Farhan", "D01")
).toDF("empid", "empname", "deptid")

val deptDF = Seq(
  ("D01", "Finance"),
  ("D02", "IT"),
  ("D03", "Accounts"),
  ("D04", "Admin")
).toDF("deptid", "deptname")

deptDF.join(empDF, Seq("deptid"), "left_outer").
  groupBy("deptid", "deptname").
  agg(collect_list(struct($"empid", $"empname")).as("empdetails")).
  toJSON.
  show(false)
// +----------------------------------------------------------------------------------------------------------------------+
// |value                                                                                                                 |
// +----------------------------------------------------------------------------------------------------------------------+
// |{"deptid":"D03","deptname":"Accounts","empdetails":[{"empid":3,"empname":"Shovik Sen"}]}                              |
// |{"deptid":"D02","deptname":"IT","empdetails":[{"empid":4,"empname":"Rita Roy"},{"empid":2,"empname":"Tarun Sen"}]}    |
// |{"deptid":"D01","deptname":"Finance","empdetails":[{"empid":5,"empname":"Farhan"},{"empid":1,"empname":"Monami Sen"}]}|
// |{"deptid":"D04","deptname":"Admin","empdetails":[{}]}                                                                 |
// +----------------------------------------------------------------------------------------------------------------------+
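
Note that an empty department comes out as [{}] rather than the desired [], because the left join contributes a single all-null struct that collect_list still collects. A sketch of one way to get a truly empty array, relying on collect_list skipping null inputs:

deptDF.join(empDF, Seq("deptid"), "left_outer").
  groupBy("deptid", "deptname").
  // `when` without `otherwise` yields null for unmatched rows, which collect_list drops
  agg(collect_list(when($"empid".isNotNull, struct($"empid", $"empname"))).as("empdetails")).
  toJSON.
  show(false)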

For Spark 1.6, consider performing the aggregation via Spark SQL instead, as collect_list doesn't seem to support non-primitive field types in the Spark 1.6 DataFrame API:

deptDF.join(empDF, Seq("deptid"), "left_outer").
  registerTempTable("joined_table")  // Spark 1.6 API; createOrReplaceTempView only exists from 2.0

// sqlContext must be a HiveContext here, since collect_list is a Hive UDAF in 1.6
val resultDF = sqlContext.sql("""
  select deptid, deptname, collect_list(struct(empid, empname)) as empdetails
  from joined_table
  group by deptid, deptname
""")

resultDF.toJSON.
  show(false)
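
Since dept is presumably much smaller than emp, broadcasting it avoids shuffling the large emp table during the join (the broadcast hint is available from Spark 1.5). The result can then be persisted as a new Hive table or as JSON files. A sketch, with placeholder output names:

import org.apache.spark.sql.functions.broadcast

broadcast(deptDF).join(empDF, Seq("deptid"), "left_outer").
  registerTempTable("joined_table")

// ... same aggregation as above ...

resultDF.write.saveAsTable("dept_emp_details")        // materialize as a Hive table
resultDF.toJSON.saveAsTextFile("/tmp/dept_emp_json")  // or write the JSON strings out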