We are trying to read a large number of XML files and run XQuery on them in PySpark (for example, the books XML). We are using the spark-xml-utils library.
- We want to feed a directory containing XML files to PySpark.
- We want to run an XQuery on all of them to get our results.
Reference answer: Calling scala code in pyspark for XSLT transformations
The XQuery processor is defined as follows, where xquery is a string containing the XQuery:
proc = sc._jvm.com.elsevier.spark_xml_utils.xquery.XQueryProcessor.getInstance(xquery)
We are reading the files in a directory using:
sc.wholeTextFiles("xmls/test_files")
This gives us an RDD of (filename, content) tuples, one per file:
[ (Filename1,FileContentAsAString), (Filename2,File2ContentAsAString) ]
The XQuery evaluates and gives us results if we collect the files and run it on a single string (FileContentAsAString):
whole_files = sc.wholeTextFiles("xmls/test_files").collect()
proc.evaluate(whole_files[1][1])
# Prints proper xquery result for that file
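Since the call on collected content works, the same evaluation can be looped over every collected file. This is only a minimal sketch, assuming all files fit in driver memory; evaluate_all is a hypothetical helper, and len stands in for proc.evaluate so the snippet runs without Spark:

```python
def evaluate_all(files, evaluate):
    """Apply `evaluate` to the content of each (filename, content) pair."""
    return [(name, evaluate(content)) for name, content in files]


# Stand-in data and evaluator (assumptions for illustration only);
# in the setup above these would be whole_files and proc.evaluate.
sample_files = [("a.xml", "<a/>"), ("b.xml", "<bb/>")]
results = evaluate_all(sample_files, len)
print(results)
```

With Spark available, the call would be evaluate_all(whole_files, proc.evaluate). Everything runs on the driver, so this does not distribute the work.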
Problem:
If we try to run proc.evaluate() on the RDD using a lambda function, it fails:
test_file = sc.wholeTextFiles("xmls/test_files")
test_file.map(lambda x: proc.evaluate(x[1])).collect()
# Should give us a list of xquery results
Error:
PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
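A likely cause, stated here as an assumption rather than something confirmed from the stack trace, is that the lambda closes over proc, and serializing proc drags in an object holding a thread lock. The same kind of failure can be reproduced without Spark; fake_proc below is a hypothetical stand-in for the processor object, and try_pickle is an illustrative helper:

```python
import pickle
import threading


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the error message."""
    try:
        pickle.dumps(obj)
        return None
    except (TypeError, pickle.PicklingError) as exc:
        return str(exc)


# Stand-in for the processor: any object graph that contains a lock.
fake_proc = {"gateway_lock": threading.RLock()}
file_content = "<bookstore/>"

print(try_pickle(file_content))  # plain strings serialize without error
print(try_pickle(fake_proc))     # reports that the RLock cannot be pickled
```

This would also be consistent with lambdas that only touch x[0] or x[1] working, since they capture nothing beyond plain strings.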
These map calls work, but the evaluate call above does not:
Print the content the XQuery is applied to:
test_file.map(lambda x: x[1]).collect()
# Outputs the content; using x[0] instead gives the list of filenames
Return the number of characters in each file's content:
test_file.map(lambda x: len(x[1])).collect()
# Output: [15274, 13689, 13696]
Books example for reference:
books_xquery = """for $x in /bookstore/book
where $x/price>30
return $x/title/data()"""
proc_books = sc._jvm.com.elsevier.spark_xml_utils.xquery.XQueryProcessor.getInstance(books_xquery)
books_xml = sc.wholeTextFiles("xmls/books.xml")
books_xml.map(lambda x: proc_books.evaluate(x[1])).collect()
# Error
# I can share the full stack trace if needed