Mongodb 聚合管道模板

Mongodb 的 aggregate 在数据查询、分析等操作中是一个强大的聚合操作,但我发现在尝试编写它时,即使是非常简单的语法,也总是需要参考官方文档或 ChatGPT 或 Perplexity 等 AI 工具,因此我总结了一些常用的模式作为模板,以便在工作中快速参考。

模板

  • 包含匹配、分组和字段输出阶段的基本聚合管道
db.collection.aggregate(
  [
    {
      $match: {}
    },
    {
      $group: {}
    },
    {
      $project: {}
    }
  ]
)
  • 按不同数据类型匹配结果
db.collection.aggregate(
  [
    {
      $match: {
        name: 'andy',                  // string literal
        count: 30,                     // number literal
        is_login: true,                // false, null
        name: {
          $in: ['andy', 'jiang']       // array contain
        },
        is_login: {
          $not: {                      // opposite value
            $in: [false, null]
          }
        },
        count: {
          $gte: 5                      // $lte, $gt, $lt, $eq, $ne, etc
        },
        opened_at: {
          $lt: new Date('2025-02-13')  // $lte, $gt, $lt, $eq, $ne, etc
        },
        $or: [                         // or, must be used at top level
          { x: { $lt: 0 } },
          { x: { $gt: 10 } }
        ],
        $and: [                        // and, must be used at top level
          { x: { $gt: 0 } },
          { x: { $lt: 10 } }
        ]
      }
    },
    {
      $group: {}
    },
    {
      $project: {}
    }
  ]
)
  • _id 分组计算值
db.collection.aggregate(
  [
    {
      $match: {}
    },
    {
      $group: {
        _id: null,      // group by all documents
        _id: '$field',  // group by single field
        _id: {          // group by multiple fields
          field1: '$field1',
          field2: '$field2'
        },
        total: {
          $count: {},   // count doc way 1
          $sum: 1       // count doc way 2
        },
        sum_of_x: {
          $sum: '$x'    // sum of x
        },
        avg_of_x: {
          $avg: '$x'    // avg of x
        }
      }
    },
    {
      $project: {}
    }
  ]
)
  • 输出特定字段
db.collection.aggregate(
  [
    {
      $match: {}
    },
    {
      $group: {}
    },
    {
      $project: {
        _id: 0,           // or false, do not output _id
        field1: 1,        // or true, output field1
        field: '$_id',    // output field used in single-field _id
        x: '$_id.x',      // output _id.x in multiple-field _id
        y: '$arr.0',      // output array element by index
        z: '$obj.z',      // output child object field
        'x.y.z': 1        // output nested field
      }
    }
  ]
)
  • 匹配前使用 window 函数去重
db.collection.aggregate(
  [
    {
      $setWindowFields: {
        partitionBy: {
          field1: '$field1',
          field2: '$field2'
        },
        sortBy: {
          _id: 1
        },
        output: {
          rn: {
            $documentNumber: {}
          }
        }
      }
    },
    {
      $match: {
        rn: 1   // only keep records with row number = 1
      }
    },
    {
      $group: {}
    },
    {
      $project: {}
    }
  ]
)
  • 使用 $dateTrunc 转换时间戳
db.collection.aggregate(
  [
    {
      $match: {}
    },
    {
      $group: { 
        _id: {
          field1: '$field1',
          date_field: {
            $dateTrunc: {
              date: '$timestamp',
              unit: 'day', // day, week, month
              timezone: 'Asia/Tokyo',
              startOfWeek: 'Monday'
            }
          }
        },
        field2: {
          $sum: '$field2'
        }
      }
    },
    {
      $project: {}
    }
  ]
)
  • 在按条件过滤后的结果中查找目标
db.collection.aggregate(
  [
    { $match: { _id: { $gt: ObjectId('xxx') } } },
    { $sort: { _id: 1 } },
    { $limit: 1_000_000 },
    { $sort: { _id: -1 } },
    { $limit: 1 },
    { $project: { _id: 1 } }
  ]
)

参考资料

mongodb