I'm pretty new to Spark and Elasticsearch.
Currently I'm trying to import data from Elasticsearch:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("test")
conf.set("spark.driver.allowMultipleContexts", "true")
conf.set("es.index.auto.create", "true")
conf.set("es.nodes.discovery", "true")
conf.set("es.nodes", "localhost:9200")

// sc is the existing SparkContext (e.g. the one provided by spark-shell)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("org.elasticsearch.spark.sql").load("test-*/Things")
df.registerTempTable("tmpT")
df.printSchema()
With the code above, the schema of the DataFrame df looks like this:
root
|-- @timestamp: timestamp (nullable = true)
|-- @version: string (nullable = true)
|-- eventJ: struct (nullable = true)
| |-- action: string (nullable = true)
| |-- code: string (nullable = true)
| |-- process_id: string (nullable = true)
| |-- system_id: string (nullable = true)
| |-- timestamp: string (nullable = true)
| |-- type: string (nullable = true)
| |-- ua_type: string (nullable = true)
| |-- uc_type: string (nullable = true)
| |-- ud_type: string (nullable = true)
| |-- um_type: string (nullable = true)
| |-- us_type: string (nullable = true)
|-- headerJ: struct (nullable = true)
| |-- endpointKeyHash: struct (nullable = true)
| | |-- string: string (nullable = true)
| |-- timestamp: struct (nullable = true)
| | |-- long: long (nullable = true)
|-- type: string (nullable = true)
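Individual nested fields seem to be reachable with dot notation (as far as I can tell this is standard Spark behaviour for struct columns):

// Access single fields of the eventJ struct via dotted column paths
df.select(df("eventJ.action"), df("eventJ.code")).show(3)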
And to see the data in the DataFrame I do this:
df.show()
+--------------------+--------+--------------------+--------------------+---------+
| @timestamp|@version| eventJ| headerJ| type|
+--------------------+--------+--------------------+--------------------+---------+
|2017-04-17 19:20:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:22:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:21:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:22:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:23:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:23:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:25:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:25:...| 1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
+--------------------+--------+--------------------+--------------------+---------+
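Plain SQL against the registered temp table also works, for example (the @ in @timestamp needs backticks in Spark SQL):

// Query the temp table registered above
sqlContext.sql("SELECT `@timestamp`, type FROM tmpT LIMIT 5").show()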
If I do
df.toJSON.foreach(a => println(a))
it shows the following. I think it just serialized the structured JSON data of each row into a string.
{"@timestamp":"2017-04-20T08:58:53.189+09:00","@version":"1","eventJ":{"action":"ACCEPT from from 172.16.1.112:17500 to 172.16.1.255:17500 UDP","code":"1702","process_id":"1","system_id":"ESG","timestamp":"0000","type":"2","ua_type":"4","uc_type":"5","ud_type":"7","um_type":"3","us_type":"6"},"headerJ":{"endpointKeyHash":{"string":"zxVuxVd6+3iOc4raxk2yezQem3U="},"timestamp":{"long":1492646333189}},"type":"Things"}
{"@timestamp":"2017-04-20T08:59:51.434+09:00","@version":"1","eventJ":{"action":"ACCEPT from from 172.16.1.1:38329 to 239.255.255.250:1900 UDP","code":"1702","process_id":"1","system_id":"ESG","timestamp":"0000","type":"2","ua_type":"4","uc_type":"5","ud_type":"7","um_type":"3","us_type":"6"},"headerJ":{"endpointKeyHash":{"string":"zxVuxVd6+3iOc4raxk2yezQem3U="},"timestamp":{"long":1492646391435}},"type":"Things"}
What I want to do is convert the DataFrame (df) into a format in which the nested JSON fields are split out into their own columns, maybe like below:
action      code   process_id   system_id   ...
-----------------------------------------------
ACCEPT...   1702   1            ESG         ...
ACCEPT...   1702   1            ESG         ...
How can I transform the DataFrame read from Elasticsearch into a flat format like the above, so that I can run SQL queries on it in Spark SQL? Any idea is more than welcome. Thanks in advance.
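One approach I've been looking at, though I'm not sure it's the idiomatic one, is to flatten eventJ by selecting its fields with dotted paths and aliasing them (just a sketch):

// Sketch: pull each eventJ field up to a top-level column, then query with SQL.
// eventJ.timestamp and eventJ.type are renamed to avoid clashing with the
// top-level @timestamp and type columns.
val flat = df.select(
  df("eventJ.action").as("action"),
  df("eventJ.code").as("code"),
  df("eventJ.process_id").as("process_id"),
  df("eventJ.system_id").as("system_id"),
  df("eventJ.timestamp").as("event_timestamp"),
  df("eventJ.type").as("event_type")
)
flat.registerTempTable("events")
sqlContext.sql("SELECT action, code, system_id FROM events").show()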
Do you want to convert eventJ to a DataFrame that has a schema similar to that HTML table? – ImDarrenG