幂等性

目录

幂等性

1.什么是幂等性?
幂等性:提交一次和多次,结果是一样的。

2.哪些情况需要保证幂等性?
用户多次点击按钮;
用户页面回退再次提交;
微服务相互调用,由于网络原因,导致请求失败,feign触发重试机制;
其他业务情况;

3.什么情况下需要幂等性?
以SQl为例,有些操作时天然幂等。
select * from table where id =? 无论查询多少次都不会改变状态,是天然幂等的。
update table set col=1 where col2=2,无论执行操作多少次都不会改变状态,是天然幂等的。
delete from user where userId=1,多次操作,结果一样幂等操作。
inter into user(id,name)values(1,‘a’),如id唯一主键,即重复操作上面的业务,只会插入一条数据,具备幂等性;

update table1 set col1=col1+1 where col2=2,每次执行的结构都会发生变化,不是幂等。
inter into user(id,name)values(1,‘a’),如id不是主键,即重复操作上面的业务,会插入多条数据,不具备幂等性;

4.幂等解决方案?

4.1 token机制
    1.服务端提供了发生token的接口。我们在分析业务的时候,哪些业务是存在幂等问题的,就必须在执行业务前,先去
    获取token,服务器会把token保存到redis中。
    2.然后在调用业务接口请求时,把token携带过去,一般放在请求头部。
    3.服务器判断token是否存在于redis中,存在表示第一次请求,然后删除token,继续执行业务
    4.如果判断token不存在redis中,就表示是重复操作,直接返回重复标记给客户端,这样就保证了业务代码,不被重复执行

    危险性:
    1、先删除token还是后删除token
        如果是后删除令牌,用户点提交订单,连续点两次,第一次提交订单,还没执行完,还没来得及删除令牌,
        第二次的提交订单就进来,这样就出现数据不一致的问题了。

        如果是先删除令牌,一般分布式下,token存redis里,假设用户还是点的很快,连续点了两次提价订单,
        两个请求同时去redis里获取token,同时对比成功,同时删令牌,同时执行业务逻辑,这就会出现一定的
        风险

        因此,如果是先删除令牌,获取redis里的令牌操作,和前端带来的token比较的操作以及删除令牌的操作,
        即:获取,对比,删除 这三个操作必须是原子的,给上个分布式锁
4.2 各种锁机制
     1、数据库悲观锁
        select * from XXX where id=1 for update;
        悲观锁使用时伴随着事务一起使用,数据锁定时间可能会很长,需要根据实际情况选用。另外要注意的是,id
        字段一定是主键或者唯一索引,不然可能造成锁表的结果,处理起来非常麻烦。
     2、数据库乐观锁
        这种方式适合在更新的场景中。
     3、业务层分布式锁
        如果多个机器可能在同一时间同时处理相同的数据,比如多台机器定时任务都拿到了相同数据处理,我们就可以
        加分布式锁,锁定此数据,处理完成后释放锁。获取到锁的必须先判断这个数据是否被处理过。
4.3 各种唯一约束
      1、数据库唯一约束
        插入数据,应该按照唯一索引进行插入,比如:订单号,相同的订单号就不能有两条记录插入。
        我们在数据库层面防止重复。
        这个机制是利用了数据库的主键唯一约束的特性,解决了在insert场景时幂等问题。
        但主键的要求不是自增的主键,这样就需要业务生成全局唯一的主键。
        如果是分库分表场景下,路由规则要保证相同请求下,落在同一个数据库和同一表中,要不然数据库约束就不起
        效果了,因为是不同的数据库和表主键不想关
       2、redis set防重
       很多数据需要处理,只能被处理一次,比如我们可以计算数据的MD5将其放入redis的set,每次处理数据,先看
       md5是否存在,存在就不处理。
