1
votes

Step 1: Reading the entire data into a single column, which will then be converted into an ArrayType.

from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType

input_file = "../data/datafile.csv"
initial_df = sqlContext.read.format("csv").csv(input_file)
initial_df.show(n=100, truncate=False)

Output Result:

+------------------------------------------------------------------------+
|_c0                                                                     |
+------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:""                   |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |
+------------------------------------------------------------------------+

Step 2: Converting StringType to ArrayType.

inter_df = initial_df.withColumn("array", F.split(initial_df['_c0'], '\|'))
inter_df.show(n=100, truncate=False)

+------------------------------------------------------------------------+------------------------------------------------------------------------------+
|_c0                                                                     |array                                                                         |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:""               |[ABC:"MobileData", XYZ:"TableData", ZXC:"MacData", MNB:""]                |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |[ABC:"value1"    , XYZ:"value2"   , ZXC:"value3" , MNB:"value4"]              |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|[ABC: "valueA"   , XYZ:"ValueB"   , ZXC:"valueC" , MNB:"valueD", POI:"valueE"]|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |[ABC:"value11"    , XYZ:"value12"   , ZXC:"value13" , MNB:"value14"]          |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |[ABC:"value1A"    , XYZ:"value2A"   , ZXC:"value3A"]                          |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+

Step 3: Converting the ArrayType column to MapType so that I can pick the respective values by key. Here I am using a UDF to convert the ArrayType to MapType, and this conversion takes a huge amount of time. (Currently I am running the code on a 300 GB file and processing takes about 3 hours.) I want to reduce this time. Can anyone please help me with this?

def create_dict(input_string):
    # Turn each 'KEY:"value"' entry of the array into a dict item,
    # stripping whitespace and the surrounding double quotes
    result_dict = {}
    for ele in input_string:
        parts = [part.strip() for part in ele.strip().split(":")]
        result_dict[parts[0]] = parts[1].replace('"', "")
    return result_dict

create_dict_udf = F.udf(create_dict, MapType(keyType=StringType(), valueType=StringType()))

inter_df = inter_df.withColumn("dictionary", create_dict_udf(F.col("array")))
inter_df.show(n=100, truncate=False)

+------------------------------------------------------------------------+------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|_c0                                                                     |array                                                                         |dictionary                                                                 |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:""           |[ABC:"MobileData", XYZ:"TableData", ZXC:"MacData", MNB:""]                |[MNB ->  , XYZ -> TableData, ABC -> MobileData, ZXC -> MacData]        |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |[ABC:"value1"    , XYZ:"value2"   , ZXC:"value3" , MNB:"value4"]              |[MNB -> value4, XYZ -> value2, ABC -> value1, ZXC -> value3]               |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|[ABC: "valueA"   , XYZ:"ValueB"   , ZXC:"valueC" , MNB:"valueD", POI:"valueE"]|[MNB -> valueD, XYZ -> ValueB, ABC -> valueA, POI -> valueE, ZXC -> valueC]|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |[ABC:"value11"    , XYZ:"value12"   , ZXC:"value13" , MNB:"value14"]          |[MNB -> value14, XYZ -> value12, ABC -> value11, ZXC -> value13]           |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |[ABC:"value1A"    , XYZ:"value2A"   , ZXC:"value3A"]                          |[XYZ -> value2A, ABC -> value1A, ZXC -> value3A]                           |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+---------------------------------------------------------------------------+

Without using a UDF, how can I convert the ArrayType to MapType?

Comment: Which version of Spark are you using? – stack0114106

2 Answers

1
votes

Step 3 can be done using transform and aggregate functions (for Spark 2.4+).

First, transform the array column created in step 2 so that each element is converted from a string to a map using the str_to_map function. Then, aggregate the resulting array to concatenate the map elements with map_concat.

df.withColumn("mapc",
              expr("""aggregate(transform(arr, x -> str_to_map(x)), 
                                map(), 
                                (acc, i) -> map_concat(acc, i)
                               )
                   """)) \
  .select("inp", "mapc") \
  .show(truncate=False)
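
Note: the snippet above uses the columns inp and arr from the example data in the other answer below. A minimal sketch of the same idea applied to the question's step-2 DataFrame (assuming inter_df with its split column named array, back-ticked here because array is also a SQL function name; Spark 2.4+):

from pyspark.sql import functions as F

# Sketch only: merge the per-entry maps produced by str_to_map into one map column
result_df = inter_df.withColumn(
    "dictionary",
    F.expr("""aggregate(transform(`array`, x -> str_to_map(x)),
                        map(),
                        (acc, m) -> map_concat(acc, m))""")
).select("_c0", "dictionary")
result_df.show(truncate=False)

Unlike the UDF, str_to_map keeps the surrounding double quotes in the values; if they are unwanted, they can be stripped beforehand, e.g. with translate(_c0, '"', '') before splitting.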
1
votes

You can use the transform higher-order function (HOF) to convert the array into an array of maps. Try this.

df = spark.sql(""" with t1 (
 select  'ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:""' c1 union all
 select 'ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"' c1 union all
 select 'ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"' c1 union all
 select 'ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"' c1 union all
 select 'ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"' 
  )  select   c1  inp    from t1
""")
df.show(50,truncate=False)

+------------------------------------------------------------------------+
|inp                                                                     |
+------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:""                   |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |
+------------------------------------------------------------------------+

from pyspark.sql.functions import col, expr, split

df2 = df.withColumn("arr", split(col("inp"), "[|]"))
df2.show(50,truncate=False)


+------------------------------------------------------------------------+------------------------------------------------------------------------------+
|inp                                                                     |arr                                                                           |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:""                   |[ABC:"MobileData", XYZ:"TableData", ZXC:"MacData", MNB:""]                    |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |[ABC:"value1"    , XYZ:"value2"   , ZXC:"value3" , MNB:"value4"]              |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|[ABC: "valueA"   , XYZ:"ValueB"   , ZXC:"valueC" , MNB:"valueD", POI:"valueE"]|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |[ABC:"value11"    , XYZ:"value12"   , ZXC:"value13" , MNB:"value14"]          |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |[ABC:"value1A"    , XYZ:"value2A"   , ZXC:"value3A"]                          |
+------------------------------------------------------------------------+------------------------------------------------------------------------------+

df3=df2.withColumn("mapc", expr( """ transform(arr , (x,i)  -> map(split(x,":")[0],split(x,":")[1])  ) """)).select("inp","mapc")

df3.printSchema()

root
 |-- inp: string (nullable = false)
 |-- mapc: array (nullable = false)
 |    |-- element: map (containsNull = false)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

df3.show(50,truncate=False)

+------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+
|inp                                                                     |mapc                                                                                                   |
+------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+
|ABC:"MobileData"|XYZ:"TableData"|ZXC:"MacData"|MNB:""                   |[[ABC -> "MobileData"], [XYZ -> "TableData"], [ZXC -> "MacData"], [MNB -> ""]]                         |
|ABC:"value1"    |XYZ:"value2"   |ZXC:"value3" |MNB:"value4"             |[[ABC -> "value1"    ], [XYZ -> "value2"   ], [ZXC -> "value3" ], [MNB -> "value4"]]                   |
|ABC: "valueA"   |XYZ:"ValueB"   |ZXC:"valueC" |MNB:"valueD"|POI:"valueE"|[[ABC ->  "valueA"   ], [XYZ -> "ValueB"   ], [ZXC -> "valueC" ], [MNB -> "valueD"], [POI -> "valueE"]]|
|ABC:"value11"    |XYZ:"value12"   |ZXC:"value13" |MNB:"value14"         |[[ABC -> "value11"    ], [XYZ -> "value12"   ], [ZXC -> "value13" ], [MNB -> "value14"]]               |
|ABC:"value1A"    |XYZ:"value2A"   |ZXC:"value3A"                        |[[ABC -> "value1A"    ], [XYZ -> "value2A"   ], [ZXC -> "value3A"]]                                    |
+------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+

Update-1:

To retrieve a value from the array of maps for a given key, e.g. 'ABC':

df3.createOrReplaceTempView("df3")

spark.sql("""
select mapc, xx, array_min(transform( mapc, (x,i) -> x[xx])) yy from (
select mapc, concat_ws('',filter(flatten(transform( mapc, (x,i) -> map_keys(x))), y -> y='ABC'))  xx
from df3 )
""").show(50,truncate=False)

+-------------------------------------------------------------------------------------------------------+---+-------------+
|mapc                                                                                                   |xx |yy           |
+-------------------------------------------------------------------------------------------------------+---+-------------+
|[[ABC -> "MobileData"], [XYZ -> "TableData"], [ZXC -> "MacData"], [MNB -> ""]]                         |ABC|"MobileData" |
|[[ABC -> "value1"    ], [XYZ -> "value2"   ], [ZXC -> "value3" ], [MNB -> "value4"]]                   |ABC|"value1"     |
|[[ABC ->  "valueA"   ], [XYZ -> "ValueB"   ], [ZXC -> "valueC" ], [MNB -> "valueD"], [POI -> "valueE"]]|ABC| "valueA"    |
|[[ABC -> "value11"    ], [XYZ -> "value12"   ], [ZXC -> "value13" ], [MNB -> "value14"]]               |ABC|"value11"    |
|[[ABC -> "value1A"    ], [XYZ -> "value2A"   ], [ZXC -> "value3A"]]                                    |ABC|"value1A"    |
+-------------------------------------------------------------------------------------------------------+---+-------------+
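
Side note (not part of the original answer): if the per-element maps are first merged into a single MapType column, as in the aggregate/map_concat approach in the other answer, the per-key lookup becomes a plain map access. A minimal sketch, assuming the df3 above and Spark 2.4+:

from pyspark.sql import functions as F

# Sketch only: collapse the array of single-entry maps into one map, then look up 'ABC'
merged = df3.withColumn(
    "single_map",
    F.expr("aggregate(mapc, map(), (acc, m) -> map_concat(acc, m))"))
merged.select("inp", F.expr("element_at(single_map, 'ABC')").alias("abc_value")) \
      .show(truncate=False)

With the default settings, element_at returns null when the key is missing, so rows without an 'ABC' entry would simply show null.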