5 votes

So I'm learning to pull data from ElasticSearch through Apache Spark. Let's say I've connected to an ElasticSearch cluster that has a 'users' index.

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
usersES = sqlContext.read.format('org.elasticsearch.spark.sql') \
    .option('es.nodes', 'mynode') \
    .load('users/user')

usersES.explain() shows me this:

== Physical Plan ==
Scan ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148]

When I use filter:

usersES.filter(usersES.uid == 1566324).explain()

== Physical Plan ==
Filter (uid#203L = 1566324)
+- Scan ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148] PushedFilters: [EqualTo(uid,1566324)]

As you can see, Spark elegantly pushes the filter down to ElasticSearch, making the index search fast and efficient.

But when I try joining usersES with another DataFrame, I run into the same issue every time: Spark scans the whole ElasticSearch index and doesn't push down any of the filters I give it. For example:

from pyspark.sql import Row

a = sc.parallelize([1566324, 1566329]).map(Row('id')).toDF()
a.join(usersES, usersES.uid == a.id).explain()

shows:

SortMergeJoin [id#210L], [uid#203L]
:- Sort [id#210L ASC], false, 0
:  +- TungstenExchange hashpartitioning(id#210L,200), None
:     +- ConvertToUnsafe
:        +- Scan ExistingRDD[id#210L]
+- Sort [uid#203L ASC], false, 0
   +- TungstenExchange hashpartitioning(uid#203L,200), None
      +- ConvertToUnsafe
         +- Scan ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148]

Please tell me, is it possible to push a filter down to ElasticSearch inside the join?


1 Answer

4 votes

This is expected behavior: yes, the elasticsearch-hadoop connector supports predicate pushdown, but there is no pushdown when you join.

This is because the join operation knows nothing about how the keys are partitioned across your DataFrames.

By default, this operation will hash all the keys of both DataFrames, send all the elements with the same key hash across the network to the same machine, and then join the elements with the same key together on that machine.
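A rough Python sketch of the idea (a toy illustration, not Spark's actual implementation): the destination of a row depends only on the hash of its join key, so Spark must read every row from the source before it can route it, and there is no per-key predicate it could push down in advance.

def assign_partition(key, num_partitions=200):
    # Toy stand-in for hashpartitioning(uid#203L,200) in the plan above:
    # the target partition depends only on the key's hash, so every row
    # must be read (a full scan) before Spark knows where to send it.
    return hash(key) % num_partitions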

And that's why you get that execution plan without the predicate being pushed down.

EDIT: It seems the connector has supported the IN clause since version 2.1. You ought to use that if your DataFrame a isn't big.
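A minimal sketch of that workaround, reusing the variables from the question and assuming a is small enough to collect to the driver:

# Collect the small DataFrame's ids to the driver (assumes `a` is small).
ids = [row.id for row in a.select('id').collect()]

# isin() compiles to an In(...) pushed filter, so ElasticSearch returns
# only the matching documents instead of the whole index.
filtered = usersES.filter(usersES.uid.isin(ids))
filtered.join(a, filtered.uid == a.id).explain()

Running explain() on the filtered DataFrame should now show something like PushedFilters: [In(uid,[1566324,1566329])], meaning only those documents leave ElasticSearch before the join happens.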