My goal is to get my map-reduce job to always run on the secondaries of the shards in my MongoDB cluster. To achieve this, I am setting the readPreference to secondary and the out parameter of the mapReduce command to inline. This works fine on a non-sharded replica set: the job runs on the secondary. However, on a sharded cluster, the job runs on the primary.
Can someone explain why this happens or point me to any relevant documentation? I couldn't find anything that covers this case.
public static final String mapfunction = "function() { emit(this.custid, this.txnval); }";
public static final String reducefunction = "function(key, values) { return Array.sum(values); }";
...
private void mapReduce() {
...
MapReduceIterable<Document> iterable = collection.mapReduce(mapfunction, reducefunction);
...
}
...
Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary());
MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options);
MongoClient client = new MongoClient(uri);
...
Logs from the secondary when this is executed on a Replica Set:
2016-11-23T15:05:26.735+0000 I COMMAND [conn671] command test.txns command: mapReduce { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:7 reslen:4331 locks:{ Global: { acquireCount: { r: 44 } }, Database: { acquireCount: { r: 3, R: 19 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_query 124ms
Sharded collection:
mongos> db.txns.getShardDistribution()
Shard Shard-0 at Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017
data : 498KiB docs : 9474 chunks : 3
estimated data per chunk : 166KiB
estimated docs per chunk : 3158
Shard Shard-1 at Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017
data : 80KiB docs : 1526 chunks : 3
estimated data per chunk : 26KiB
estimated docs per chunk : 508
Totals
data : 579KiB docs : 11000 chunks : 6
Shard Shard-0 contains 86.12% data, 86.12% docs in cluster, avg obj size on shard : 53B
Shard Shard-1 contains 13.87% data, 13.87% docs in cluster, avg obj size on shard : 53B
Logs from the Primary of Shard-0:
2016-11-24T08:46:30.828+0000 I COMMAND [conn357] command test.$cmd command: mapreduce.shardedfinish { mapreduce.shardedfinish: { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true, $queryOptions: { $readPreference: { mode: "secondary" } } }, inputDB: "test", shardedOutputCollection: "tmp.mrs.txns_1479977190_0", shards: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 123, timing: { mapTime: 51, emitLoop: 116, reduceTime: 9, mode: "mixed", total: 123 }, counts: { input: 9474, emit: 9474, reduce: 909, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 71, timing: { mapTime: 8, emitLoop: 63, reduceTime: 4, mode: "mixed", total: 71 }, counts: { input: 1526, emit: 1526, reduce: 197, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } } }, shardCounts: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { input: 9474, emit: 9474, reduce: 909, output: 101 }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { input: 1526, emit: 1526, reduce: 197, output: 101 } }, counts: { emit: 11000, input: 11000, output: 202, reduce: 1106 } } keyUpdates:0 writeConflicts:0 numYields:0 reslen:4368 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_command 115ms
2016-11-24T08:46:30.830+0000 I COMMAND [conn46] CMD: drop test.tmp.mrs.txns_1479977190_0
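As a sanity check on how I am reading that log entry: the per-shard counts under shardCounts should add up to the cluster-wide counts at the end of the command, which would mean both shards took part in the job. Below is a rough sketch in plain Java that does the arithmetic. The class and method names (ShardCountsCheck, total, countsFromLog) are made up for illustration, and the numbers are copied by hand from the log above, not read from the server.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ShardCountsCheck {

    // Sums one counter (e.g. "input") across all shards.
    static long total(Map<String, Map<String, Long>> shardCounts, String counter) {
        long sum = 0;
        for (Map<String, Long> counts : shardCounts.values()) {
            sum += counts.getOrDefault(counter, 0L);
        }
        return sum;
    }

    // Per-shard counts, hand-copied from the shardCounts section of the log.
    static Map<String, Map<String, Long>> countsFromLog() {
        Map<String, Map<String, Long>> shardCounts = new LinkedHashMap<>();
        shardCounts.put("Shard-0",
                Map.of("input", 9474L, "emit", 9474L, "reduce", 909L, "output", 101L));
        shardCounts.put("Shard-1",
                Map.of("input", 1526L, "emit", 1526L, "reduce", 197L, "output", 101L));
        return shardCounts;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> shardCounts = countsFromLog();
        // Print the summed value of each counter for comparison with the log.
        for (String counter : new String[] {"input", "emit", "reduce", "output"}) {
            System.out.println(counter + ": " + total(shardCounts, counter));
        }
    }
}
```

The sums match the counts reported at the end of the mapreduce.shardedfinish command (input 11000, emit 11000, reduce 1106, output 202), and the per-shard input values match the document counts from getShardDistribution().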
Any pointers on expected behavior would be really useful. Thanks.