
Spark 2.0.0+

I am using the built-in CSV data source to parse some .csv files (e.g. input_file_01.csv, input_file_02.csv, etc.) directly into a Spark DataFrame df:

df = spark.read.csv('input_file_*.csv',
         header=True,
         inferSchema=False,   # problematic col names: trailing apostrophes
         sep=';',
         multiLine=False,
         enforceSchema=True,
         schema=jsonSchema,
         mode='FAILFAST'      # for debugging; alternative: DROPMALFORMED
)

They should all have the same schema (jsonSchema):

from pyspark.sql.types import StructType, StructField, StringType

jsonSchema = StructType([
    StructField("F1", StringType()),
    StructField("F2", StringType()),
    StructField("F3", StringType())
])

Problem:

They should all be the same, but some files have malformed headers:

parsed_input_file_01.take(3)

>>>[u'"F1","F2","F3"',
    u'"a","b","c"',
    u'"d","e","f"']
parsed_input_file_17.take(3) # malformed file

>>>[u'"`F1`","`F2`","`F3`"', # all columns malformed: '`F1`&#39
    u'"a","b","c"',
    u'"d","e","f"']
parsed_input_file_945.take(3) # malformed file

>>>[u'"F1","F2","`F3`"', # malformed third "column": '`F3`&#39
    u'"a","b","c"',
    u'"d","e","f"']

Traceback:

Py4JJavaError: An error occurred while calling o9210.fromDF.
: org.apache.spark.sql.AnalysisException: cannot resolve '`F1`' given input columns: [F1, F2, F3];;
...

Question:

Given that I do not want to drop the data of entire files (e.g. via .option('mode', 'DROPMALFORMED'), if that would even help here), I am looking for a performant way to read all of the data with (Py)Spark.

My approach would be to separately parse just the header of every file, identify the malformed files, and either delete the unwanted apostrophes/backticks or fix the encoding (a rough sketch of this idea follows the conditions below).

  1. How can I identify those malformed files (or their different encoding) in Spark?
  2. Is there a performant way of parsing the header first and sanitizing the malformed files by regex'ing the header in Spark?

Conditions:

  • a general option like quote or escape won't fix the issue, as the headers change arbitrarily.
  • the schema should not be inferred, since the desired schema (with the correct, general column names and datatypes) is known.
  • all CSV files need to be combined into one single DataFrame.
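As an illustration of what I have in mind (an untested sketch: the file list, the regex, and the per-file loop on the driver are placeholders, and it is probably not the most performant option):

import re
from functools import reduce

file_paths = ['input_file_01.csv', 'input_file_17.csv']  # hypothetical list of paths

def read_sanitized(path):
    # read one file, taking the column names from its (possibly malformed) header
    raw = spark.read.csv(path, header=True, sep=';', mode='FAILFAST')
    # strip stray backticks/apostrophes from the header-derived column names
    clean_names = [re.sub(r"[`']", "", c) for c in raw.columns]
    return raw.toDF(*clean_names)

# combine all per-file DataFrames into one (columns are in the same order)
combined_df = reduce(lambda a, b: a.union(b), [read_sanitized(p) for p in file_paths])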
Provide the schema on read. Read this answer: stackoverflow.com/a/34528938/7441537 – André Machado
A general schema is provided, but some files are not read correctly because the StructField names mismatch (due to the unwanted apostrophes). See the header line of file 17 in the example. – hard

1 Answer


You probably need to make use of RDDs here:

from pyspark.sql.types import StructType, StructField, StringType

dir_path = 'data'
# one (file path, file content) pair per file
data_rdd = sc.wholeTextFiles(dir_path)
# drop the first line (the possibly malformed header) of every file
data_flat_map = data_rdd.flatMap(lambda a: a[1].splitlines()[1:])
# split each remaining line into fields and strip the surrounding quotes
d = data_flat_map.map(lambda a: [f.strip('"') for f in a.split(",")])

schema = StructType([StructField("F1", StringType(), True),
                     StructField("F2", StringType(), True),
                     StructField("F3", StringType(), True)])
df = spark.createDataFrame(d, schema)
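With the sample files above placed under data/ (path assumed), a quick check along these lines should show one combined DataFrame with the desired column names:

df.printSchema()   # F1, F2, F3, all strings
df.show()          # one row per data line from every file, header lines excluded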

Let me know if this is what you are looking for.

Thanks, Hussain Bohra