
I have the following XML document:

<a date="26-03-2018" id="1">
  <text>
  </text>
  <metadata>
    <b>
      <c c="STRING1">
        <d="value" e="string"/>
      </c>
      <c c="STRING2">
        <d="value2" e="string" />
      </c>
    </b>
  </metadata>
</a>

Using the Databricks spark-xml parser, I want to extract the STRING1 and STRING2 values of the "c" attribute as a list into the metadata column of a DataFrame, but I read the file with the following custom schema:

from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType([
    StructField("date", StringType(), True),
    StructField("id", LongType(), True),
    StructField("text", StringType(), True),
    StructField("metadata", StructType([
        StructField("b", StringType(), True)
    ]), True),
])

The DataFrame I get for the above schema:

 id | date       | text | metadata
----+------------+------+------------------------------------------------------------------------------------------
 1  | 26-03-2018 | text | '<c c="STRING1"> <d="value" e="string"/></c><c c="STRING2"><d="value2" e="string" /> </c>'

I am getting the entire content of the b node as a single string; what I want instead is for the metadata column to hold just the attribute values, e.g. ['STRING1', 'STRING2']. Any ideas on how to extract only those strings into the metadata column using the Databricks XML parser, or is there another parser available? I couldn't find a solution. I am new to Spark. TIA


1 Answer


You can parse the XML yourself using an RDD together with the ElementTree library.

from pyspark.sql import Row
import xml.etree.ElementTree as ET

# Row factory defining the shape of the output rows.
row_counter = Row('columnName1', 'columnName2', 'columnName3')

def parser_xml(string_xml):
    root = ET.fromstring(string_xml.encode('ISO-8859-1', errors='replace'))
    # Implement all parser logic here; these three lookups are placeholders.
    columnName1 = root.find('test').attrib['value1']
    columnName2 = root.find('test2').attrib['value2']
    columnName3 = root.find('test3').attrib['value3']
    return row_counter(columnName1, columnName2, columnName3)

# wholeTextFiles yields (path, content) pairs, one pair per file.
rdd = sc.wholeTextFiles("/files/*.xml")
data = rdd.map(lambda string_file: parser_xml(string_file[1]))  # parse the content, not the path
df = spark.createDataFrame(data)  # column names and types are taken from the Rows
df.write.parquet('output')
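
Adapted to the document in the question, the parser logic might look like the sketch below. Note that the <d="value" .../> elements in the question's sample are not well-formed XML (an element needs a name before its attributes), so the sketch assumes the real data looks more like <d f="value" e="string"/>; only the "c" attributes are actually read.

import xml.etree.ElementTree as ET

sample = """<a date="26-03-2018" id="1">
  <text></text>
  <metadata>
    <b>
      <c c="STRING1"><d f="value" e="string"/></c>
      <c c="STRING2"><d f="value2" e="string"/></c>
    </b>
  </metadata>
</a>"""

root = ET.fromstring(sample)
# Collect the "c" attribute of every metadata/b/c element.
values = [c.attrib['c'] for c in root.findall('./metadata/b/c')]
print(values)  # ['STRING1', 'STRING2']

Alternatively, if you want to stay with spark-xml instead of dropping to RDDs: by default it exposes XML attributes as struct fields prefixed with an underscore, so declaring metadata.b.c in your schema as an array of structs containing a _c field should surface the values directly.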