I am trying to batch-insert documents into MongoDB, and I want to do it from multiple threads:
final AtomicInteger i = new AtomicInteger(0);
final List<InsertOneModel<Document>> set = new CopyOnWriteArrayList<>();
dbObjects.stream().parallel().forEach(it -> {
    set.add(it);
    if (i.incrementAndGet() % 1000 == 0) {
        mongoDBService.insertBulk(metaInfo, set);
        set.clear();
    }
});
mongoDBService.insertBulk(metaInfo, set);
The insertBulk method looks like this:
public void insertBulk(EntryMetaInfo collectionInfo, List<InsertOneModel<Document>> dbObjects) {
    MongoDatabase db = getDb(collectionInfo);
    MongoCollection<Document> collection = db.getCollection(collectionInfo.getCollectionName());
    collection.bulkWrite(dbObjects);
}
I also tried the insertMany method; that variant looked roughly like the sketch below (same class as above; insertMany takes plain Documents, so I unwrap each InsertOneModel) and failed the same way:
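public void insertBulk(EntryMetaInfo collectionInfo, List<InsertOneModel<Document>> dbObjects) {
    MongoDatabase db = getDb(collectionInfo);
    MongoCollection<Document> collection = db.getCollection(collectionInfo.getCollectionName());
    // insertMany accepts Documents directly, so unwrap the InsertOneModel wrappers
    List<Document> docs = dbObjects.stream()
            .map(InsertOneModel::getDocument)
            .collect(Collectors.toList());
    collection.insertMany(docs);
}

I use the following dependency in pom: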
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.6.4</version>
</dependency>
I get the following error:
Bulk write operation error on server localhost:27017. Write errors: [BulkWriteError{index=0, code=11000, message='E11000 duplicate key error collection: dbName.collectionName index: _id_ dup key: { _id: ObjectId('5e312c46e9a20f090e52132c') }', details={ }}].
If I remove parallel(), everything works. What can I do so that the data is written in batches and across several threads?
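One direction I'm considering is to split the list into fixed-size chunks up front and let each thread insert its own chunk, so no collection is shared between threads (an untested sketch, reusing metaInfo and mongoDBService from above):

int batchSize = 1000;
List<List<InsertOneModel<Document>>> batches = new ArrayList<>();
for (int from = 0; from < dbObjects.size(); from += batchSize) {
    // subList is only a view, which is fine here since each chunk is read-only
    batches.add(dbObjects.subList(from, Math.min(from + batchSize, dbObjects.size())));
}
// every thread gets its own chunk; no shared mutable list, no clear() race
batches.parallelStream().forEach(batch -> mongoDBService.insertBulk(metaInfo, batch));

Would something like this be the right approach?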
UPD: I obtain the objects to insert like this:
List<InsertOneModel<Document>> dbObjects = fiasFileService.processFile(file, ADDR_OBJ_MAPPER);
public static final DbfMapper<InsertOneModel<Document>> ADDR_OBJ_MAPPER = row ->
        new InsertOneModel<>(
                new Document()
                        .append("_id", new ObjectId())
                        .append("actstatus", row.getInt("ACTSTATUS"))
                        .append("aoguid", row.getString("AOGUID"))
                        .append("aoid", row.getString("AOID"))
                        .append("aolevel", row.getInt("AOLEVEL"))
                        .append("areacode", row.getString("AREACODE"))
        );
That is, I generate a unique identifier (_id) for every document myself, yet the error still occurs. My guess is that set.clear() races with concurrent add() calls, so the same InsertOneModel, carrying its already-assigned ObjectId, ends up in two bulkWrite batches.
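That would match this minimal reproduction (a sketch against a hypothetical collection; inserting the same model twice produces exactly this E11000, because the Document keeps its ObjectId):

InsertOneModel<Document> model = new InsertOneModel<>(new Document("_id", new ObjectId()));
collection.bulkWrite(Collections.singletonList(model)); // first insert succeeds
collection.bulkWrite(Collections.singletonList(model)); // same model again: E11000 duplicate key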