利用MongoDB的聚合实现查询的复杂需求

概述

在公司某项目中,对于商品和库存的查询有各种各样的需求,RESTful请求也存在多个,但其核心都是基于商品和库存两个MongoDB的Document进行查询。举例如下

查询商品:queryCommodity
public static final int OP_CONSUMER_QUERY_COMMODITY;

查询库存:quStock
public static final int OP_CONSUMER_QUERY_STOCK;

查询货架:quShelves
public static final int OP_CONSUMER_QUERY_COMMODITY_SHELVES;

自此基础上,还需要有排序,筛选,分页等常规需求,以及一些业务需求(例如客户可以设置无库存的商品不显示)。

为了满足以上需求,并最大化的利用数据库性能,最大化的避免取出数据后做内存中的排序,元素取舍等工作和假分页,使用MongoDB功能强大的聚合(Aggregation)操作进行处理。由于这个功能颇具代表性,此处描述使用方法和难点。

数据结构

Commodity和Stock表分别代表商品和该商品的库存,后者以commodityCode作为外键。目前假设这两个结构为1对1关系,否则有一处处理会有小问题,下文详述。

Commodity的一个文档结构如下(结构仅为示意,下同):

{
  "_id": "3d5195798d2-96d5-144b50546611",
  "commodityCode": "3d519545-798c-4dd2-96d5-144b50546611",
  "goodName": "得宝抽取式面巾纸 (天然无香)", 
  "comment": "浪里个浪",
  "priceMap": {
    "default": {
      "commodityCode": "3d519545-74d-96d5-144b50546611",
      "largeSale": 2,
      "middleSale": 0,
      "smallSale": 0,
      "customerGroup": "default"
    }
  },
  "offShelves": 0,
  "storehouseIds": [
    "6c24e5cd-9790-a118-314489b50578"
  ],
  "largeCode": "69479910710",
  "createTime": 1532771745,
}

Stock的一个文档结构如下:

{
  "_id": "3d519545-798c-4d6d5-144b50546611",
  "commodityCode": "3d519545-798c-4d5-144b50546611",
  "goodName": "得宝抽取式面巾纸 (天然无香)",
  "goodPhase": "90抽",
  "stores": [
    {
      "storehouseId": "6c24e5cd-979a-44a0-a118-314489b50578",
      "largeCount": 371,
      "middleCount": 0,
      "smallCount": 0
    },
    {
      "storehouseId": "b16a6a5a-9e70-403a-b895-2b510e221f5d",
      "largeCount": 1,
      "middleCount": 0,
      "smallCount": 0
    }
  ],
  "version": 0,
}

需求汇总

  1. 数据筛选,需要支持筛选商品关键字,所在仓库,商品类别,上下架状态,是否存在价格等;
  2. 依据配置显示或不显示缺货商品(即大中小库存均小于0的商品)
  3. 排序,支持按仓库或不按仓库的排序
  4. 基于排序的分页

Aggregation Pipeline的设计和实现

MongoDB提供了性能非常强大的聚合功能,且这种功能可以使用pipeline来实现。后者的好处是可以在数据操作中做性能的优化,例如先筛选数据,再在筛选后的结果集上做group,count等聚合操作。

