MongoDB批量操作BulkOperations详解

小豆丁 23天前 ⋅ 59 阅读
聊天记录
头像

BulkOperations bulkOps = mongoOperations.bulkOps( BulkOperations.BulkMode.UNORDERED, // 或 ORDERED OrderStatusInfoDocument.class ); 是什么意思?

头像

这行代码是使用 Spring Data MongoDB 进行批量操作的初始化,我来详细解释一下:

含义解析

JAVA
BulkOperations bulkOps = mongoOperations.bulkOps(
    BulkOperations.BulkMode.UNORDERED,  // 批量操作模式
    OrderStatusInfoDocument.class        // 操作的文档类
);

参数详解

1. BulkOperations.BulkMode.UNORDERED

批量操作的模式:

  • UNORDERED(无序批量操作)

    • 所有操作并发执行,不保证执行顺序
    • 遇到失败的操作不会停止,会继续执行其他操作
    • 性能更高,适合不需要顺序保证的场景
  • ORDERED(有序批量操作)

    • 按添加顺序依次执行
    • 遇到失败就停止后续操作
    • 保证操作顺序,但性能较低

2. OrderStatusInfoDocument.class

指定操作的目标集合,这个类通常有 @Document 注解,映射到 MongoDB 的某个集合。

使用示例

JAVA
// 1. 创建批量操作对象
BulkOperations bulkOps = mongoOperations.bulkOps(
    BulkOperations.BulkMode.UNORDERED,
    OrderStatusInfoDocument.class
);

// 2. 添加多个操作
bulkOps.insert(order1);
bulkOps.insert(order2);
bulkOps.updateOne(
    Query.query(Criteria.where("orderId").is("123")),
    Update.update("status", "SHIPPED")
);
bulkOps.remove(Query.query(Criteria.where("status").is("CANCELLED")));

// 3. 执行批量操作
BulkWriteResult result = bulkOps.execute();

实际应用场景

JAVA
// 批量更新订单状态
public void batchUpdateOrderStatus(List<OrderUpdate> updates) {
    BulkOperations bulkOps = mongoOperations.bulkOps(
        BulkOperations.BulkMode.UNORDERED,
        OrderStatusInfoDocument.class
    );
    
    for (OrderUpdate update : updates) {
        bulkOps.updateOne(
            Query.query(Criteria.where("orderId").is(update.getOrderId())),
            Update.update("status", update.getNewStatus())
                      .set("updateTime", new Date())
        );
    }
    
    BulkWriteResult result = bulkOps.execute();
    System.out.println("修改了 " + result.getModifiedCount() + " 条记录");
}

批量操作的优势

  1. 减少网络开销:多个操作一次发送
  2. 提高性能:相比单条操作,性能提升明显
  3. 原子性:在有序模式下提供一定的顺序保证
  4. 简化代码:批量处理逻辑更清晰

注意事项

  • UNORDERED 模式:适合日志记录、数据同步等可重试场景
  • ORDERED 模式:适合有依赖关系的操作序列
  • 单次批量操作有大小限制(默认 1000 个操作,16MB 数据)

这就是 MongoDB 批量操作的基本用法,特别适合需要高效处理大量数据的场景。

头像

Update update = new Update() .setOnInsert("_id", orderStatusInfoDocument.getId()) .setOnInsert("createdAt", Instant.now()) .set("updatedAt", Instant.now()) .set("orderStatus", orderStatusInfoDocument.getOrderStatus()) .set("workerOrderStatus", orderStatusInfoDocument.getWorkerOrderStatus()) .set("workerOrderOperations", orderStatusInfoDocument.getWorkerOrderOperations());

            bulkOps.upsert(query, update);
        });

        // 执行批量操作
        BulkWriteResult result = bulkOps.execute();   是什么?
头像

这是使用 MongoDB 批量更新插入(upsert)操作的代码。让我详细解释:

代码解析

JAVA
// 1. 创建 Update 对象
Update update = new Update()
    .setOnInsert("_id", orderStatusInfoDocument.getId())  // 仅插入时设置
    .setOnInsert("createdAt", Instant.now())             // 仅插入时设置
    .set("updatedAt", Instant.now())                     // 插入和更新都设置
    .set("orderStatus", orderStatusInfoDocument.getOrderStatus())
    .set("workerOrderStatus", orderStatusInfoDocument.getWorkerOrderStatus())
    .set("workerOrderOperations", orderStatusInfoDocument.getWorkerOrderOperations());

