
I'm pretty new to Spark and Elasticsearch.

Currently I'm trying to import data from Elasticsearch:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("test")
conf.set("spark.driver.allowMultipleContexts", "true")
conf.set("es.index.auto.create", "true")
conf.set("es.nodes.discovery", "true")
conf.set("es.nodes", "localhost:9200")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Read all documents of type "Things" from indices matching "test-*"
val df = sqlContext.read.format("org.elasticsearch.spark.sql").load("test-*/Things")
df.registerTempTable("tmpT")
df.printSchema

With the code above, the schema of the DataFrame df looks like this:

root
 |-- @timestamp: timestamp (nullable = true)
 |-- @version: string (nullable = true)
 |-- eventJ: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- process_id: string (nullable = true)
 |    |-- system_id: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- ua_type: string (nullable = true)
 |    |-- uc_type: string (nullable = true)
 |    |-- ud_type: string (nullable = true)
 |    |-- um_type: string (nullable = true)
 |    |-- us_type: string (nullable = true)
 |-- headerJ: struct (nullable = true)
 |    |-- endpointKeyHash: struct (nullable = true)
 |    |    |-- string: string (nullable = true)
 |    |-- timestamp: struct (nullable = true)
 |    |    |-- long: long (nullable = true)
 |-- type: string (nullable = true)

To see the data in the DataFrame, I do this:

df.show()

+--------------------+--------+--------------------+--------------------+---------+
|          @timestamp|@version|              eventJ|             headerJ|     type|
+--------------------+--------+--------------------+--------------------+---------+
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:22:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:21:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:22:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:23:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:23:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:25:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:25:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
+--------------------+--------+--------------------+--------------------+---------+

If I do

df.toJSON.foreach(a => println(a))

It shows the output below. I think it just transformed the structured JSON data into strings.

{"@timestamp":"2017-04-20T08:58:53.189+09:00","@version":"1","eventJ":{"action":"ACCEPT from from 172.16.1.112:17500 to 172.16.1.255:17500 UDP","code":"1702","process_id":"1","system_id":"ESG","timestamp":"0000","type":"2","ua_type":"4","uc_type":"5","ud_type":"7","um_type":"3","us_type":"6"},"headerJ":{"endpointKeyHash":{"string":"zxVuxVd6+3iOc4raxk2yezQem3U="},"timestamp":{"long":1492646333189}},"type":"Things"}
{"@timestamp":"2017-04-20T08:59:51.434+09:00","@version":"1","eventJ":{"action":"ACCEPT from from 172.16.1.1:38329 to 239.255.255.250:1900 UDP","code":"1702","process_id":"1","system_id":"ESG","timestamp":"0000","type":"2","ua_type":"4","uc_type":"5","ud_type":"7","um_type":"3","us_type":"6"},"headerJ":{"endpointKeyHash":{"string":"zxVuxVd6+3iOc4raxk2yezQem3U="},"timestamp":{"long":1492646391435}},"type":"Things"}

What I want to do is convert the DataFrame (df) to a format in which the nested JSON fields are split into separate columns, maybe like below:

    action      code    process_id  system_id   ...
--------------------------------------------------------
    ACCEPT...   1702    1           ESG         ...
    ACCEPT...   1702    1           ESG         ...

How can I transform the DataFrame read from Elasticsearch into a flat format like the above, so that I can run SQL queries on it in Spark SQL? Any idea is more than welcome. Thanks in advance.

I'm not sure I understand your question. Do you need to format your DataFrame as an HTML table? Or do you want to extract the JSON objects in column eventJ to a DataFrame that has a schema similar to that HTML table? – ImDarrenG
I just want to extract eventJ and split its fields into database columns. – Jay Hong

1 Answer


You can do a select on the eventJ struct's properties:

val eventJDf = df.select(
  df.col("eventJ.action"),
  df.col("eventJ.code"),
  df.col("eventJ.process_id"),
  df.col("eventJ.system_id") /*...*/
)
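
If you want all of the struct's fields without listing each one, star-expanding the struct should also work, and the flattened DataFrame can then be registered as a temp table for Spark SQL. A minimal sketch, assuming the df from the question; flatDf and the "events" table name are just illustrative:

// Star-expanding eventJ pulls every field of the struct up into its
// own top-level column.
val flatDf = df.select("eventJ.*")
flatDf.printSchema
// root
//  |-- action: string (nullable = true)
//  |-- code: string (nullable = true)
//  ...

// Register the flattened DataFrame as a temp table and query it with
// Spark SQL, reusing the sqlContext from the question:
flatDf.registerTempTable("events")
sqlContext.sql("SELECT action, code, process_id, system_id FROM events").show()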