综合以上需求,Pipeline的设计如下

  1. 根据参数筛选商品记录(match),第一步要根据商品的属性进行筛选,以最大限度的去�掉结果中不应该包含的数据;
  2. 聚合库存(Stock)对象,为了获取库存信息,利用lookup可以把Stock聚合到Commodity对象中。因为是外键关联,所以value是一个数组;
  3. 用unwind获取第一个stock,正因为是数组,所以在做下一步操作前,先通过unwind操作“打散”这个数组,并取第一条记录。这也是为什么认为两个对象是一对一的关系,否则的话,此处会有信息丢失;
  4. 根据参数筛选库存记录(match),在以上结果中包含了stock信息,所以可以再做一次过滤。此处要注意key需要加前缀,因为不是在stock对象中做match了。此处还有一个变化就是不显示缺货商品,如果需要的话,要在match中考察整体库存情况;
  5. 如果需要分页的话,提前计算符合条件的总条数。因为在排序环节有可能再次打散stock对象中的stores结构,这样会导致count结果不准确,所以在这步提前考虑。另外,由于这里做了一次实际的聚合,当结果为0时,下面的操作就可以都不执行了,以提高性能;
  6. 排序,常规的使用整体库存进行排序,较复杂的情况是利用unwind打散仓库,并基于特定仓库进行排序,此处有个返回值的对象问题,下一节详述。如果没给排序需求,必须有个缺省的商品名称序,这是因为如果乱序,那么分页的话可能会造成错误,即使不分页,按名称序列输出也比较合理;
  7. 分页,置入每页的数量和跨过的行数。

聚合代码如下

// Step 1: Match commodity criteria if necessary.
if (commodityCriteria != null) {
    aggregationOperationList.add(Aggregation.match(commodityCriteria));
}
// Step 2: Join stock by lookup operation.
aggregationOperationList.add(Aggregation.lookup("stock", "commodityCode", "commodityCode", "stock"));
// Step 3: Expand stock array by unwind operation.
// 原因是lookup之后总是以数组形式出现的,此处假设commodity和stock为一对一关系
aggregationOperationList.add(Aggregation.unwind("stock"));
// Step 4: Math stock criteria if necessary
if (stockCriteria != null) {
    // Step 4.1: Ignore commodity without stock according to setting.
    if (!ignoreLackPolicy && CommodityLackPolicy.NO_SHOW == lackPolicy) {
        // 只取stock中大中小规格任意一个有货(库存大于0,且不分仓库)的商品
        Criteria orCriteria = new Criteria();
        orCriteria.orOperator(Criteria.where("stock.largeAmount").gt(0),
                Criteria.where("stock.middleAmount").gt(0),
                Criteria.where("stock.smallAmount").gt(0));
        stockCriteria.andOperator(orCriteria);
    }
    aggregationOperationList.add(Aggregation.match(stockCriteria));
}
// Step 5: Count total if needing pageable.
boolean noResult = false;
if (pageMongoCond != null) {
    // 如果需要分页,则必须在stock.stores做unwind操作前统计总量
    aggregationOperationList.add(Aggregation.count().as("resultSize"));
    List<Map> resultSize = mongoTemplate.aggregate(Aggregation.newAggregation(aggregationOperationList), "commodity", Map.class).getMappedResults();
    if (resultSize.size() > 0) {
        total = (Integer) resultSize.get(0).get("resultSize");
        // 从管道中移除count,以获取实际数据
        aggregationOperationList.remove(aggregationOperationList.size() - 1);
    } else {
        noResult = true;
    }
}

