
For testing purposes, I have configured a 4-node cluster; each node runs a Spark worker and a MongoDB shard. These are the details:

  • Four Debian 9 servers (named visa0, visa1, visa2, visa3)
  • Spark(v2.4.0) cluster on 4 nodes (visa1: master, visa0..3: slaves)
  • MongoDB (v3.2.11) sharded cluster with 4 nodes (config server replica set on visa1..3, mongos on visa1, shard servers: visa0..3); a quick check of this layout is sketched after this list
  • I'm using the MongoDB Spark connector, installed with "spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"
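
For reference, the shard layout above can be verified from the mongos on visa1 with a short pymongo script (just a sketch; it assumes pymongo is installed and that mongos listens on the default port 27017):

from pymongo import MongoClient

# Connect to the mongos router on visa1 (default port assumed)
client = MongoClient("mongodb://visa1:27017/")

# List the shards registered in the cluster
for shard in client.admin.command("listShards")["shards"]:
    print(shard["_id"], shard["host"])

# Show how email.emails is sharded (the shard key is stored in the "key" field)
print(client.config.collections.find_one({"_id": "email.emails"}))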

When configuring the SparkSession with MongoShardedPartitioner, every dataframe loaded from the database is empty, although the dataframe schema is fetched correctly.

This is reproduced whether the configuration is done in the spark-defaults.conf file or with .config("spark.mongodb.input.partitioner", "MongoShardedPartitioner") in the SparkSession builder.
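
For reference, the spark-defaults.conf variant is just this single property line (the same key as in the builder call):

spark.mongodb.input.partitioner    MongoShardedPartitioner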

With MongoShardedPartitioner, df.count() == 0:

./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

...

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
...   .getOrCreate()
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
>>>                                                                             
>>> df2.count()
0  

But it works correctly without specifying a partitioner:

./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

...

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .getOrCreate()
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-07 22:7:33 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>> 
>>> df2.count()
1162  

Questions:

  • How can I know which partitioner is configured by default?
  • How can MongoShardedPartitioner be used in this scenario?

Thanks in advance

Jan 13th, 2019: recommended workaround

As answered below, it seems that MongoShardedPartitioner does not support hashed indexes as the shard key. However, I need a hashed index to distribute the chunks evenly across my nodes, independently of time (using _id would distribute the documents chronologically, I guess).

My workaround has been to create a new field in the database holding the MD5 hash of a date bucket, indexing it (as a normal, non-hashed index), and using it as the shard key.
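
Roughly, the migration looks like this (a pymongo sketch, not my exact script; the source "date" field and the daily bucket granularity are assumptions):

import hashlib
from pymongo import MongoClient

client = MongoClient("mongodb://visa1:27017/")
coll = client.email.emails

# Add the hashed date-bucket field to every document
# (the bucket here is the day part of an assumed "date" field)
for doc in coll.find({}, {"date": 1}):
    bucket = doc["date"].strftime("%Y-%m-%d")
    digest = hashlib.md5(bucket.encode("utf-8")).hexdigest()
    coll.update_one({"_id": doc["_id"]}, {"$set": {"datebuckethash": digest}})

# Normal (ranged) index on the new field
coll.create_index("datebuckethash")

# shardCollection only works on a not-yet-sharded collection, so in MongoDB 3.2
# this step means re-creating the collection with the new shard key and reloading the data
client.admin.command("enableSharding", "email")
client.admin.command("shardCollection", "email.emails", key={"datebuckethash": 1})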

Now, the code works fine:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:25:39)
SparkSession available as 'spark'.
>>> 
>>> 
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
...   .config("spark.mongodb.input.partitionerOptions.shardkey", "datebuckethash") \
...   .getOrCreate()
>>> 
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()

2019-01-13 11:19:31 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>> 
>>> df2.count()
1162   

1 Answer


Sorry to hear you are having an issue with the connector, Jose.

How can I know which partitioner is configured by default?

Information regarding partitioners can be found on the Spark connector documentation site. Please file a ticket in the Docs jira project if you feel anything is missing or unclear; it really helps future users!

The default partitioner is a thin wrapper around the MongoSamplePartitioner. It splits a collection into roughly equal-sized partitions based on statistical sampling of the collection.
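
To see what is actually in effect, you can read the setting back from the running session; if the key has not been set explicitly, the connector falls back to the DefaultMongoPartitioner (the fallback string below is just for display):

# Prints the explicitly configured partitioner, or the fallback default
print(spark.conf.get("spark.mongodb.input.partitioner", "DefaultMongoPartitioner"))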

How can MongoShardedPartitioner be used in this scenario?

The MongoShardedPartitioner uses the shardKey to generate the partitions. By default it will use _id as the key. You may need to configure that value.
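
For example, the shard key can be passed through the partitioner options; the field name below is a placeholder for your actual shard key:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("myApp") \
    .config("spark.mongodb.input.partitioner", "MongoShardedPartitioner") \
    .config("spark.mongodb.input.partitionerOptions.shardkey", "yourShardKeyField") \
    .getOrCreate()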

Note: Hashed shard keys are not supported by the MongoShardedPartitioner, as there is currently no way to query a collection against the hashed value, so when retrieving partitions it will fail to return results. I've added DOCS-12345 to update the documentation.

It looks like there is an issue in your setup where the MongoShardedPartitioner is failing to partition the collection as expected and is returning 0 results. Schema inference will still work because of how it queries the collection. If it's not a config / hashed shard key issue, then please file a bug in the Spark jira project and I can help identify the cause and release a fix for you.