Edit: This edit may change the course of this issue.
A MongoDB aggregation (specifically one with $group) run through Spark is creating duplicate _id records when writing back to the collection, so MongoDB throws a duplicate key error. BTW, the same query runs perfectly fine in the mongo shell.
Here is what I did:
I took a small data set and printed the results of the (aggregation) Spark code to the console instead of writing them to the collection. In the complete result set I found duplicates in the _id field. The data looks something like this (edited):
Document{{_id=Document{{prodCategory=123, prodId=ABC, location=US}}, details=[Document{{....}}, Document{{....}}, Document{{...}}], count=2223}}
Document{{_id=Document{{prodCategory=123, prodId=ABC, location=US}}, details=[Document{{....}}, Document{{....}}, Document{{...}}], count=123}}
There are many such repeated documents. What I don't understand is: why is Spark not consolidating the complete (map??) job before writing it to the collection? Each partition seems to just run the pipeline over its own records and write the result to the collection directly. That's not how it is supposed to work, right? (See the sketch right after this edit for what I mean by consolidating.)
If any expert has advice on how to resolve this, or sees anything that should be changed in my code in the original post below, please advise.
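To make "consolidating" concrete, here is a minimal sketch of the kind of merge I would expect to happen before the write: re-group the partial results by their _id sub-document, union the details arrays, and recompute count. This is not code I am running; it assumes my per-partition theory above is even right, and it reuses aggRdd and writeConfig from the Scala code in the original post below.

import scala.collection.JavaConverters._
import org.bson.Document

// Sketch: merge partial groups that share the same _id before writing.
// aggRdd and writeConfig are defined in the Scala code further down.
val merged = aggRdd
  .map { doc =>
    val id = doc.get("_id").asInstanceOf[Document]
    (id.toJson, doc)                                   // key each document by its _id sub-document
  }
  .reduceByKey { (a, b) =>
    val detailsA = a.get("details").asInstanceOf[java.util.List[Document]].asScala
    val detailsB = b.get("details").asInstanceOf[java.util.List[Document]].asScala
    // union the two details arrays, de-duplicating by their JSON form (mimics $addToSet)
    val combined = (detailsA ++ detailsB).groupBy(_.toJson).map(_._2.head).toList
    new Document("_id", a.get("_id"))
      .append("details", combined.asJava)
      .append("count", combined.size)
  }
  .map(_._2)                                           // drop the temporary key

MongoSpark.save(merged, writeConfig)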
Original post:
I have the following collection.
prodTransactions:
{
_id:
prodCategory:
prodId:
location:
customer:
eventTime:
status:
}
My aggregation lists all customers and dates for each {prodCategory-prodId-location} group where status is 'complete'. The following is the MongoDB shell code.
db.prodTransactions.aggregate([
{$match: {status: 'complete'}}
, {$project:
{
prodId:1,
location:1,
customer:1,
status:1,
eventTime:1,
prodCategory:1
}}
, {$group:
{
_id: {prodCategory: "$prodCategory", prodId: "$prodId", location: "$location"},
details: {$addToSet: {customer: "$customer", date: {$dateToString: {format: "%m/%d/%Y", date: "$eventTime"}}, time: {$dateToString: {format: "%H:%M:%S", date: "$eventTime"}}}},
count: {$sum: 1}
}}
, {$out : "prodAgg"}
],{allowDiskUse: true}
)
When I run this in MongoDB directly, it runs perfectly with no issues and saves all the data to the prodAgg collection. The aggregated collection looks like this (data edited):
{
"_id" : {
"prodCategory" : "category1",
"prodId" : "prod1",
"location" : "US-EAST"
},
"details" : [
{
"customer" : "[email protected]",
"date" : "07/15/2016",
"time" : "14:00:48"
},
{
"customer" : "[email protected]",
"date" : "07/15/2016",
"time" : "19:05:48"
},
{
"customer" : "[email protected]",
"date" : "07/15/2016",
"time" : "17:55:48"
},
{
"customer" : "[email protected]",
"date" : "07/15/2016",
"time" : "19:20:49"
}
],
"count" : 4.0
}
The issue is, if I execute this from Spark and try to write it to a collection, it writes a few documents and then fails with the following exception (data edited):
com.mongodb.MongoBulkWriteException: Bulk write operation error on server 192.168.56.1:27017. Write errors: [BulkWriteError{index=6, code=11000, message='E11000 duplicate key error collection: dbname.prodAgg index: id dup key: { : { prodCategory: "xxx", prodId: "yyyyy", location: "US-EAST" } }', details={ }}]. at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:176)
This error has been haunting me for the past 3 days and I'm not able to get past it.
My understanding (I may be wrong) is that a $group aggregation in itself should not produce any duplicates, so how/why is it throwing a duplicate key error? Or am I doing something wrong in my aggregation or in my Scala code? (A quick duplicate check is sketched right after the Scala code below.)
If any soul out there has seen this before, please shed some light and pull me out of this whirlpool. I would really be grateful.
Here is my Scala code. I'm using the mongodb-spark-connector.
import org.apache.spark.{SparkConf, SparkContext}
import com.mongodb.spark._
import org.bson.Document
import com.mongodb.spark.config._
val conf = new SparkConf()
  .setAppName("ProdAnalytics1")
  .setMaster("local")
  .set("spark.mongodb.input.uri", "mongodb://192.168.56.1:27017/dbname.prodTransactions")
  .set("spark.mongodb.output.uri", "mongodb://192.168.56.1:27017/dbname.prodAgg")
val sc = new SparkContext(conf)
val rdd = sc.loadFromMongoDB()
val aggRdd = rdd.withPipeline(Seq(
Document.parse("{$match:{status:'end'}"),
Document.parse("{$project: {prodId:1,location:1,customer:1,type:1,eventTime:1,prodCategory:1}}"),
Document.parse("{$group: {_id: {prodCategory: \"$prodCategory\", prodId: \"$prodId\", location: \"$location\"},details: {$addToSet: {customer: \"$customer\", date: \"$eventTime\"}},count: {$sum: 1}}}"),
Document.parse("{$sort: {count : -1}}, {allowDiskUse: true}")))
println(aggRdd.count)
// Use a WriteConfig to write to the target collection
val writeConf = WriteConfig(sc)
val writeConfig = WriteConfig(Map("collection" -> "prodAgg", "db" -> "dbname"), Some(writeConf))
MongoSpark.save(aggRdd, writeConfig)
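For reference, a quick way to check for the duplicate _id values without printing the whole result set (again just a sketch, to be run before the MongoSpark.save call above):

// Sketch: count how many times each _id occurs in aggRdd; anything above 1 is a duplicated group.
val dupIds = aggRdd
  .map(doc => (doc.get("_id").toString, 1))
  .reduceByKey(_ + _)
  .filter(_._2 > 1)
dupIds.take(10).foreach(println)
println(s"number of duplicated _id values: ${dupIds.count}")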
My SBT file:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
//libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.6.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.1"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "1.1.0"
libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "1.2.1"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
resolvers += "snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/ "
resolvers += "releases" at "https://oss.sonatype.org/content/repositories/releases/"
Note: The reason for not using the latest version of Spark is that the latest version throws another exception:
Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: org/apache/spark/sql/DataFrame
For the life of me I could not understand what this is, and I'm not even using DataFrames, so I'll leave it at that. If someone has any advice on this, I will gladly take it.
Any advice is greatly appreciated. Thanks.
EDIT:
Here is the mongo log from while the Scala code is running. This is the last piece before it failed (edited):
command dbname.ProdTransaction command: aggregate { aggregate: "ProdTransaction", pipeline: [ { $match: { _id: { $gte: ObjectId('554c949ae4b0d28d51194caf'), $lt: ObjectId('55be257be4b0c3bd1c74e202') } } }, { $match: { $and: [ { status: "end" }, { location: "US" }, { prodId: { $nin: [ "abc", "xxx", "yyy" ] } } ] } }, { $project: { prodId: 1, location: 1, customer: 1, status: 1, eventTime: 1, prodCategory: 1 } }, { $group: { _id: { lab: "$prodId", location: "$location" }, details: { $addToSet: { prodCategory: "$prodCategory", user: "$customer", date: "$eventTime" } }, count: { $sum: 1 } } }, { $sort: { count: -1 } } ] cursor: {} } cursorid:258455518867 keyUpdates:0 writeConflicts:0 numYields:1335 reslen:4092243 locks:{ Global: { acquireCount: { r: 2694 } },
Database: { acquireCount: { r: 1347 } }, Collection: { acquireCount: { r: 1347 } } } protocol:op_query 1155ms
Comments:
– s7vr: ... prodAgg collection and start over.
– s7vr: ... db.prodAgg.getIndexes() on your db?
– Vamsi: MongoDB Enterprise > db.prodAgg.getIndexes()
[ { "v" : 1, "key" : { "_id" : 1 }, "name" : "_id_", "ns" : "dbname.prodAgg" } ]