4.4 防重表
       使用订单号orderNo做为去重表的唯一索引,把唯一索引插入去重表,再进行业务操作,且他们在同一个事务中,
       这个保证了重复请求时,因为去重表有唯一约束,导致请求失败,避免了幂等问题。注意,去重表和业务表应该在
       同一数据库中,这样就保证了在同一个事务,即使业务操作失败,也会把去重表的数据回滚。这个很好的保证了数
       据一致性。
4.5 全局请求唯一id
        调用接口时,生成一个唯一id,redis将数据保存到集合中(去重),存在即处理过,可以使用nginx设置每一个
        请求的唯一id proxy_set_header X-Request-Id $request_id;

下面展示几种用过的方案

一、token机制解决幂等性

场景:订单确认页,可以点击提交订单按钮,来提交订单,用户可能连续点击提交订单按钮,这个业务可能会出现幂等性问题。

确认订单接口的服务:
使用uuid生成一个防重令牌token,并把token放到redis里,然后把这个token,封装到出参,给到前端的订单确定页面

    @Autowired
    private RedisTemplate redisTemplate;
    /**
     * 确认订单
     * @return
     */
    @Override
    public OrderConfirmVo confirmOrder() {
        //1.异步任务1 - 远程查询所有的收货地址列表
		xxx
        //2.异步任务2 - 远程查询购物车所有选中的购物项
		xxxx

        //3.查询用户积分
		xxx
       
        //4.其他数据自动计算

        //5.防重令牌
        String token = UUID.randomUUID().toString().replace("-", " ");
        redisTemplate.opsForValue().set(OrderConstant.USER_ORDER_TOKEN_PREFIX+memberResponseVo.getId(),token,30, TimeUnit.MINUTES);
        orderConfirmVo.setOrderToken(token);

        return orderConfirmVo;
    }

提交订单服务:

    @Autowired
    private RedisTemplate redisTemplate;
    @Transactional
    @Override
    public SubmitOrderResponseVo submitOrder(OrderSubmitVo orderSubmitVo) {
        //1、验令牌【令牌:获取,对比,删除。必须保证原子性】
        //前端传的token
        String token = orderSubmitVo.getOrderToken();
        //0令牌验证失败  1验证成功
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        //lua脚本保证原子性:获取令牌,验证令牌和删除令牌
        Long result = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),
                Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseVo.getId()),
                token);
        //验证失败
        if (result==0){
            submitOrderResponseVo.setCode(1);
            return submitOrderResponseVo;
        }
        //验证成功
        else {
            //下单
            xxx

            //验价
			xxx

            //保存订单
           xxx

            //库存锁定。只要有异常回滚订单数据
            //需要:订单号,所有订单项(sku_id,sku_name),锁几件
            //远程锁库存
            xxx
        }
    }

二、防重表解决幂等性

场景:下订单出现异常,需要回退远程服务的库存。假设使用的是可靠消息最终一致性方案,如果回退库存的业务执行成功了,但是在返回给mqack,因为网络问题没发给消息队列,消息队列认为消息没消费成功,再次给库存服务发消息,去再次回退库存,这就出现数据不一致的问题了。

为了保证消息处理的幂等性。处理过的消息会存到数据库中去(trade_mq_consumer_log防重表),消费者接受一条消息,不是拿来立马就消费,而是拿来判断一下有没有处理过,如果已经处理过,就不用了处理了,因为如果提供者给你发重复的消息,你进行重复消息,存库连续回退,数值肯定对不上。因此,消息的幂等性,需要在消费端进行保证的。
具体做法:通过存储消息的表的字段,组名,tag,key以及消费状态(处理成功表示消费了),判断当前消息是否消费过。

防重表示例:

