2
votes

I am trying to read an ElasticSearch index which has millions of docs each having variable number of fields. I have a schema that has 1000's of fields each with its own name and type.

Now when I create a RDD trough ES-Hadoop connector and later convert into a DataFrame by specifying the schema, it fails saying -

Input row doesn't have expected number of values required by the schema

I have a few questions. 1. Is it possible to have a RDD/DF with rows containing variable number of fields? If not, what is the alternative other than adding null value for missing fields in each column?

  1. I see that by default Spark converts everything into StringType as I use sc.newAPIHadoopRDD() call. How can I typecast them to correct type based on the field name that I have in my schema? Some kind of mapping?

  2. I want to write this in Parquet format with schema added into the file. What happens to those missing fields compared to the schema which has 1000's of fields.

1
I would write a custom UDF to do this. - Sailesh Kotha

1 Answers

1
votes
  1. You cannot have a variable number of columns, but you can use one column of a collection type like an Array or a Map, which in python corresponds with a dictionary. This allows you to store variable-length data in your column. Otherwise yes, you need to have a value for every column in your schema. You would typically fill missing values with nulls.

  2. If you already have a dataframe, and you have a function get_column_type that obtains the type name from the column name, you can recast the whole dataframe like this:

    import pyspark.sql.functions as F
    select_expressions = [ F.col(column_name).cast(get_column_type(column_name)) for column_name in column_list]
    recasted_df = df.select(*select_expressions)
    
  3. The parquet file will have whatever columns you have in your dataframe. If you want the 1000 fields in the file, they will have to be in the dataframe, so you would have to fill the missing values with nulls or some other value.

Now, if you put all those points together you probably want to do something like this:

  • Read each elastic doc into a row with an id field and doc field of type MapType.
  • explode the doc field so now you have 3 columns: id, key and value, with one row for each key in each doc. At this point you can write to parquet file and be done with the process.

If you want the dataframe with the full schema, you have to follow these extra steps:

  • Pivot the result to generate one only row for each id, and a column for each key in the doc with its corresponding value: pivoted_df = df.groupBy('id').pivot('key').agg(F.first('value')
  • This dataframe has all the fields present in the data. If you know the full schema, you can add dummy columns for the ones that are missing: df = df.withColumn('new_column', lit(None).cast(StringType())
  • Finally recast the columns with code in point 2, and drop column id. You can write this to parquet and it will have all the columns in your big schema.