avatar

聚焦Java性能优化 打造亿级流量秒杀系统【学习笔记】06_交易性能优化技术之缓存库存

本章目标

  • 掌握高效交易验证方式
  • 掌握缓存库存模型

7-1 交易性能瓶颈

  • jmeter压测(对活动下单过程进行压测,采用post请求,设置传入参数,性能发现下单avarage大约2s,tps500,交易验证主要完全依赖数据库的操作)
  • 交易验证完全依赖数据库
  • 库存行锁
  • 后置处理逻辑

7-2 交易验证优化

  • 用户风控策略优化:策略缓存模型化

在开始交易后,针对活动实时信息和用户实时信息的验证,目的是为了风控策略,检查用户账号是否异常,是否异地登陆,策略是:通过异步的方式将用户模型写入缓存,与实时信息做一致性检验,做到风控策略

  • 活动校验策略优化:引入活动发布流程,模型缓存化,紧急下线能力

实时活动的缓存存在一个问题:如果后台修改活动信息(修改活动结束时间),但redis的缓存还处于正常有效期,用户依然可以以活动价格秒杀商品,因此需要有紧急下线的能力。对应的策略是:在活动开始前半个小时发布活动,对缓存预热,同时后台设计一个紧急下线的接口,清除redis缓存,那么用户下单时就会去数据库查询活动的最新信息了

Jmeter压测效果:avg:800 吞吐量tps:800左右

7-3 活动缓存库存方案一(重点)

库存行锁优化

首先回顾我们之前减库存的操作:

1
2
3
4
5
<update id="decreaseStock">
update item_stock
set stock = stock - #{amount}
where item_id = #{itemId} and stock >= #{amount}
</update>

库存的数量就是stock-amount 条件是商品itemId和stock的大小大于amount,条件是item_id要加上唯一索引,这样查询的时候为数据库加上行锁,否则是数据库表锁

扣减库存缓存化(方案一)

方案是:我们要将扣减库存的操作发生在缓存而不是数据库中,缓存的扣减时间相对较少

首先要:(1)活动发布同时同步库存进缓存

​ (2)下单交易减缓库存

PromoService 接口中添加活动发布接口

1
2
//活动发布
void publishPromo(Integer promoId);

PromoServiceImpl实现类(默认获取活动id以及商品信息的时候库存不发生变化)

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void publishPromo(Integer promoId) {
//通过活动id获取活动
PromoDO promoDO = promoDOMapper.selectByPrimaryKey(promoId);
if(promoDO.getItemId()==null || promoDO.getItemId().intValue()==0) {
return;
}
ItemModel itemModel = itemService.getItemById(promoDO.getItemId());

//将库存同步到redis内
redisTemplate.opsForValue().set("promo_item_stock_"+itemModel.getId(),itemModel.getStock());

}

前端写发布活动的controller接口

1
2
3
4
5
6
@RequestMapping(value = "/publishpromo",method = RequestMethod.GET) //浏览时服务端用GET请求
@ResponseBody
public CommonReturnType publishpromo(@RequestParam(name = "id") Integer id) {
promoService.publishPromo(id);
return CommonReturnType.create(null);
}

更新ItemServiceImpl里更新redis减库存的操作

1
2
3
4
5
6
7
8
9
10
//减缓存库存
long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1);

if(result > 0 ) {
//更新库存成功
return true;
}else {
//更新库存失败
return false;
}

做到缓存减库存操作,但这样还存在数据库记录不一致的情况

异步同步数据库(方案二)

采用异步消息队列的方式,将异步扣减的消息同步给消息的consumer端,并由消息的consunmer端完成数据库扣减的操作

(1)活动发布同步库存进缓存

(2)下单交易减缓存库存

(3)异步消息扣减数据库内存

异步消息队列rocketmq

采用异步队列可以既能让C端用户完成购买商品的高效体验,又能保证数据库的一致性。

常见的异步消息中间件用到的有ActiveMQ(实现java的AMS)、Kafka(基于流式处理)、RocketMQ是阿里巴巴基于Kafka改造的一种异步消息队列

  • 高性能,高并发,分布式消息中间件
  • 典型应用场景:分布式事务,异步解耦

RocketMQ主要有 Producer端,负责向Broker发送消息;Consumer端,多个consumer组成一个ConsumerGroup,每个消息会有一个Group里的consumer来消费;Broker由topic和MessageQueue组成,消息隶属于某个topic,一个topic可能由一个或多个topic管理

消息队列也是面试官经常会问到的内容,想了解消息队列的原理可以看看这篇新手也能看懂消息队列的文章,写的真心不错,强推!!还有这篇面试会涉及到的文章