CREATE TABLE `trade_mq_consumer_log` (
  `msg_id` varchar(50) DEFAULT NULL COMMENT '消息id',
  `group_name` varchar(100) NOT NULL COMMENT '消费者组名',
  `msg_tag` varchar(100) NOT NULL COMMENT 'tag',
  `msg_key` varchar(100) NOT NULL COMMENT 'key',
  `msg_body` varchar(500) DEFAULT NULL COMMENT '消息体 json格式',
  `consumer_status` int(1) DEFAULT NULL COMMENT '0:正在处理;1:处理成功;2:处理失败',
  `consumer_times` int(1) DEFAULT NULL COMMENT '消费次数',
  `consumer_timestamp` timestamp NULL DEFAULT NULL COMMENT '消费时间',
  `remark` varchar(500) DEFAULT NULL COMMENT '备注',
  PRIMARY KEY (`group_name`,`msg_tag`,`msg_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

例如:

@RocketMQMessageListener(topic = "${mq.order.topic}",
        consumerGroup ="${mq.order.consumer.group.name}",
        messageModel = MessageModel.BROADCASTING  //广播模式
)
@Component
@Slf4j
public class CancelMQListener implements RocketMQListener<MessageExt> {
	。。。
    @Override
    public void onMessage(MessageExt message) {
        String msgId=null;
        String tags=null;
        String keys=null;
        String body=null;
        try {
            //1.解析消息内容
            msgId = message.getMsgId();
            tags = message.getTags();
            keys = message.getKeys();
            body = new String(message.getBody(), "UTF-8");
            log.info("接受消息成功");

            //2.查询消息消费记录
            TradeMqConsumerLogKey tradeMqConsumerLogExample=new TradeMqConsumerLogKey();
            tradeMqConsumerLogExample.setGroupName(groupName);
            tradeMqConsumerLogExample.setMsgTag(tags);
            tradeMqConsumerLogExample.setMsgKey(keys);
            TradeMqConsumerLog tradeMqConsumerLog = tradeMqConsumerLogMapper.selectByPrimaryKey(tradeMqConsumerLogExample);
            //3.判断 如果消费过
            if (tradeMqConsumerLog!=null) {
                //获取消息的处理状态 0:正在处理;1:处理成功;2:处理失败
                Integer consumerStatus = tradeMqConsumerLog.getConsumerStatus();
                //处理过
                if (ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue() == consumerStatus.intValue()) {
                    log.info("消息:" + msgId + "处理过");
                    return;
                }
                //正在处理
                if (ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue() == consumerStatus.intValue()) {
                    log.info("消息:" + msgId + "正在处理");
                    return;
                }
                //处理失败
                if (ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue() == consumerStatus.intValue()) {
                    //消费次数已经大于3次
                    if (tradeMqConsumerLog.getConsumerTimes()>3){
                        log.info("消息:" + msgId + ",处理超过三次,不能再进行处理了");
                        return;
                    }
                    //消费次数小于三次
                    //状态设置为正在处理
                    tradeMqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
                    //updateByExampleSelective方法默认支持数据库乐观锁
                    //设置并发修改条件
                    TradeMqConsumerLogExample tradeMqConsumerLogExample1=new TradeMqConsumerLogExample();
                    TradeMqConsumerLogExample.Criteria criteria=tradeMqConsumerLogExample1.createCriteria();
                    criteria.andMsgTagEqualTo(tradeMqConsumerLog.getMsgTag());
                    criteria.andMsgKeyEqualTo(tradeMqConsumerLog.getMsgKey());
                    criteria.andGroupNameEqualTo(groupName);
                    //消费次数相当于是乐观锁的version,只不过这里的version人为规定不能大于3
                    criteria.andConsumerTimesEqualTo(tradeMqConsumerLog.getConsumerTimes());
                    int i = tradeMqConsumerLogMapper.updateByExampleSelective(tradeMqConsumerLog, tradeMqConsumerLogExample1);
                    if (i<=0){
                        //未修改成功,其他线程并发修改
                        log.info("并发修改,稍后处理");
                    }
                }
            }
            //4.判断 如果没有消费过
            else {
                tradeMqConsumerLog=new TradeMqConsumerLog();
                tradeMqConsumerLog.setMsgTag(tags);
                tradeMqConsumerLog.setMsgKey(keys);
                tradeMqConsumerLog.setGroupName(groupName);
                //消息状态改为正在处理
                tradeMqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
                tradeMqConsumerLog.setMsgBody(body);
                //消费次数,指的是消费失败时才记录
                tradeMqConsumerLog.setConsumerTimes(0);
                tradeMqConsumerLog.setMsgId(msgId);

                //将消息处理信息添加到数据库
                tradeMqConsumerLogMapper.insert(tradeMqConsumerLog);
            }
            //5.回退库存
			。。。。

            //6.记录消息消费记录,更改消息处理状态为成功处理
            tradeMqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());
            tradeMqConsumerLog.setConsumerTimestamp(new Date());
            tradeMqConsumerLogMapper.updateByPrimaryKey(tradeMqConsumerLog);
            log.info("回退库存成功");
        } catch (Exception e) {
            e.printStackTrace();
            //如果消息处理失败,需要记录
            //先查询消息消费记录表有没有数据
            TradeMqConsumerLogKey tradeMqConsumerLogKey=new TradeMqConsumerLogKey();
            tradeMqConsumerLogKey.setGroupName(groupName);
            tradeMqConsumerLogKey.setMsgTag(tags);
            tradeMqConsumerLogKey.setMsgKey(keys);
            TradeMqConsumerLog tradeMqConsumerLog = tradeMqConsumerLogMapper.selectByPrimaryKey(tradeMqConsumerLogKey);
            //没有,就将当前消息消费失败的信息,插进去
            if (tradeMqConsumerLog==null){
                tradeMqConsumerLog=new TradeMqConsumerLog();
                tradeMqConsumerLog.setMsgTag(tags);
                tradeMqConsumerLog.setMsgKey(keys);
                tradeMqConsumerLog.setGroupName(groupName);
                //消息状态改为正在处理
                tradeMqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
                tradeMqConsumerLog.setMsgBody(body);
                //消费次数,指的是消费失败时才记录
                tradeMqConsumerLog.setConsumerTimes(1);
                tradeMqConsumerLog.setMsgId(msgId);
                tradeMqConsumerLogMapper.insert(tradeMqConsumerLog);
            }
            //如果已经有消息消费记录,更新消费次数
            else {
                tradeMqConsumerLog.setConsumerTimes(tradeMqConsumerLog.getConsumerTimes()+1);
                tradeMqConsumerLogMapper.updateByPrimaryKeySelective(tradeMqConsumerLog);
            }
        }
    }
}

三、业务层分布式锁

场景:
多台机器定时任务都拿到了相同数据处理,我们就可以加分布式锁,锁定此数据,处理完成后释放锁。获取到锁的必须先判断这个数据是否被处理过。

例如:每天凌晨3点定时上架最近三天参与秒杀的商品,三台机器A1,A2,A3,都有一个定时任务,同一段程序,定时任务的设置都一样,等时间一到,它们就都同时启动了定时任务,就要同时执行业务代码,同时去上架秒杀商品,就会问题
《幂等性》
解决:
应该是定时任务,只能有一台机器在执行,不能所有机器同时执行。
使用分布式锁解决。
《幂等性》
使用分布式锁解决了定时任务在分布式下的问题:

@Slf4j
@Service
public class seckillSkuScheduled { 
    @Autowired
    private SeckillService seckillService;
    @Autowired
    private RedissonClient redissionClient;

    private final String upload_lock="seckill:upload:lock";

    //保证幂等性问题
    @Scheduled(cron = "*/5 * * * * ?")
    public void uploadSeckillSkuLatest3Days() { 
        log.info("上架秒杀的商品");

        //分布式锁
        RLock lock = redissionClient.getLock(upload_lock);
        //锁的超时时间
        lock.lock(10, TimeUnit.SECONDS);
        try{ 
            seckillService.uploadSeckillSkuLatest3Days();
        }finally { 
            //释放锁
            lock.unlock();
        }
    }
}

还有就是要通过业务层代码来控制幂等性:
比如:每晚都要缓存最近三天参与秒杀的商品,那缓存时就要注意已经存在的商品,不能再重复缓存
《幂等性》

点赞