I'm trying to flatten data in an RDD. The RDD is structured as a list of 4-tuples: the first element is the primary_id, the second is a list of dictionaries, and the third and fourth are each a single-element list containing one dictionary.
rdd = [('xxxxx99', [{'cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ'},
                    {'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ'},
                    {'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'}],
        [{'pol_cat_id':'234','pol_dt':'20100220'}],
        [{'qor_pol_id':'23492','qor_cd':'30'}]),
       ('xxxxx86', [{'cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX'},
                    {'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX'},
                    {'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'}],
        [{'pol_cat_id':'532','pol_dt':'20091020'}],
        [{'qor_pol_id':'49320','qor_cd':'21'}])]
I want to flatten the data so that it appears in the format
How would I do this in PySpark?
Here is what I have attempted, but it fails with a "too many values to unpack" error:
def flatten_map(record):
    try:
        yield(record)
        # Unpack items
        id, items, line, pls = record
        pol_id = pls["pol_cat_id"]
        pol_dt = pls["pol_dt"]
        qor_id = pls["qor_pol_id"]
        for item in items:
            yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id), 1
    except Exception as e:
        pass
result = (rdd
          # Expand data
          .flatMap(flatten_map)
          # Flatten tuples
          .map(lambda x: x[0]))
I can post the complete traceback if required, but for the sake of brevity, the error is:
ValueError: too many values to unpack (expected 2)
Note: converting to pandas isn't an option because the RDD is too big.
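
For reference, here is a minimal sketch of what a working flatten function might look like. It rests on a few assumptions: the output column order is taken from the yield in the attempt above (the exact desired format is not shown), the name flatten_record and the variable names are illustrative, and the third and fourth elements always hold exactly one dictionary. Two things in the attempt look suspect: pls is a list, so pls["pol_cat_id"] raises a TypeError that the bare except silently swallows, and the initial yield(record) emits the raw 4-tuple, which could plausibly trigger the unpacking error in any later step that expects 2-element pairs. The sketch runs on a plain Python list so it can be checked without a cluster.

```python
def flatten_record(record):
    """Yield one flat tuple per coverage dictionary in a 4-tuple record."""
    primary_id, coverages, policies, qors = record

    # The third and fourth elements are single-element lists, so index [0]
    # to reach the dictionary inside (assumes exactly one entry in each).
    policy = policies[0]
    qor = qors[0]

    for cov in coverages:
        yield (
            primary_id,
            cov["cov_id"], cov["cov_cd"], cov["cov_amt"], cov["cov_state"],
            policy["pol_cat_id"], policy["pol_dt"],
            qor["qor_pol_id"],
        )


# A cut-down copy of the sample data from the question.
sample = [
    ("xxxxx99",
     [{"cov_id": "Q", "cov_cd": "100", "cov_amt": "100", "cov_state": "AZ"},
      {"cov_id": "Q", "cov_cd": "33", "cov_amt": "200", "cov_state": "AZ"}],
     [{"pol_cat_id": "234", "pol_dt": "20100220"}],
     [{"qor_pol_id": "23492", "qor_cd": "30"}]),
]

# Plain-Python equivalent of flatMap: one output row per coverage entry.
flat_rows = [row for record in sample for row in flatten_record(record)]

for row in flat_rows:
    print(row)
```

On an actual RDD the same function would be passed straight to flatMap, e.g. rdd.flatMap(flatten_record), with no follow-up map needed because each yielded item is already a flat tuple. Dropping the try/except also means a malformed record surfaces as an error instead of disappearing silently.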