For testing purposes, I have configured a 4-node cluster, each of them has a Spark Worker and a MongoDB Shard. These are the details:
- Four Debian 9 servers (named visa0, visa1, visa2, visa3)
- Spark(v2.4.0) cluster on 4 nodes (visa1: master, visa0..3: slaves)
- MongoDB (v3.2.11) sharded cluster con 4 nodes ( config server replica set on visa1..3, mongos on visa1, shard servers: visa0..3 )
- I'm using MongoDB Spark connector installed with "spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"
When configuring SparkSession with MongoShardedPartitioner
, every dataframe loaded from the database is empty, though the dataframe schema is fetched correctly.
This is reproduced either the configuration is done in the spark-defaults.conf
file or with .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner")
in the SparkSession builder.
With MongoShardedPartitioner
, df.count() == 0:
./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
... .builder \
... .appName("myApp") \
... .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
... .getOrCreate()
>>>
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
>>>
>>> df2.count()
0
But works correctly without specifying partitioner:
./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
... .builder \
... .appName("myApp") \
... .getOrCreate()
>>>
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-07 22:7:33 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>>
>>> df2.count()
1162
Questions:
- How can I know which partitioner is configured by default?
- How can
MongoShardedPartitioner
be used in this scenario?
Thanks in advance
Jan 13rd, 2019: recommended workaround
As answered below, it seems that MongoShardedPartitioner
does not support hashed indexes as shard index. However, I need a hash index to distribute the chunks evenly on my nodes, independently of time (using _id would distribute chronologically, I guess).
My workaround has been to create a new field in the database with the computed md5 hash of a date bucket, indexing it (as a normal index), and using it as shard index.
Now, the code works fine:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.5.3 (default, Sep 27 2018 17:25:39)
SparkSession available as 'spark'.
>>>
>>>
>>> spark2 = SparkSession \
... .builder \
... .appName("myApp") \
... .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
... .config("spark.mongodb.input.partitionerOptions.shardkey", "datebuckethash") \
... .getOrCreate()
>>>
>>>
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-13 11:19:31 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>>
>>> df2.count()
1162