
I have some data that is stored in CSV. Sample data is available here - https://github.com/PranayMehta/apache-spark/blob/master/data.csv

I read the data using pyspark

df = spark.read.csv("data.csv",header=True)
df.printSchema()
root
 |-- freeform_text: string (nullable = true)
 |-- entity_object: string (nullable = true)

>>> df.show(truncate=False)
+---------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|freeform_text                    |entity_object                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
+---------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Grapes are good. Bananas are bad.|[{'name': 'Grapes', 'type': 'OTHER', 'salience': '0.8335162997245789', 'sentiment_score': '0.8999999761581421', 'sentiment_magnitude': '0.8999999761581421', 'metadata': {}, 'mentions': {'mention_text': 'Grapes', 'mention_type': 'COMMON'}}, {'name': 'Bananas', 'type': 'OTHER', 'salience': '0.16648370027542114', 'sentiment_score': '-0.8999999761581421', 'sentiment_magnitude': '0.8999999761581421', 'metadata': {}, 'mentions': {'mention_text': 'Bananas', 'mention_type': 'COMMON'}}]|
|the weather is not good today    |[{'name': 'weather', 'type': 'OTHER', 'salience': '1.0', 'sentiment_score': '-0.800000011920929', 'sentiment_magnitude': '0.800000011920929', 'metadata': {}, 'mentions': {'mention_text': 'weather', 'mention_type': 'COMMON'}}]                                                                                                                                                                                                                                                                 |
+---------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Now I want to explode and parse the fields in the entity_object column of this dataframe. Here is some more background on what this column contains:

For every freeform_text stored in the Spark DataFrame, I have written some logic to parse out the entities using Google's Natural Language API. These entities are stored as a LIST of DICTIONARIES when I do the computation using pandas. I then convert them to strings before storing them in the database.

This CSV is what I read into a Spark DataFrame as 2 columns: freeform_text and entity_object.

The entity_object column, although stored as a string, is actually a LIST of dictionaries. It can be imagined as LIST[ DICT1, DICT2 ] and so on. So some entity_object rows may have 1 element and others may have more than 1, depending on the number of entities in the output. For instance, in the first row there are 2 entities (Grapes and Bananas), whereas in the 2nd row there is only one entity (weather).
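To make that structure concrete, here is a minimal plain-Python sketch (no Spark involved). The `cell` value is a shortened, hypothetical stand-in for one entity_object cell, not the real data. It shows that the text is a Python repr rather than strict JSON: `ast.literal_eval` reads it back, while the standard `json` module rejects the single quotes.

```python
import ast
import json

# Hypothetical, shortened stand-in for one entity_object cell.
cell = (
    "[{'name': 'Grapes', 'salience': '0.83', 'metadata': {}}, "
    "{'name': 'Bananas', 'salience': '0.17', 'metadata': {}}]"
)

# The cell is the repr of a list of dicts, so literal_eval can rebuild it
# without executing arbitrary code.
entities = ast.literal_eval(cell)
names = [entity["name"] for entity in entities]
print(type(entities).__name__, len(entities), names)

# Strict JSON requires double-quoted strings, so json.loads refuses this text.
try:
    json.loads(cell)
    is_strict_json = True
except json.JSONDecodeError:
    is_strict_json = False
print("strict JSON:", is_strict_json)
```

This matters on the Spark side because Spark's JSON parsing has an `allowSingleQuotes` option that defaults to true, which is presumably why `from_json` can read this column even though it is not strict JSON.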

I want to explode this entity_object column so that 1 record of freeform_text can be exploded into multiple records.
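As a rough illustration of what "explode" means here, the following plain-Python sketch turns each (text, entity list) pair into one flat record per entity. The `rows` data is a trimmed, hypothetical version of the sample, and `explode_rows` is an illustrative helper name, not part of any library.

```python
import ast

# Trimmed, hypothetical version of the two sample rows.
rows = [
    ("Grapes are good. Bananas are bad.",
     "[{'name': 'Grapes', 'type': 'OTHER'}, {'name': 'Bananas', 'type': 'OTHER'}]"),
    ("the weather is not good today",
     "[{'name': 'weather', 'type': 'OTHER'}]"),
]


def explode_rows(rows):
    """Yield one flat dict per entity, repeating the source text each time."""
    for text, cell in rows:
        for entity in ast.literal_eval(cell):
            yield {"freeform_text": text, **entity}


exploded = list(explode_rows(rows))
for record in exploded:
    print(record)
```

Two input rows become three output records: two for the first text and one for the second.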

Here is a screenshot of how I would like my output to be -

[Screenshot of the desired output]

Have you tried using from_json? - Steven

1 Answer


This may be a working solution for you. Please do let me know if this does not work.

Create the DataFrame here

# Assumes `spark` is an existing SparkSession (as in the pyspark shell)
from pyspark.sql import functions as F, types as T

# Each outer {...} is a Python set of dict strings, standing in for one entity_object cell
df_new = spark.createDataFrame([
    {
        str({'name':'Grapes','type':'OTHER','salience':'0.8335162997245789','sentiment_score':'0.8999999761581421','sentiment_magnitude':'0.8999999761581421','metadata':{},'mentions':{'mention_text':'Grapes','mention_type':'COMMON'}}),
        str({'name':'weather','type':'OTHER','salience':'1.0','sentiment_score':'-0.800000011920929','sentiment_magnitude':'0.800000011920929','metadata':{},'mentions':{'mention_text':'weather','mention_type':'COMMON'}}),
    },
    {
        str({'name':'banana','type':'OTHER','salience':'1.0','sentiment_score':'-0.800000011920929','sentiment_magnitude':'0.800000011920929','metadata':{},'mentions':{'mention_text':'weather','mention_type':'COMMON'}}),
    },
], T.StringType())

Logic Here

# Parse the string as a JSON array, keeping each element as a raw JSON string
df = df_new.withColumn('col', F.from_json("value", T.ArrayType(T.StringType())))
# Produce one row per array element
df = df.withColumn('explode_col', F.explode("col"))
# Parse each element into a string-to-string map
df = df.withColumn('col', F.from_json("explode_col", T.MapType(T.StringType(), T.StringType())))
fields = ["name", "type", "salience", "sentiment_score", "sentiment_magnitude", "mentions"]
for field in fields:
    df = df.withColumn(field, F.col("col").getItem(field))
df.select(*fields).show(truncate=False)

Output

+-------+-----+------------------+------------------+-------------------+--------------------------------------------------+
|name   |type |salience          |sentiment_score   |sentiment_magnitude|mentions                                          |
+-------+-----+------------------+------------------+-------------------+--------------------------------------------------+
|weather|OTHER|1.0               |-0.800000011920929|0.800000011920929  |{"mention_text":"weather","mention_type":"COMMON"}|
|Grapes |OTHER|0.8335162997245789|0.8999999761581421|0.8999999761581421 |{"mention_text":"Grapes","mention_type":"COMMON"} |
|banana |OTHER|1.0               |-0.800000011920929|0.800000011920929  |{"mention_text":"weather","mention_type":"COMMON"}|
+-------+-----+------------------+------------------+-------------------+--------------------------------------------------+

Update - Instead of createDataFrame - use spark.read.csv() as below

df_new = spark.read.csv("/FileStore/tables/data.csv", header=True)
df_new.show(truncate=False)
# Logic here: parse the array, explode it, then parse each element into a map
df = df_new.withColumn('col', F.from_json("entity_object", T.ArrayType(T.StringType())))
df = df.withColumn('explode_col', F.explode("col"))
df = df.withColumn('col', F.from_json("explode_col", T.MapType(T.StringType(), T.StringType())))
# Copy each field of interest out of the map into its own column
fields = ["name", "type", "salience", "sentiment_score", "sentiment_magnitude", "mentions"]
for field in fields:
    df = df.withColumn(field, F.col("col").getItem(field))
df.select("freeform_text", *fields).show(truncate=False)

+---------------------------------+-------+-----+-------------------+-------------------+-------------------+--------------------------------------------------+
|freeform_text                    |name   |type |salience           |sentiment_score    |sentiment_magnitude|mentions                                          |
+---------------------------------+-------+-----+-------------------+-------------------+-------------------+--------------------------------------------------+
|Grapes are good. Bananas are bad.|Grapes |OTHER|0.8335162997245789 |0.8999999761581421 |0.8999999761581421 |{"mention_text":"Grapes","mention_type":"COMMON"} |
|Grapes are good. Bananas are bad.|Bananas|OTHER|0.16648370027542114|-0.8999999761581421|0.8999999761581421 |{"mention_text":"Bananas","mention_type":"COMMON"}|
|the weather is not good today    |weather|OTHER|1.0                |-0.800000011920929 |0.800000011920929  |{"mention_text":"weather","mention_type":"COMMON"}|
+---------------------------------+-------+-----+-------------------+-------------------+-------------------+--------------------------------------------------+