BulkOperations bulkOps = mongoOperations.bulkOps( BulkOperations.BulkMode.UNORDERED, // 或 ORDERED OrderStatusInfoDocument.class ); 是什么意思?
这行代码是使用 Spring Data MongoDB 进行批量操作的初始化,我来详细解释一下:
含义解析
BulkOperations bulkOps = mongoOperations.bulkOps(
BulkOperations.BulkMode.UNORDERED, // 批量操作模式
OrderStatusInfoDocument.class // 操作的文档类
);
参数详解
1. BulkOperations.BulkMode.UNORDERED
批量操作的模式:
UNORDERED(无序批量操作):
- 所有操作并发执行,不保证执行顺序
- 遇到失败的操作不会停止,会继续执行其他操作
- 性能更高,适合不需要顺序保证的场景
ORDERED(有序批量操作):
- 按添加顺序依次执行
- 遇到失败就停止后续操作
- 保证操作顺序,但性能较低
2. OrderStatusInfoDocument.class
指定操作的目标集合,这个类通常有 @Document 注解,映射到 MongoDB 的某个集合。
使用示例
// 1. 创建批量操作对象
BulkOperations bulkOps = mongoOperations.bulkOps(
BulkOperations.BulkMode.UNORDERED,
OrderStatusInfoDocument.class
);
// 2. 添加多个操作
bulkOps.insert(order1);
bulkOps.insert(order2);
bulkOps.updateOne(
Query.query(Criteria.where("orderId").is("123")),
Update.update("status", "SHIPPED")
);
bulkOps.remove(Query.query(Criteria.where("status").is("CANCELLED")));
// 3. 执行批量操作
BulkWriteResult result = bulkOps.execute();
实际应用场景
// 批量更新订单状态
public void batchUpdateOrderStatus(List<OrderUpdate> updates) {
BulkOperations bulkOps = mongoOperations.bulkOps(
BulkOperations.BulkMode.UNORDERED,
OrderStatusInfoDocument.class
);
for (OrderUpdate update : updates) {
bulkOps.updateOne(
Query.query(Criteria.where("orderId").is(update.getOrderId())),
Update.update("status", update.getNewStatus())
.set("updateTime", new Date())
);
}
BulkWriteResult result = bulkOps.execute();
System.out.println("修改了 " + result.getModifiedCount() + " 条记录");
}
批量操作的优势
- 减少网络开销:多个操作一次发送
- 提高性能:相比单条操作,性能提升明显
- 原子性:在有序模式下提供一定的顺序保证
- 简化代码:批量处理逻辑更清晰
注意事项
- UNORDERED 模式:适合日志记录、数据同步等可重试场景
- ORDERED 模式:适合有依赖关系的操作序列
- 单次批量操作有大小限制(默认 1000 个操作,16MB 数据)
这就是 MongoDB 批量操作的基本用法,特别适合需要高效处理大量数据的场景。
Update update = new Update() .setOnInsert("_id", orderStatusInfoDocument.getId()) .setOnInsert("createdAt", Instant.now()) .set("updatedAt", Instant.now()) .set("orderStatus", orderStatusInfoDocument.getOrderStatus()) .set("workerOrderStatus", orderStatusInfoDocument.getWorkerOrderStatus()) .set("workerOrderOperations", orderStatusInfoDocument.getWorkerOrderOperations());
bulkOps.upsert(query, update);
});
// 执行批量操作
BulkWriteResult result = bulkOps.execute(); 是什么?
这是使用 MongoDB 批量更新插入(upsert)操作的代码。让我详细解释:
代码解析
// 1. 创建 Update 对象
Update update = new Update()
.setOnInsert("_id", orderStatusInfoDocument.getId()) // 仅插入时设置
.setOnInsert("createdAt", Instant.now()) // 仅插入时设置
.set("updatedAt", Instant.now()) // 插入和更新都设置
.set("orderStatus", orderStatusInfoDocument.getOrderStatus())
.set("workerOrderStatus", orderStatusInfoDocument.getWorkerOrderStatus())
.set("workerOrderOperations", orderStatusInfoDocument.getWorkerOrderOperations());
// 2. 添加到批量操作(upsert 模式)
bulkOps.upsert(query, update);
Update 方法详解
setOnInsert()
只在插入新文档时设置字段值:
- 如果文档不存在,执行插入,设置这些字段
- 如果文档已存在,只更新,不修改这些字段
.setOnInsert("createdAt", Instant.now()) // 只在插入时记录创建时间
set()
插入和更新都设置字段值:
.set("updatedAt", Instant.now()) // 每次操作都更新时间
Query 对象示例
// 通常根据唯一标识查询
Query query = Query.query(
Criteria.where("orderId").is(orderStatusInfoDocument.getOrderId())
);
// 或
Query query = Query.query(
Criteria.where("_id").is(orderStatusInfoDocument.getId())
);
完整的批量操作流程
// 1. 创建批量操作
BulkOperations bulkOps = mongoOperations.bulkOps(
BulkOperations.BulkMode.UNORDERED,
OrderStatusInfoDocument.class
);
// 2. 遍历订单列表,为每个订单创建 upsert 操作
orderList.forEach(order -> {
// 构建查询条件(根据 orderId 查找)
Query query = Query.query(
Criteria.where("orderId").is(order.getOrderId())
);
// 构建更新操作
Update update = new Update()
.setOnInsert("_id", order.getId()) // 只在插入时设置
.setOnInsert("createdAt", Instant.now()) // 只在插入时设置
.set("updatedAt", Instant.now()) // 每次都更新
.set("orderStatus", order.getOrderStatus()) // 更新状态
.set("lastModified", new Date()); // 更新修改时间
// 添加到批量操作
bulkOps.upsert(query, update);
});
// 3. 执行批量操作
BulkWriteResult result = bulkOps.execute();
BulkWriteResult 返回结果
执行后返回的结果包含操作统计信息:
BulkWriteResult result = bulkOps.execute();
// 获取统计信息
System.out.println("插入数量: " + result.getInsertedCount()); // 新插入的文档数
System.out.println("更新数量: " + result.getModifiedCount()); // 修改的文档数
System.out.println("匹配数量: " + result.getMatchedCount()); // 匹配到的文档数
System.out.println("删除数量: " + result.getDeletedCount()); // 删除的文档数
System.out.println("upsert 数量: " + result.getUpserts().size()); // upsert 操作数
// 获取 upsert 的详细信息
for (BulkWriteUpsert upsert : result.getUpserts()) {
System.out.println("索引: " + upsert.getIndex()); // 操作在批量中的索引
System.out.println("ID: " + upsert.getId()); // 插入文档的 _id
}
数据库行为示例
假设数据库中有以下订单:
| _id | orderId | status | createdAt | updatedAt |
|---|---|---|---|---|
| 1 | 1001 | NEW | 2024-01-01 | 2024-01-01 |
| 2 | 1002 | NEW | 2024-01-01 | 2024-01-01 |
执行批量 upsert 后:
情况1:更新现有订单(orderId=1001)
// 只会更新,不会设置 createdAt
{
"_id": 1,
"orderId": "1001",
"status": "PROCESSING", // 更新
"createdAt": "2024-01-01", // 保持不变(因为 setOnInsert)
"updatedAt": "2024-01-05" // 更新为新时间
}
情况2:插入新订单(orderId=1003)
// 会插入新文档,设置所有字段
{
"_id": 3,
"orderId": "1003",
"status": "NEW",
"createdAt": "2024-01-05", // 设置新时间
"updatedAt": "2024-01-05" // 设置新时间
}
使用场景
这种模式特别适合:
- 数据同步:从外部系统同步订单状态
- 实时更新:更新订单处理进度
- 幂等操作:可以重复执行,不会产生重复数据
- 初始化数据:首次插入时设置创建时间,后续只更新时间
注意事项
- 性能:批量操作比单条操作快很多
- 原子性:每个 upsert 是原子的,但整个批量不是事务
- 索引:确保查询字段有索引,提高性能
- 错误处理:UNORDERED 模式下,部分失败不会影响其他操作