// 2. 添加到批量操作(upsert 模式)
bulkOps.upsert(query, update);

Update 方法详解

setOnInsert()

只在插入新文档时设置字段值:

  • 如果文档不存在,执行插入,设置这些字段
  • 如果文档已存在,只更新,不修改这些字段
JAVA
.setOnInsert("createdAt", Instant.now())  // 只在插入时记录创建时间

set()

插入和更新都设置字段值:

JAVA
.set("updatedAt", Instant.now())  // 每次操作都更新时间

Query 对象示例

JAVA
// 通常根据唯一标识查询
Query query = Query.query(
    Criteria.where("orderId").is(orderStatusInfoDocument.getOrderId())
);
// 或
Query query = Query.query(
    Criteria.where("_id").is(orderStatusInfoDocument.getId())
);

完整的批量操作流程

JAVA
// 1. 创建批量操作
BulkOperations bulkOps = mongoOperations.bulkOps(
    BulkOperations.BulkMode.UNORDERED,
    OrderStatusInfoDocument.class
);

// 2. 遍历订单列表,为每个订单创建 upsert 操作
orderList.forEach(order -> {
    // 构建查询条件(根据 orderId 查找)
    Query query = Query.query(
        Criteria.where("orderId").is(order.getOrderId())
    );
    
    // 构建更新操作
    Update update = new Update()
        .setOnInsert("_id", order.getId())           // 只在插入时设置
        .setOnInsert("createdAt", Instant.now())     // 只在插入时设置
        .set("updatedAt", Instant.now())             // 每次都更新
        .set("orderStatus", order.getOrderStatus())  // 更新状态
        .set("lastModified", new Date());            // 更新修改时间
    
    // 添加到批量操作
    bulkOps.upsert(query, update);
});

// 3. 执行批量操作
BulkWriteResult result = bulkOps.execute();

BulkWriteResult 返回结果

执行后返回的结果包含操作统计信息:

JAVA
BulkWriteResult result = bulkOps.execute();

// 获取统计信息
System.out.println("插入数量: " + result.getInsertedCount());      // 新插入的文档数
System.out.println("更新数量: " + result.getModifiedCount());      // 修改的文档数
System.out.println("匹配数量: " + result.getMatchedCount());       // 匹配到的文档数
System.out.println("删除数量: " + result.getDeletedCount());       // 删除的文档数
System.out.println("upsert 数量: " + result.getUpserts().size()); // upsert 操作数

// 获取 upsert 的详细信息
for (BulkWriteUpsert upsert : result.getUpserts()) {
    System.out.println("索引: " + upsert.getIndex());      // 操作在批量中的索引
    System.out.println("ID: " + upsert.getId());          // 插入文档的 _id
}

数据库行为示例

假设数据库中有以下订单:

_id orderId status createdAt updatedAt
1 1001 NEW 2024-01-01 2024-01-01
2 1002 NEW 2024-01-01 2024-01-01

执行批量 upsert 后:

情况1:更新现有订单(orderId=1001)

JAVASCRIPT
// 只会更新,不会设置 createdAt
{
  "_id": 1,
  "orderId": "1001",
  "status": "PROCESSING",  // 更新
  "createdAt": "2024-01-01", // 保持不变(因为 setOnInsert)
  "updatedAt": "2024-01-05"  // 更新为新时间
}

情况2:插入新订单(orderId=1003)

JAVASCRIPT
// 会插入新文档,设置所有字段
{
  "_id": 3,
  "orderId": "1003",
  "status": "NEW",
  "createdAt": "2024-01-05",  // 设置新时间
  "updatedAt": "2024-01-05"   // 设置新时间
}

使用场景

这种模式特别适合:

  1. 数据同步:从外部系统同步订单状态
  2. 实时更新:更新订单处理进度
  3. 幂等操作:可以重复执行,不会产生重复数据
  4. 初始化数据:首次插入时设置创建时间,后续只更新时间

注意事项

  1. 性能:批量操作比单条操作快很多
  2. 原子性:每个 upsert 是原子的,但整个批量不是事务
  3. 索引:确保查询字段有索引,提高性能
  4. 错误处理:UNORDERED 模式下,部分失败不会影响其他操作

全部评论: 0

    我有话说: