I have two DataFrames called DF1 and DF2, the content of each DataFrame is as follows:

df1:

line_item_usage_account_id  line_item_unblended_cost    name 
100000000001                12.05                       account1
200000000001                52                          account2
300000000003                12.03                       account3

df2:

accountname     accountproviderid   clustername     app_pmo     app_costcenter
account1        100000000001        cluster1        111111      11111111
account2        200000000001        cluster2        222222      22222222

I need to join df1 and df2 on the fields df1.line_item_usage_account_id and df2.accountproviderid.

When both fields have the same ID, the df1 line_item_unblended_cost value must be added as a new column. And when a line_item_usage_account_id value from df1 is not present in the accountproviderid column of df2, the df1 fields must still be appended, as follows:

accountname     accountproviderid   clustername     app_pmo     app_costcenter      line_item_unblended_cost
account1        100000000001        cluster1        111111      11111111            12.05
account2        200000000001        cluster2        222222      22222222            52
account3        300000000003        NA              NA          NA                  12.03

The account3 row is appended at the end of the new DataFrame, with the missing df2 columns filled with "NA".

Any help is appreciated; thanks in advance.

1 Answer

from pyspark.sql import SparkSession   
spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([
    [100000000001, 12.05, 'account1'], 
    [200000000001, 52.00, 'account2'], 
    [300000000003, 12.03, 'account3']], 
    schema=['line_item_usage_account_id',  'line_item_unblended_cost', 'name' ])

df1.show()
df1.printSchema()

df2 = spark.createDataFrame([
    ['account1', 100000000001, 'cluster1', 111111, 11111111],
    ['account2', 200000000001, 'cluster2', 222222, 22222222]], 
    schema=['accountname', 'accountproviderid', 'clustername', 'app_pmo', 'app_costcenter'])

df2.printSchema()
df2.show()

# Left outer join keeps every df1 row; unmatched df2 columns come back as null.
cols = ['name', 'line_item_usage_account_id', 'clustername', 'app_pmo', 'app_costcenter', 'line_item_unblended_cost']
resDF = (df1
    .join(df2, df1.line_item_usage_account_id == df2.accountproviderid, 'leftouter')
    .select(*cols)
    .withColumnRenamed('name', 'accountname')
    .withColumnRenamed('line_item_usage_account_id', 'accountproviderid')
    .orderBy('accountname'))

resDF.printSchema()
# |-- accountname: string (nullable = true)
# |-- accountproviderid: long (nullable = true)
# |-- clustername: string (nullable = true)
# |-- app_pmo: long (nullable = true)
# |-- app_costcenter: long (nullable = true)
# |-- line_item_unblended_cost: double (nullable = true)

resDF.show()
# +-----------+-----------------+-----------+-------+--------------+------------------------+
# |accountname|accountproviderid|clustername|app_pmo|app_costcenter|line_item_unblended_cost|
# +-----------+-----------------+-----------+-------+--------------+------------------------+
# |   account1|     100000000001|   cluster1| 111111|      11111111|                   12.05|
# |   account2|     200000000001|   cluster2| 222222|      22222222|                    52.0|
# |   account3|     300000000003|       null|   null|          null|                   12.03|
# +-----------+-----------------+-----------+-------+--------------+------------------------+
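For readers without a Spark environment at hand, the left-outer-join semantics the answer relies on (keep every df1 row, fill the unmatched df2 columns) can be sketched in plain Python. This is just an illustration of the logic, not the answer's code; the `merge_accounts` helper is hypothetical, and here the missing values are filled with the literal string "NA" as in the question's expected output (Spark itself would produce nulls, as shown above):

```python
def merge_accounts(df1_rows, df2_rows):
    # Index df2 by accountproviderid for O(1) lookups.
    by_id = {r['accountproviderid']: r for r in df2_rows}
    out = []
    for r in df1_rows:
        acct_id = r['line_item_usage_account_id']
        match = by_id.get(acct_id)  # None when there is no matching df2 row
        out.append({
            'accountname': r['name'],
            'accountproviderid': acct_id,
            'clustername': match['clustername'] if match else 'NA',
            'app_pmo': match['app_pmo'] if match else 'NA',
            'app_costcenter': match['app_costcenter'] if match else 'NA',
            'line_item_unblended_cost': r['line_item_unblended_cost'],
        })
    return out

# Same sample data as the question.
df1_rows = [
    {'line_item_usage_account_id': 100000000001, 'line_item_unblended_cost': 12.05, 'name': 'account1'},
    {'line_item_usage_account_id': 200000000001, 'line_item_unblended_cost': 52.0,  'name': 'account2'},
    {'line_item_usage_account_id': 300000000003, 'line_item_unblended_cost': 12.03, 'name': 'account3'},
]
df2_rows = [
    {'accountname': 'account1', 'accountproviderid': 100000000001,
     'clustername': 'cluster1', 'app_pmo': 111111, 'app_costcenter': 11111111},
    {'accountname': 'account2', 'accountproviderid': 200000000001,
     'clustername': 'cluster2', 'app_pmo': 222222, 'app_costcenter': 22222222},
]

for row in merge_accounts(df1_rows, df2_rows):
    print(row)
```

The third printed row carries 'NA' for clustername, app_pmo and app_costcenter, matching the desired output for account3.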