I've created a sample document based on your data (ES 6.4 / Spark 2.1) and used the code below to read the GenerateTime field as text instead of date type in Spark.
Mapping in ES
PUT somedateindex
{
  "mappings": {
    "mydocs": {
      "properties": {
        "GenerateTime": {
          "type": "date",
          "format": "yyyy/MM/dd HH:mm:ss"
        }
      }
    }
  }
}
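For reference, a sample document matching this mapping could be indexed as follows (the field value is taken from the output shown further below; the document id 1 is just an example):

```json
PUT somedateindex/mydocs/1
{
  "GenerateTime": "2019/05/06 19:31:21"
}
```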
Notice that the field is of date type in ES.
Spark Code to use date field in ES as String
Note that I've made use of the config option es.mapping.date.rich set to false:
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.master", "local")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

val df = spark.read.format("org.elasticsearch.spark.sql")
  .option("es.resource.read", "somedateindex")
  .option("es.nodes", "some_host_name")
  .option("es.port", "9200")
  .option("es.mapping.date.rich", false)
  .load()

df.show()
df.printSchema()
Spark Code Result in my Eclipse Console:
19/05/13 03:10:53 INFO DAGScheduler: Job 1 finished: show at Elasticsearch.scala:134, took 9.424294 s
19/05/13 03:10:53 INFO CodeGenerator: Code generated in 21.256205 ms
+-------------------+
| GenerateTime|
+-------------------+
|2019/05/06 19:31:21|
+-------------------+
root
|-- GenerateTime: string (nullable = true)
19/05/13 03:10:53 INFO SparkUI: Stopped Spark web UI at....
Notice that printSchema shows the DataFrame has a single column, GenerateTime, of type string.
If you do not want to change the mappings, the above should help you.
However, I recommend storing date fields as date type rather than text, and in an ISO-8601-supported format. That way, when type inference kicks in, the data arrives in Spark with the correct type and you can simply focus on business logic. Many times the correct solution lies in how we store the data rather than how we process it.
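To illustrate the recommendation, a minimal sketch of converting the custom "yyyy/MM/dd HH:mm:ss" value into an ISO-8601 string before indexing, using only java.time (the object and method names are my own, not part of any library):

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object IsoConvert {
  // Parser for the custom pattern declared in the ES mapping
  private val esFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss")

  // Convert e.g. "2019/05/06 19:31:21" into ISO-8601 "2019-05-06T19:31:21"
  def toIso(value: String): String =
    LocalDateTime.parse(value, esFormat)
      .format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
}
```

If values are stored like this, the default ES date format accepts them and Spark's type inference maps the field to a timestamp without any extra options.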
Spark code to convert String into Timestamp/Date
However, if for some reason you are not able to change the mappings at the source, i.e. Elasticsearch, you can further add the code below to transform the string value into a timestamp:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

// String into Timestamp transformation
val df2_timestamp = df.withColumn(
  "GenerateTime_timestamp",
  from_unixtime(unix_timestamp($"GenerateTime", "yyyy/MM/dd HH:mm:ss")).cast(TimestampType))

df2_timestamp.show(false)
df2_timestamp.printSchema()
If you run the above code, you'd see the output as below:
19/05/14 11:33:10 INFO CodeGenerator: Code generated in 23.742359 ms
+-------------------+----------------------+
|GenerateTime |GenerateTime_timestamp|
+-------------------+----------------------+
|2019/05/06 19:31:21|2019-05-06 19:31:21.0 |
+-------------------+----------------------+
root
|-- GenerateTime: string (nullable = true)
|-- GenerateTime_timestamp: timestamp (nullable = true)
19/05/14 11:33:10 INFO SparkContext: Invoking stop() from shutdown hook
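For intuition, unix_timestamp parses the string into epoch seconds (in the session time zone) and from_unixtime renders it back; the parsing step can be sketched in plain java.time like this (UTC is assumed here purely for illustration, and the object name is my own):

```scala
import java.time.LocalDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter

object EpochParse {
  private val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss")

  // Roughly what unix_timestamp does: pattern-parse, then convert to epoch seconds
  def toEpochSeconds(value: String): Long =
    LocalDateTime.parse(value, fmt).atZone(ZoneId.of("UTC")).toEpochSecond
}
```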
Also note that my solution is in Scala. Let me know if it helps!