库存数据库最终一致性保证

7-5 活动缓存库存方案二

首先NameServer相当于一个注册管理器,Broker1向NameServer发出注册请求,NameServer记录broker1:ip以及它负责的topicA,topicA负责的queue1和queue2

Producer连接NameServer发现broker1,会向topicA为主题的broker1投递消息,采用负载轮询向queue投递;

Consumer抓取负责的topicA,与queue建立长连接,当有消息时,唤醒,拉取对应的message,没有消息就等待,这种方式叫做长轮询

一个consumer对应一个group,会平均划分,如果出现consumer过多,会有空闲。一个项目中会出现多个不同的ConsumerGroup,比如订单系统、商品系统等。若一个queue被多个consumer消费,会存在锁竞争机制,rocketmq采用的策略是以queue为单位平均分配,尽量保证consumer与queue数量相等。

多个Broker会有主从复制机制,用于应对broker1异常,nameserver将broker2设为主库,通知producer以及consumer端去接管,Broker1和Broker2的数据可以是同步也可以是异步的

分布式事务

分布式设计CAP三方面,一致性、可用性、分区容忍性

Soft-state软状态:保证最终结果的一致性,不保证中间过程的一致。比如缓存中的库存和数据库中的库存肯可能会有不一致情况发送

7-7 rocketmq安装

rocketmq 官网下载压缩包 上传到服务器

1
2
3
4
5
6
7
8
9
//解压
unzip rocketmq-all-4.7.1-source-release.zip
//启动Name Server
nohup sh bin/mqnamesrv &
//查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log
//启动broker
nohup sh bin/mqbroker -n localhost:9876 &
//发生报错就修改配置文件broker的容量大小重新启动

发送和接受消息

1
2
3
4
5
6
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

7-8 缓存库存接入异步化

新建mq package MqConsumer和MqProducer类

配置appliaction

1
2
3
#设置rocketmq
mq.nameserver.addr=数据库服务器ip地址:9876
mq.topicname=stock

MqProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Component
public class MqProducer {

private DefaultMQProducer producer;

//声明value注解,引入配置变量
@Value("${mq.nameserver.addr}")
private String nameAddr;

@Value("${mq.topicname}")
private String topicName;

@PostConstruct
public void init() throws MQClientException {
//mq producer初始化
producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr(nameAddr);

producer.start();

}

//同步库存扣减消息
public boolean asyncReduceStock(Integer itemId,Integer amount) {
Map<String,Object> bodyMap = new HashMap<>();
bodyMap.put("itemId",itemId);
bodyMap.put("amount",amount);
//投放消息
Message message = new Message(topicName,"increase",JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8")));
try {
producer.send(message);
} catch (MQClientException e) {
e.printStackTrace();
return false;
} catch (RemotingException e) {
e.printStackTrace();
return false;
} catch (MQBrokerException e) {
e.printStackTrace();
return false;
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
return true;
}
}

ItemServiceImpl实现缓存减库存以及发送消息减数据库库存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
@Transactional
public boolean decreaseStock(Integer itemId, Integer amount) {
//影响行数
//int affectedRow = itemStockDOMapper.decreaseStock(itemId,amount);

//减缓存库存
long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1);

if(result >= 0 ) {
//更新库存成功,发送消息,减数据库库存
boolean mqResult = mqProducer.asyncReduceStock(itemId,amount);
if(!mqResult) {
//发送消息失败,回补库存
redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue());
return false;
}
return true;
}else {
//更新库存失败
redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue());
return false;
}

MqConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Autowired
private ItemStockDOMapper itemStockDOMapper;

@PostConstruct
public void init() throws MQClientException {
consumer = new DefaultMQPushConsumer("stock_consumer_group");
consumer.setNamesrvAddr(nameAddr);
//订阅所有消息
consumer.subscribe(topicName,"*");

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//实现库存真正到数据库内扣减的逻辑
Message msg = msgs.get(0);
String jsonString = new String(msg.getBody());
Map<String,Object> map = JSON.parseObject(jsonString);
Integer itemId = (Integer) map.get("itemId");
Integer amound = (Integer) map.get("amount");

itemStockDOMapper.decreaseStock(itemId,amound);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}

异步同步数据库还是会出现以下几个问题:

  • 异步消息发送失败
  • 扣减操作执行失败
  • 下单失败(用户退单)无法正确回补库存
文章作者: SkironYong
文章链接: https://skironyong.github.io/SkironYong.github.io/posts/e5a4148e.html
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 SkironYong
打赏
  • 微信
    微信
  • 支付寶
    支付寶

评论