Redis实战三:基于Redis实现优惠券秒杀
全局ID生成器
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,具有唯一性、高可用、高性能、递增性、安全性。
使用Redis完成全局自增 , 自增工具类
@Component
public class RedisIdWorker {
private static final long BEGIN_TIMESTAMP = 1640995200L;
private static final int COUNT_BITS = 32;
private StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate = stringRedisTemplate;
}
public long nextId(String keyPrefix ){
//时间挫
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
//生成序列号
// 获取当前日期 , 精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
//拼接放回
return timestamp << COUNT_BITS | count;
}
}
测试自增
@RunWith(SpringRunner.class)
@SpringBootTest(classes = HmDianPingApplication.class)
public class HmDianPingApplicationTests {
@Resource
private RedisIdWorker redisIdWorker;
private ExecutorService es = Executors.newFixedThreadPool(500);
@Test
public void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);// 期望300个任务
Runnable task = () -> {
for(int i =0 ; i< 100 ;i++){
long id = redisIdWorker.nextId("order");
System.out.println("Id = " + id);
}
latch.countDown();// 每个任务结束后计数递减
};
long begin = System.currentTimeMillis();
for(int i =0;i<300;i++){
es.submit(task);
}
latch.await(); // 等待所有任务完成
long end = System.currentTimeMillis();
System.out.println("time = " + (end - begin));
}
}
需要注意的是,测试函数必须都是Public
秒杀下单
基本秒杀, 只设置事务,保证数据库的一致性
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀未开始");
}
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
if(voucher.getStock() < 1){
return Result.fail("库存不足");
}
boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();
if(!success){
return Result.fail("库存不足");
}
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setVoucherId(voucherId);
Long userID = UserHolder.getUser().getId();
voucherOrder.setUserId(userID);
save(voucherOrder);
return Result.ok(orderId);
}
在测试两百个线程同时下单的时,发现当前秒杀出现问题, 超卖9个,存在并发安全问题。
超卖问题
超卖问题是经典的多线程问题,常见解决方案就是加锁。
悲观锁的方式可以参考实战二的互斥实现。
这里学习乐观锁的方式。
版本号法:引入新的变量,在每次更新的时候判断新的变量版本是否和之前一致。原理是数据库在做Set的时候,多判断一个变量,尽管是多线程,但最后都是对数据库的修改, 所以能在数据库上做可用性判断。
CAS法:和版本号类似,只是将要修改的变量本身作为一个判断的标志。
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.eq("stock", voucher.getStock())
.update();
超卖问题解决,但出现新的问题,失败率提高了。因为存在多个线程在秒杀的时候,都因为秒杀券变动而放弃。对于秒杀券来说,只要有券都应该认为是可以修改的。
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();
对于只能靠是否变化的数据来说,要实施乐观锁,可以分段加锁,对于1批数据分成几个表进行操作,提高成功率。
一人一单
判断是否存在订单
//查询订单,判断是否存在
Long userID = UserHolder.getUser().getId();
int count = query().eq("user_id", userID)
.eq("vouched_id", voucherId)
.count();
if(count > 0){
// 用户已经购买
return Result.fail("用户已经购买");
}
在多线程的情况下,存在程序穿插情况,会照成超卖。由于订单下单是新增操作,不存在修改,所以只能使用悲观锁方式来加锁。
因为要对一人一单进行判断,需要对用户加锁,由于是对事务处理的函数进行加锁,但执行的原函数不存在事务,会出现Spring事务锁失效
在pom.xml添加依赖
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
在启动类上添加注解
@EnableAspectJAutoProxy(exposeProxy = true)
对设计修改数据的操作进行封装,并在业务实现中加锁
@Override
public Result seckillVoucher(Long voucherId) {
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀未开始");
}
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
if(voucher.getStock() < 1){
return Result.fail("库存不足");
}
//查询订单,判断是否存在
Long userID = UserHolder.getUser().getId();
//userID.toString().intern() 能确保值相同的时候锁相同,因为String是引用类型
synchronized (userID.toString().intern()) {
// 事务失效解决方案
// 获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
// return createVoucherOrder(voucherId);?
}
}
发现在执行200个线程之后,当前用户也只下了一单
在单体架构是没问题,现在模拟启动集群查看可用性
复制一份配置并在VM Option新增-Dserver.port=8082添加一个服务
在ngnix下做负载均衡
# proxy_pass http://127.0.0.1:8081;
proxy_pass http://backend;
upstream backend {
server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;
server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;
}
重新测试下单,发现在集群模型下,之前的加锁已经出现了问题,同一个用户下了两单。
集群下又出现了并发安全问题,JVM内部的锁失效了,synchronized只能保证一个JVM里面的线程互斥。
分布式锁
集群通常是指中间服务集群,可以做分布式的位置为Redis, Mysql等,基于Redis的分布式互斥操作实际上就是一种分布式锁。
利用Redis存储的互斥能够正常实现锁,保证集群做到一人一单,由于和之前的互斥锁实现一样就不贴代码了。逻辑就是把锁标识+线程ID作为一个字段。
目前任然存在极端问题,如果线程1在阻塞的时候,Redis超时释放锁,线程2获得锁,线程1进行删除锁,线程3又获得锁,存在问题。解决方案就是,在删除锁的时候,判断是否是当前的线程的创建的锁。
@Override
public void unlock() {
//获取线程标识
String id = ID_PREFIX + Thread.currentThread().getId();
//索取Redis所里面的标识
String s = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if( id.equals(s)){
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
这种方法便能解决上诉问题,程序本身的问题解决了,但任然存在由于JVM等阻塞导致锁删除时间过长,锁超时释放,从而造成高并发问题。因为查询锁和释放锁是两个操作,没有原子性。
为了获得原子性,通常通过事务来解决。
Redis可以支持lua语言来实现批量操作
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
在Java中使用Lua脚本,来操作Redis
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public void unlock() {
// 执行lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId()
);
}
总结
在进行优惠券秒杀的时候,为了保证订单的唯一,我们利用Redis构造了全局ID生成器。 为了解决经常的超卖问题多线程问题,我们引入的乐观锁来解决。对于只有新增订单的多线程的高并发问题,我们采用了对用户ID进行加悲观锁来解决。但是到集群多个JVM任然存在问题,我们采用Redis分布式锁来解决。为了对应释放锁中的非原子性问题,我们采用了Lua脚本(一种Redis批处理)实现原子性。