2
votes

Edit: This edit may change the course of this issue.

A MongoDB aggregation (specifically one with $group) run from Spark is creating duplicate _id records when writing back to the collection. As a result, MongoDB is throwing a duplicate key error. BTW, this query runs perfectly fine in the mongo shell.

Here is what I did:

I took a small data set and printed the results of the (aggregation) Spark code to the console instead of writing them into the collection. I printed the complete result set and found duplicates in the _id field. The data looks something like this: (edited)

Document{{_id=Document{{prodCategory=123, prodId=ABC, location=US}}, details=[Document{{....}}, Document{{....}}, Document{{...}}], count=2223}}

Document{{_id=Document{{prodCategory=123, prodId=ABC, location=US}}, details=[Document{{....}}, Document{{....}}, Document{{...}}], count=123}}

There are many such repeated documents. What I don't understand is: why is Spark not consolidating the complete (map??) job before writing it to the collection? Each partition just maps the records and writes them to the collection directly. That's not how it is supposed to work, right?

If there is any advice from experts on how to resolve this issue, or you see anything that should be changed in my code from the original post below, please advise.

Original post:

I have the following collection.

prodTransactions:
{
_id:
ProdCategory:
product:
location:
customer:
eventTime:
status:
}

My aggregation lists all customers and dates for a {ProdCategory-Product-location} group where status is 'complete'. The following is the MongoDB code.

db.prodTransactions.aggregate([
{$match: {status:'complete'}}
, {$project: 
    {
        prodId:1,
        location:1,
        customer:1,
        status:1,
        eventTime:1,
        prodCategory:1
    }}
, {$group: 
    {
        _id: {prodCategory: "$prodCategory", lab: "$prodId", location: "$location"},
        details: {$addToSet: {customer: "$customer", date: {$dateToString: {format: "%m/%d/%Y", date: "$eventTime"}}, time: {$dateToString: {format: "%H:%M:%S", date: "$eventTime"}}}},
        count: {$sum: 1}
    }}
, {$out : "prodAgg"}
],{allowDiskUse: true}
)

When I run this in MongoDB directly, it runs perfectly, with no issues, and saves all the data to the prodAgg collection. The aggregated collection looks like this (data edited):

{
    "_id" : {
        "prodCategory" : "category1",
        "prodId" : "prod1",
        "location" : "US-EAST"
    },
    "details" : [ 
        {
            "customer" : "[email protected]",
            "date" : "07/15/2016",
            "time" : "14:00:48"
        }, 
        {
            "customer" : "[email protected]",
            "date" : "07/15/2016",
            "time" : "19:05:48"
        }, 
        {
            "customer" : "[email protected]",
            "date" : "07/15/2016",
            "time" : "17:55:48"
        }, 
        {
            "customer" : "[email protected]",
            "date" : "07/15/2016",
            "time" : "19:20:49"
        }
    ],
    "count" : 4.0
}

The issue is, if I execute this from Spark and try to write the result to a collection, it writes a few documents and then fails with the following exception (data edited):

com.mongodb.MongoBulkWriteException: Bulk write operation error on server 192.168.56.1:27017. Write errors: [BulkWriteError{index=6, code=11000, message='E11000 duplicate key error collection: dbname.prodAgg index: id dup key: { : { prodCategory: "xxx", prodId: "yyyyy", location: "US-EAST" } }', details={ }}]. at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:176)

This error has been haunting me for the past 3 days and I'm not able to get past it.

My understanding is (I may be wrong, but) a $group aggregation in itself should not produce any duplicates, so how/why is it throwing a duplicate key error? Or am I doing something wrong in my aggregation, or in my Scala code?

If any soul out there has seen this before, please shed some light and pull me out of this whirlpool. I would really be grateful.

Here is my Scala code. I'm using the mongodb-spark-connector.

import org.apache.spark.{SparkConf, SparkContext}
import com.mongodb.spark._
import org.bson.Document
import com.mongodb.spark.config._

val conf = new SparkConf().setAppName("ProdAnalytics1").setMaster("local")
  .set("spark.mongodb.input.uri", "mongodb://192.168.56.1:27017/dbname.prodTransactions")
  .set("spark.mongodb.output.uri", "mongodb://192.168.56.1:27017/dbname.prodAgg")

val sc = new SparkContext(conf)
val rdd = sc.loadFromMongoDB()

val aggRdd = rdd.withPipeline(Seq(
  Document.parse("{$match:{status:'end'}"),
  Document.parse("{$project: {prodId:1,location:1,customer:1,type:1,eventTime:1,prodCategory:1}}"),
  Document.parse("{$group: {_id: {prodCategory: \"$prodCategory\", prodId: \"$prodId\", location: \"$location\"},details: {$addToSet: {customer: \"$customer\", date: \"$eventTime\"}},count: {$sum: 1}}}"),
  Document.parse("{$sort: {count : -1}}, {allowDiskUse: true}")))
println(aggRdd.count)

// Using the write Config to Write to DB
val writeConf = WriteConfig(sc)
val writeConfig = WriteConfig(Map("collection" -> "prodAgg", "db" -> "dbname"), Some(writeConf))
MongoSpark.save(aggRdd, writeConfig)

My SBT file:

name := "Simple Project"
    version := "1.0"

    scalaVersion := "2.11.7"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"

    //libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.6.1"

    libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.1"

    libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "1.1.0"

    libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "1.2.1"

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
    resolvers += "snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/ "
    resolvers += "releases"  at "https://oss.sonatype.org/content/repositories/releases/"

Note: The reason for not using the latest version of Spark is that the latest version throws another exception:

Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: org/apache/spark/sql/DataFrame

For the life of me I could not understand what this is, and I'm not even using DataFrames. So... I'll leave it at that. If someone has any advice on this, I will gladly take it.

Any advice is greatly appreciated. Thanks.

EDIT:

Here is the mongo log from while the Scala code is running. This is the last piece before it failed (edited):

command dbname.ProdTransaction command: aggregate { aggregate: "ProdTransaction", pipeline: [ { $match: { _id: { $gte: ObjectId('554c949ae4b0d28d51194caf'), $lt: ObjectId('55be257be4b0c3bd1c74e202') } } }, { $match: { $and: [ { status: "end" }, { location: "US" }, { prodId: { $nin: [ "abc", "xxx", "yyy" ] } } ] } }, { $project: { prodId: 1, location: 1, customer: 1, status: 1, eventTime: 1, prodCategory: 1 } }, { $group: { _id: { lab: "$prodId", location: "$location" }, details: { $addToSet: { prodCategory: "$prodCategory", user: "$customer", date: "$eventTime" } }, count: { $sum: 1 } } }, { $sort: { count: -1 } } ] cursor: {} } cursorid:258455518867 keyUpdates:0 writeConflicts:0 numYields:1335 reslen:4092243 locks:{ Global: { acquireCount: { r: 2694 } },
Database: { acquireCount: { r: 1347 } }, Collection: { acquireCount: { r: 1347 } } } protocol:op_query 1155ms

You should go back to the db and check if there are any indexes; delete the indexes for the prodAgg collection and start over. - s7vr
@SagarReddy: This is a new collection that is created at run time, so it does not exist before this execution. - Vamsi
I think your problem is not with the data that you are inserting. It looks like the database collection state is not clean. What do you get when you run db.prodAgg.getIndexes() on your db? - s7vr
Here is what I see (edited). Any discrepancies that you see? MongoDB Enterprise > db.prodAgg.getIndexes() [ { "v" : 1, "key" : { "_id" : 1 }, "name" : "_id_", "ns" : "dbname.prodAgg" } ] - Vamsi
A couple of things to note here. Your shell output is different from what you have in the aggregation code. I don't think the system will assign any id, because your grouping key will be your id for the collection. So help me understand: does the aggregation code insert any data into the collection? You may want to strip all the code and just try a basic aggregation to see where the problem is. Get rid of $group altogether and see if you can run the aggregation. - s7vr

4 Answers

1
votes

There's lots to answer here, so I've broken my answer down into three sections: configuration, why it happens, and how to work around it.

Configuration issues

Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: org/apache/spark/sql/DataFrame

This happens when the wrong version of the MongoDB Spark Connector is used with Spark. If you want to use Spark 2.0.x then you'll need the 2.0.x MongoDB Spark Connector, e.g.:

libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.0.0"

(These misleading errors can also happen if your Scala versions aren't in sync, e.g. using Scala 2.10 with a library that was compiled for 2.11.)

Regarding the installation, there is no need for: libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "1.2.1". It's a separate library with its own MongoClient, codecs, etc., so it will likely cause errors when used alongside the connector.
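For instance, a consistent set of versions in build.sbt might look like the fragment below. The version numbers are examples rather than a prescription; the point is that the connector release matches the Spark release, the Scala version matches throughout, and the separate mongo-scala-driver dependency is gone:

// Example build.sbt fragment: Spark 2.0.x with the 2.0.x connector, Scala 2.11 throughout.
scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.2"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.0.0"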

Why you can expect duplicate _ids with $group aggregations

Spark works by partitioning data collections and then parallelising the data processing on them across Spark worker nodes.

There are various partitioners in the Mongo Spark Connector, and all of them by default partition a collection on the _id key of the Document. The aggregations are then performed on each of the partitions.

So as you are running multiple aggregations, each on a partition of the collection, you can reasonably expect duplicate _ids in the resulting RDD.
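A quick way to see this, using the aggRdd from the question's code (an illustrative check only, not part of the connector API):

// Count how many grouped _id values were emitted by more than one partition.
val duplicatedKeys = aggRdd
  .map(doc => (doc.get("_id").toString, 1))
  .reduceByKey(_ + _)
  .filter { case (_, n) => n > 1 }
println(s"grouped _ids produced by more than one partition: ${duplicatedKeys.count()}")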

Fixing duplicate _ids

I can think of three possible fixes for the issue:

A) Given the nature of your aggregation pipeline, you should use the $out operator. This is much more beneficial as the data stays local to MongoDB and you don't need to maintain a Spark cluster.

B) A Spark-based alternative would be to do further processing on the RDD to merge any duplicated _ids before saving back to MongoDB (see the sketch after option C below).

C) In theory you could provide your own partitioner that returns partitions based on the grouped _id fields. In reality I can't think of a good way to do this without requiring partition queries that use the $in filter, which isn't very performant.
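For option B, here is a minimal sketch of what that merge could look like, assuming the aggregated documents have the shape shown in the question (an _id sub-document, a details array and a count), and reusing aggRdd and writeConfig from the question's code. The names merged and countOf are illustrative only, and count is read defensively because its BSON numeric type can vary:

import java.util.{ArrayList => JArrayList, LinkedHashSet => JLinkedHashSet, List => JList}
import org.bson.Document

// Depending on the server/pipeline, count may come back as Int, Long or Double.
def countOf(d: Document): Long = d.get("count") match {
  case n: java.lang.Number => n.longValue
  case _ => 0L
}

// Key each aggregated document by its grouped _id, then merge the documents
// that different partitions produced for the same key.
val merged = aggRdd
  .map(doc => (doc.get("_id").asInstanceOf[Document], doc))
  .reduceByKey { (left, right) =>
    val details = new JLinkedHashSet[Document]() // preserve $addToSet's set semantics
    details.addAll(left.get("details").asInstanceOf[JList[Document]])
    details.addAll(right.get("details").asInstanceOf[JList[Document]])
    new Document("_id", left.get("_id"))
      .append("details", new JArrayList[Document](details))
      .append("count", countOf(left) + countOf(right))
  }
  .map { case (_, doc) => doc }

MongoSpark.save(merged, writeConfig) // save the merged RDD instead of aggRdd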

0
votes

I could be mistaken, but since your aggregation is returning an object with the key _id, MongoDB will try to use this as the ID for your document when inserting. I'm not sure if this is your desired result... If not, just change the _id key to something else (even id would work).
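For example, one way to do that (an untested sketch; groupKey is just an illustrative field name) would be to append an extra stage to the pipeline from the question that moves the grouped key out of _id:

// Appended after the $group stage: move the grouping key into groupKey and drop _id,
// so MongoDB assigns a fresh ObjectId on insert instead of reusing the grouped key.
Document.parse("{$project: {groupKey: \"$_id\", details: 1, count: 1, _id: 0}}")

Note that this only avoids the duplicate key error; documents produced by different partitions for the same group are still stored as separate documents.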

0
votes

I have done some trial and error and narrowed down the issue (with the help of Sagar Reddy's advice). Here is what I found.

The aggregation works if there is no $group stage in the pipeline. Just $match, $project, $sort... any combination of those stages works fine. Once I add $group, even with one parameter, it fails.

The reason, I think, is that $group is the only aggregation stage that adds a new _id to the collection; no other aggregation stage will add a "new" _id. That is where the problem is.

My problem is that I need $group for my aggregation, and it wouldn't work without it.

If any one of you has a solution, information on this, or a workaround, please advise.

Many Thanks.

0
votes

I've been investigating your issue with MongoDB; here are some of my thoughts:

  1. I've noticed a mistake in your aggregation query - is the missing curly brace in "{$match:{status:'end'}" intentional? It should end with a double brace - "{$match:{status:'end'}}". I changed this in my bug reproduction code.
  2. IMO your aggregation query is not producing duplicated keys - once you run the Spark application, the prodAgg collection is created and filled with the aggregation result; if you then run it a second time, it produces the same keys, but the old collection is not dropped first. This is the cause of your problem.

Add the following lines just before your Spark aggregation:

import com.mongodb.MongoClient

// Drop the old aggregation output so a re-run does not collide with previously inserted keys.
// new MongoClient() connects to localhost:27017 by default; pass host and port
// (e.g. new MongoClient("192.168.56.1", 27017)) if MongoDB runs elsewhere.
val mongoClient = new MongoClient()
val db = mongoClient.getDatabase("dbname")
db.getCollection("prodAgg").drop()

The aggregation works if there is no $group stage in the pipeline. Just $match, $project, $sort... any combination of those stages works fine. Once I add $group, even with one parameter, it fails.

I couldn't reproduce that behaviour, but if I add exclusion of the document's _id to the $project operator, it works as you said. In that case I have an explanation: the aggregation creates a new document with a new _id when there is no source document _id, and then you don't get duplications. Without exclusion of _id in the projection, it is inherited from the source document, and then duplications occur.
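For reference, the exclusion described above would look something like this in the question's $project stage (a sketch of my reproduction setup, not the original code):

// $project with the source _id excluded: inserted documents get fresh ObjectIds,
// so re-running the job (without $group) no longer hits the duplicate key error.
Document.parse("{$project: {_id: 0, prodId: 1, location: 1, customer: 1, eventTime: 1, prodCategory: 1}}")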