I am trying to batch insert documents into MongoDB. In addition, I want to do this in multiple threads:

final AtomicInteger i = new AtomicInteger(0);
final List<InsertOneModel<Document>> set = new CopyOnWriteArrayList<>();
dbObjects.stream().parallel().forEach(it -> {
    set.add(it);
    if (i.incrementAndGet() % 1000 == 0) {
        mongoDBService.insertBulk(metaInfo, set);
        set.clear();
    }
});
mongoDBService.insertBulk(metaInfo, set);

The insertBulk method looks like this:

public void insertBulk(EntryMetaInfo collectionInfo, List<InsertOneModel<Document>> dbObjects) {
    MongoDatabase db = getDb(collectionInfo);
    MongoCollection<Document> collection = db.getCollection(collectionInfo.getCollectionName());
    collection.bulkWrite(dbObjects);
}

I also tried the insertMany method. I use the following dependency in pom:

    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.6.4</version>
    </dependency>

I get the following error:

Bulk write operation error on server localhost:27017. Write errors: [BulkWriteError{index=0, code=11000, message='E11000 duplicate key error collection: dbName.collectionName index: _id_ dup key: { _id: ObjectId('5e312c46e9a20f090e52132c') }', details={ }}].

If I remove parallel(), everything works. How can I write the data in batches from several threads?

UPD: This is how I get the objects to insert:

List<InsertOneModel<Document>> dbObjects = fiasFileService.processFile(file, ADDR_OBJ_MAPPER);

public static final DbfMapper<InsertOneModel<Document>> ADDR_OBJ_MAPPER = row ->
        new InsertOneModel<>(
                new Document()
                        .append("_id", new ObjectId())
                        .append("actstatus", row.getInt("ACTSTATUS")) 
                        .append("aoguid", row.getString("AOGUID"))
                        .append("aoid", row.getString("AOID"))
                        .append("aolevel", row.getInt("AOLEVEL"))
                        .append("areacode", row.getString("AREACODE"))
        );

That is, I generate a unique identifier (_id) for each document myself, but I still get the error.

1 Answer

If a document does not specify an _id field, mongod adds the field and assigns it a unique ObjectId. Most drivers create the ObjectId and insert the _id field themselves, but mongod will create and populate _id if neither the driver nor the application does.

There are two possible solutions:

  1. Pause briefly before each insert.

  2. Generate your own _id values, making sure each one is unique.
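Separately from the two options above, the code in the question shares one list between all worker threads. While one thread is still inside insertBulk with that list, another thread can reach the next multiple of 1000 and send the same, not yet cleared, elements again, which would produce exactly this kind of duplicate _id error. A minimal sketch of one way around this, assuming the input list is not modified during processing: split it into non-overlapping batches first, then process the batches in parallel. The class name ParallelBatches and both helper methods are hypothetical, and the sink is a stand-in for a call such as mongoDBService.insertBulk(metaInfo, batch).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical helper, not part of the MongoDB driver.
class ParallelBatches {

    // Split items into consecutive, non-overlapping views of at most batchSize elements.
    public static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            batches.add(items.subList(from, to));
        }
        return batches;
    }

    // Hand each batch to the sink from the parallel stream's worker threads.
    // Every element belongs to exactly one batch, so nothing is sent twice.
    public static <T> void forEachBatchInParallel(List<T> items, int batchSize,
                                                  Consumer<List<T>> sink) {
        partition(items, batchSize).parallelStream().forEach(sink);
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int n = 0; n < 2500; n++) {
            items.add(n);
        }

        // Stand-in sink that only records what it receives. In the question's code
        // this would be: batch -> mongoDBService.insertBulk(metaInfo, batch)
        ConcurrentLinkedQueue<Integer> written = new ConcurrentLinkedQueue<>();
        forEachBatchInParallel(items, 1000, written::addAll);

        System.out.println(written.size());
    }
}
```

With this shape no thread ever adds to or clears a list that another thread is inserting, so no counter or CopyOnWriteArrayList is needed. Whether the parallel version is actually faster depends on the server and the connection pool, so it is worth measuring.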