
I am using Spark 2.3 (PySpark) to read data from an Elasticsearch 6.6 index.
The Spark job attempts to create a DataFrame and fails with a parsing error:

Spark Code:

df = spark.read.format("org.elasticsearch.spark.sql") \
    .option("es.resource.read", index_name) \
    .option("es.nodes", hosts) \
    .load()

Error Message:

org.elasticsearch.hadoop.rest.EsHadoopParsingException: Cannot parse value [2019/05/06 19:31:21] for field [GenerateTime]

I believe this is caused in part by the source date format not being in a recognized ISO 8601 format.

Also, from reading the Time/Date Mapping docs, I understand this can be addressed by creating a mapping, but that will only affect new indexes and won't change the mapping of the historical indexes.

Question:

Is there a way to address this issue so that I can successfully read from the historical indexes via Spark (i.e. prior to any mapping changes that may be required)? I also tried .option("es.mapping.date.rich", False) without any luck.

hey, I've updated my answer with some more details. I just hope it helps!! – Opster ES Ninja - Kamal

1 Answer


I've created a sample document based on your data in ES 6.4/Spark 2.1 and used the code below to read the GenerateTime field as text instead of a date type in Spark.

Mapping in ES

PUT somedateindex
{
  "mappings": {
    "mydocs":{
      "properties": {
        "GenerateTime": {
          "type": "date",
          "format": "yyyy/MM/dd HH:mm:ss"
        }
      }
    }
  }
}

Notice that the field is of date type in ES.
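
If you'd rather create this index from Python than from the REST console, here is a minimal sketch using the official elasticsearch-py client (an assumption on my part that you have it installed and that the same host/index names apply):

    from elasticsearch import Elasticsearch  # assumes elasticsearch-py is installed

    es = Elasticsearch(["some_host_name:9200"])

    # Same mapping as the REST call above: GenerateTime is a date
    # with the custom format "yyyy/MM/dd HH:mm:ss"
    es.indices.create(index="somedateindex", body={
        "mappings": {
            "mydocs": {
                "properties": {
                    "GenerateTime": {
                        "type": "date",
                        "format": "yyyy/MM/dd HH:mm:ss"
                    }
                }
            }
        }
    })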

Spark Code to use date field in ES as String

Note that I've made use of the config option("es.mapping.date.rich", false).

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
                .builder()
                .appName("Spark SQL basic example")
                .config("spark.master", "local")
                .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    val df = spark.read.format("org.elasticsearch.spark.sql")
            .option("es.resource.read", "somedateindex")
            .option("es.nodes", "some_host_name")
            .option("es.mapping.date.rich", false)
            .option("es.port", "9200")
            .load()

    df.show()
    df.printSchema()

Spark Code Result in my Eclipse Console:

19/05/13 03:10:53 INFO DAGScheduler: Job 1 finished: show at Elasticsearch.scala:134, took 9.424294 s
19/05/13 03:10:53 INFO CodeGenerator: Code generated in 21.256205 ms
+-------------------+
|       GenerateTime|
+-------------------+
|2019/05/06 19:31:21|
+-------------------+

root
 |-- GenerateTime: string (nullable = true)

19/05/13 03:10:53 INFO SparkUI: Stopped Spark web UI at....

Notice that printSchema shows the table has a single column, GenerateTime, which is of type string.

If you do not want to go ahead and change the mappings, the above should help you.
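
Since your question uses PySpark, here is a rough equivalent of the read above (just a sketch, assuming the same index, host and port; in PySpark it is safest to pass the option value as the string "false"):

    df = spark.read.format("org.elasticsearch.spark.sql") \
        .option("es.resource.read", "somedateindex") \
        .option("es.nodes", "some_host_name") \
        .option("es.mapping.date.rich", "false") \
        .option("es.port", "9200") \
        .load()

    df.show()
    df.printSchema()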

I recommend storing date fields as a date type rather than text, and in an ISO 8601-supported format; that way, when type inference kicks in, you end up getting the data in the correct type in Spark and can simply focus on business logic. Many times the correct solution lies in how we store the data rather than how we process it.

Spark code to convert String into Timestamp/Date

However, if for some reason you are not able to change the mappings at the source, i.e. Elasticsearch, you can add the code below to transform the string value into a timestamp:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.TimestampType

    // String into Timestamp transformation
    val df2_timestamp = df.withColumn("GenerateTime_timestamp", from_unixtime(unix_timestamp($"GenerateTime", "yyyy/MM/dd HH:mm:ss")).cast(TimestampType))
    df2_timestamp.show(false)
    df2_timestamp.printSchema()

If you run the above code, you'd see the output as below:

19/05/14 11:33:10 INFO CodeGenerator: Code generated in 23.742359 ms
+-------------------+----------------------+
|GenerateTime       |GenerateTime_timestamp|
+-------------------+----------------------+
|2019/05/06 19:31:21|2019-05-06 19:31:21.0 |
+-------------------+----------------------+

root
 |-- GenerateTime: string (nullable = true)
 |-- GenerateTime_timestamp: timestamp (nullable = true)

19/05/14 11:33:10 INFO SparkContext: Invoking stop() from shutdown hook
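
The PySpark equivalent, again as a sketch assuming the df read above, can use to_timestamp (available since Spark 2.2) instead of the unix_timestamp/from_unixtime/cast chain:

    from pyspark.sql.functions import to_timestamp

    # String into timestamp transformation
    df2_timestamp = df.withColumn(
        "GenerateTime_timestamp",
        to_timestamp("GenerateTime", "yyyy/MM/dd HH:mm:ss"))
    df2_timestamp.show(truncate=False)
    df2_timestamp.printSchema()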

Also note that my solution is in Scala. Let me know if it helps!