So I'm learning to pull data from Elasticsearch through Apache Spark. Let's say I've connected to an Elasticsearch cluster that has a 'users' index:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
usersES = sqlContext.read.format('org.elasticsearch.spark.sql') \
    .option('es.nodes', 'mynode') \
    .load('users/user')
usersES.explain() shows me this:
== Physical Plan ==
Scan ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148]
When I use a filter:
usersES.filter(usersES.uid == 1566324).explain()
== Physical Plan ==
Filter (uid#203L = 1566324)
+- Scan ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148] PushedFilters: [EqualTo(uid,1566324)]
As you can see, Spark pushes the filter down to Elasticsearch (PushedFilters: [EqualTo(uid,1566324)]), so the lookup runs fast against the index.
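The same seems to hold for a static list of ids: if I read the connector's behavior right, an isin() filter is pushed down as a single In predicate. A quick check:

# Assumption: the es-hadoop connector translates isin() into an In pushdown,
# i.e. the plan should show PushedFilters: [In(uid,1566324,1566329)]
usersES.filter(usersES.uid.isin([1566324, 1566329])).explain()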
But when I join usersES with another DataFrame, I run into the same issue every time: Spark scans the entire Elasticsearch index and doesn't push down any of the filters I give it. For example:
from pyspark.sql import Row

a = sc.parallelize([1566324, 1566329]).map(Row('id')).toDF()
a.join(usersES, usersES.uid == a.id).explain()
shows:
SortMergeJoin [id#210L], [uid#203L]
:- Sort [id#210L ASC], false, 0
:  +- TungstenExchange hashpartitioning(id#210L,200), None
:     +- ConvertToUnsafe
:        +- Scan ExistingRDD[id#210L]
+- Sort [uid#203L ASC], false, 0
   +- TungstenExchange hashpartitioning(uid#203L,200), None
      +- ConvertToUnsafe
         +- Scan ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148]
Please tell me: is it possible to push a filter down into Elasticsearch inside the join?
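The only workaround I've found so far is to collect the join keys on the driver and feed them back in through the isin() filter shown above, which does get pushed down, but that obviously only works while the key list stays small. A minimal sketch of that workaround, assuming a fits comfortably on the driver:

# Workaround sketch: materialize the keys of the small DataFrame 'a' on the
# driver, then filter usersES before joining so the connector can push the
# In predicate into Elasticsearch.
ids = [row.id for row in a.collect()]
usersFiltered = usersES.filter(usersES.uid.isin(ids))
usersFiltered.join(a, usersFiltered.uid == a.id).explain()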