if (noResult) {
    // 上一步如果已经确定了查询结果,且为0,则不需要继续做聚合
    fullResult = new ArrayList(0);
} else {
    // Step 6: Specific sort if necessary, or mandatory sort by goodName
    Sort sort = null;
    if (direction == -1 || direction == 1) {
        // 仅接受-1和1的选项
        if (stockPropCond != null && ArrayUtils.isNotEmpty(stockPropCond.getStorehouseIds())) {
            // Step 6.1: Expand stores array by unwind operation in stock segment.
            aggregationOperationList.add(Aggregation.unwind("stock.stores"));
            // 由于打散操作,返回到内存对象时要和普通的ComposeCommodity有所区别
            useUnwindStock = true;
            // 注意此处一定要match掉所有非当前仓库的数据,否则limit会出现错误
            aggregationOperationList.add(Aggregation.match(Criteria.where("stock.stores.storehouseId").is(stockPropCond.getStorehouseIds()[0])));
            // 基于当前选择的仓库的大规格库存做排序
            sort = new Sort(direction > 0 ? Sort.Direction.ASC : Direction.DESC, "stock.stores.largeCount");
        } else {
            // 基于当前商品的大规格库存做排序
            sort = new Sort(direction > 0 ? Sort.Direction.ASC : Direction.DESC, "stock.largeAmount");
        }
        sort = sort.and(new Sort(Sort.Direction.ASC, "goodName"));
    } else {
        // 如果没有排序需求,则默认给一个商品名称序,否则如果用分页的话可能会造成错误,即使不分页,按名称序列输出也比较合理
        sort = new Sort(Direction.ASC, "goodName");
    }
    aggregationOperationList.add(Aggregation.sort(sort));
    // Step 7: Pageable for result if necessary.
    if (pageMongoCond != null) {
        aggregationOperationList.add(Aggregation.skip(new Integer(pageMongoCond.getSkip()).longValue()));
        aggregationOperationList.add(Aggregation.limit(pageMongoCond.getLimit()));
    }
    // Step 99: Join storehouse by lookup operation.
    aggregationOperationList.add(Aggregation.lookup("storehouse", "storehouseIds", "_id", "storehouses"));

    if (useUnwindStock) {
        fullResult = mongoTemplate.aggregate(Aggregation.newAggregation(aggregationOperationList), "commodity", CommodityWithUnwindStock.class).getMappedResults();
    } else {
        fullResult = mongoTemplate.aggregate(Aggregation.newAggregation(aggregationOperationList), "commodity", CommodityWithNormalStock.class).getMappedResults();
    }
}

以上代码就是根据需求汇总的要求,先缩小目标范围,再根据要求聚合数据,最后进行排序和分页后返回给前端需要的数据。不同RESTful请求间的区别仅在该核心算法的调用参数上,且对于基于2个对象的属性类需求变更,易于扩展,减少了后期维护工作量。

MongoDB层面的查询及返回结果

根据以上设计和实现,首先Spring Data Mongo会转换成如下的mongo命令进行查询

db.commodity.aggregate([
  {
    "$match": {
      "merchantId": "28"
    }
  },
  {
    "$lookup": {
      "from": "stock",
      "localField": "commodityCode",
      "foreignField": "commodityCode",
      "as": "theStock"
    }
  },
  {
    "$unwind": "$theStock"
  },
  {
    "$match": {
      "theStock.merchantId": "28",
      "stock.stores.storehouseId": {
        "$in": [
          "8924dee5-c9ae-42d1-a14c3709a44f"
        ]
      }
    }
  },
  {
    "$unwind": "$theStock.stores"
  },
  {
    "$match": {
      "stock.stores.storehouseId": "8924dee5-c9ae-449f-b4c3709a44f"
    }
  },
  {
    "$sort": {
      "theStock.stores.largeCount": -1,
      "goodName": 1
    }
  },
  {
    "$skip": 0
  },
  {
    "$limit": 3
  },
  {
    "$lookup": {
      "from": "storehouse",
      "localField": "storehouseIds",
      "foreignField": "_id",
      "as": "storehouses"
    }
  }
]).pretty()

其返还结果是符合条件的数据的list,例子如下(单个记录,商品数据有删减)

{
  "_id": "f981b072-593d-4f52-80ed-ce71967c7805",
  "commodityCode": "f981b072-593-80ed-ce71967c7805",
  "photoResId": "[f$e5787095-0f47-4f9f-b19c-1f620b120e43|undefined]",
  "simpleQueryCode": "3229",
  "fullQueryCode": "332262464",
  "comment": "",
  "priceMap": {
    "default": {
      "commodityCode": "f981b072-593d-4f52d-ce71967c7805",
      "largeSale": 0,
      "middleSale": 0,
      "smallSale": 0,
      "customerGroup": "default"
    }
  },
  "offShelves": 0,
  "hasDefaultPrice": 0,
  "categoryIdentity": "000001004",
  "storehouseIds": [
    "8924dee5-c9ae-449f-b2d1-a14c3709a44f"
  ],
  "largeCode": "2018100805",
  "middleCode": "0",
  "smallCode": "0",
  "createTime": 0,
  "stock": {
    "_id": "f981b072-593d-4f52-8-ce71967c7805",
    "commodityCode": "f981b072-593d-4f80e71967c7805",
    "goodName": "得宝抽纸",
    "goodPhase": "4层100抽",
    "stores": {
      "storehouseId": "8924dee5-c-449f-b2d1-a14c3709a44f",
      "largeCount": 56,
      "middleCount": 0,
      "smallCount": 0
    },
    "version": 0,
  },
  "storehouses": [
    {
      "_id": "8924dee5-c9ae-4-b2d1-a14c3709a44f",
      "name": "仓库1",
      "code": "001",
      "isDefault": 1,
      "createTime": "1538964559185",
      "updateTime": "1538964559185",
      "status": 0,
    }
  ]
}

其中key为stockstorehouses的值均为外键关联后的数据结构。注意到stock.stores的结构与单独的stock对象中的不一样了

"stores": {
    "storehouseId": "8924dee5-c9ae-449f--a14c3709a44f",
    "largeCount": 56,
    "middleCount": 0,
    "smallCount": 0
}

这就是因为unwind操作基于stores,“打散”了stock数据。

Mongo结果输出为Java对象的问题详解

由于有这个操作的存在,从mongo原生数据到Java对象,产生了一些问题需要处理。

其前提是Spring Data Mongo是非常强大的客户端包,其会对当前返回的原生数据包裹为对应的Java对象。但如果原生数据对应的Java对象产生属性类型不一致的情况下,它不会自动转成普通的Map<String,Object>进行输出,而是直接抛出异常。

所以在上述unwind操作后,如果不做特殊处理的话,stock这个key在转为Stock对象时,会抛出异常,根本原因是其中的List对象无法通过反射机制构造,毕竟unwind后没有数组了。

基于这个原因,我们在做聚合操作后,返回的outType类型必须分情况讨论

if (useUnwindStock) {
        fullResult = mongoTemplate.aggregate(Aggregation.newAggregation(aggregationOperationList), "commodity", CommodityWithUnwindStock.class).getMappedResults();
    } else {
        fullResult = mongoTemplate.aggregate(Aggregation.newAggregation(aggregationOperationList), "commodity", CommodityWithNormalStock.class).getMappedResults();
    }

CommodityWithUnwindStock类定义如下

public class CommodityWithUnwindStock extends ComposeCommodity {
    /**
     * 此处必须使用和Mongo中一致的变量名,Spring-mongo会根据这个一致的名称尝试做类型转换,否则该字段会为null
     */
    private UnwindStock stock;

    public UnwindStock getStock() {
        return stock;
    }

    public void setStock(UnwindStock stock) {
        this.stock = stock;
    }
}

CommodityWithNormalStock类定义如下

public class CommodityWithNormalStock extends ComposeCommodity {
    private Stock stock;

    public Stock getStock() {
        return stock;
    }

    public void setStock(Stock stock) {
        this.stock = stock;
    }
}

后者相对简单,前者是在Commodity中包含了一个新的UnwindStock对象,其stores属性不是List,而是StockStore对象,这就规避了通过反射创建对象时的那个异常。如代码注释所说,这个UnwindStock对象必须名为stock,这样才能映射到原生数据中key为stock的值。当然,这样做的代价就是要在返回结果前,把UnwindStock,转换为Stock(核心是把stores对象再转换回一个元素的List)

结论

MongoDB的聚合操作功能十分强大,另外结合pipeline的工作机制,可能带来比关系型数据库更好的性能。原因是pipeline可以最大限度的优化处理数据时的耗费。

Spring Data Mongo在做mongodb操作时也是非常强大的,但也有些不太完善的地方,对unwind支持不好既是一例。注意到对于“打散”操作,其通过反射机制映射回Java对象时考虑的不完全,且客户自定义操作较繁琐(至少我还没有找到很简便的方式)。但总体来说是降低了对MongoDB聚合操作的使用难度,本文给出了这里应该注意的问题和解决方案。

点赞