09.饿了么一面
# 项目
# 1、项目介绍,用了哪些技术?
这个根据自己项目进行介绍即可,一般面试都会先根据项目来问,所以项目这一块的内容是非常重要的,要在项目上多下功夫,让面试官看到自己对项目的付出、积累
# 八股
# 2、Spring 和 SpringBoot 有啥区别?
这个可以根据我们平常使用两个框架的经验来讲就好,不是非要去背出来两者在各个方面的区别
从 使用方面 来说的话:
Spring 的话,主要提供了 AOP 和 IOC 两个核心功能,像一些中间件框架都是基于 Spirng 来编写的,可以通过 Spring 来统一管理项目中的所有 Bean,这样就不需要我们自己去创建了,比较方便
但是如果要基于 Spring 来做一个 Web 项目的话,我们还要自己去集成 MyBatis、Tomcat 这些东西,比较麻烦,因此 SpringBoot 就在内部为我们集成好了这些组件,可以 开箱即用
从 应用方面 来说的话:
如果我们自己想要做一个中间件,那么基于 Spring 来做是比较方便的,可以通过 Spring 来管理所有的 Bean;如果我们想要做一个 Web 项目,那么肯定是要基于 SpringBoot 来做的
# 3、怎么自定义一个 SpringBoot 的 starter?
这里先说一些 SpringBoot starter 是什么,这个 starter 就相当于是一个工具箱,封装一些比较通用的工具,比如果限流,基本上在所有项目中都比较常用一些,因此可以将 限流功能 封装成为一个 starter,以后在使用的时候,可以通过引入 starter 直接使用限流的功能,不需要再重复编写一套限流逻辑,这就是 starter 的作用
那么实现 starter 是基于 Spring 来做的
# 定义 starter 流程
这里以限流组件为例,自定义 starter 的话,主要包括以下几个步骤:
1、定义自定义注解 @DoRateLimiter
将自定义的注解标注在需要限流的方法上,让 Spring 可以知道需要对哪些方法进行限流
2、定义限流服务
我们将限流服务抽取出来,这样在切面中就可以直接调用我们的限流服务了
public class RateLimiterOpServiceImpl implements IRateLimiterOpService{
@Override
public Object access(ProceedingJoinPoint jp, Method method, DoRateLimiter doRateLimiter, Object[] args) throws Throwable {
// ... 执行限流,可以通过 Google Guava 的 RateLimiter 来实现限流
}
}
3、定义 AOP 切面
在切面中调用限流服务,对我们添加自定义注解的方法进行增强,即对添加注解的接口进行限流
@Component
@Aspect
public class DoRateLimiterPoint {
// 该切面匹配了所有带有 @DoRateLimiter 注解的方法
@Pointcut("@annotation(com.zqy.ratelimiter.annotation.DoRateLimiter)")
public void aopPoint() {}
// aopPoint() && @annotation(doRateLimiter) 这样处理,可以通过方法入参就直接拿到注解,比较方便
@Around("aopPoint() && @annotation(doRateLimiter)")
public Object doRouter(ProceedingJoinPoint jp, DoRateLimiter doRateLimiter) throws Throwable {
// ... 调用上边定义的 RateLimiterOpServiceImpl 限流服务进行接口限流
}
}
4、定义自动配置类
定义好切面之后,需要定义 自动配置类 将切面加入到 Spring 中去,但是如果其他项目集成我们当前 starter 并不知道需要去扫描哪个自动配置类,因此还要在 /META-INF/spring.factories 文件中中指定自动配置类的路径,指定后 SpringBoot 才可以扫描到该 starter 中的这个自动配置类
// 自动配置类
@Configuration
public class RateLimiterAutoConfig {
@Bean
public DoRateLimiterPoint doRateLimiterPoint() {
System.out.println("创建了切面");
return new DoRateLimiterPoint();
}
}
// 创建 META-INF/spring.factories 文件,并且指定自动配置类的路径
// 只需要修改后边的 com.zqy.ratelimiter.config.RateLimiterAutoConfig 这个路径即可
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.zqy.ratelimiter.config.RateLimiterAutoConfig
5、使用 starter
如上就定义好 starter 了,这个 starter 就是一个 maven 项目,在 starter 中执行 maven install 之后,就可以将这个 starter 打包到本地的 maven 仓库了,其他的 SpringBoot 项目可以直接引入 starter 这个 maven 项目,并且使用它的限流功能!
我们在其他的 SpringBoot 项目中使用的话,直接 引入该 starter 的 maven 依赖 ,并且将 自定义注解添加到需要限流的方法 上即可
// 引入依赖
<dependency>
<groupId>com.zqy.ratelimiter</groupId>
<artifactId>ratelimiter-spring-boot-starter</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
// 使用限流
@RestController
public class HelloController {
@DoRateLimiter(permitsPerSecond = 1, errorResult = "{\"code\": \"1001\",\"info\": \"调用方法超过最大次数,限流返回!\"}")
@GetMapping("/hello")
public Object hello() {
return "hello";
}
}
Gitee 代码仓库:https://gitee.com/qylaile/rate-limiter-tool-starter
# 4、有看过 SpringBoot 的启动过程吗?
这个问了源码,看过就说看过,没看过就说没看过吧,不过可以了解一下整体的启动流程
SpringBoot 是基于 Spring 的,那么 Spring 源码中核心的方法就是 refresh() 方法,在这个 refresh() 方法中有 12 个方法,通过这 12 个方法完成整个项目中的 Bean 的加载
先说一下 Spring 加载 Bean 的流程:
Spring 整个 Bean 加载的原理: 先根据路径去拿到路径下的所有 class 文件(字节码文件),再去根据注解或者 xml 文件将指定的 Bean 给注册到 Spring 中去,之后进行实例化,实例化之后需要对该 Bean 里的属性进行填充,那么这里填充属性值的时候就涉及到了 循环依赖 的问题,属性填充完毕之后,整个 Bean 就算初始化完成了
Spring 中还支持了许多扩展点,比如 BeanPostProcessor、Aware、@PostConstruct、InitializingBean、DisposableBean ,通过这些扩展点可以让开发者在 Bean 加载的过程中做一些额外的操作
那么整个加载 Bean 的流程主要就是去扫描字节码文件,再通过反射实例化,进行属性填充,完成 Bean 的创建,不过具体到细节流程还是很复杂的,因为还需要初始化很多其他的内容,比如 Bean 工厂、监听器、创建动态代理等等,整个 Bean 加载的流程如下:
那么 SpringBoot 启动的时候也会加载 Bean,因此会调用到 Spring 的 refresh() 方法, 启动流程:
1、SpringBoot 项目启动就是从 SpringApplication.run() 方法开始的
2、根据应用类型加载对应的 Web 容器,SpringBoot 中 内嵌了 Web 容器 的初始化
3、加载一些初始化器,也就是加载一些自动配置类,也就是从 META-INF/spring.factories 配置文件中进行加载(定义 starter 的时候,也是在这个文件中定义了自动配置类,这样 SpringBoot 项目启动的时候就会扫描 spring.factories,之后就可以拿到 starter 中定义的自动配置类,根据自动配置类去加载 stater 中对应的 Bean)
4、之后调用 Spring 的 refresh() 方法来进行 Bean 的实例化、初始化流程
# 5、Spring 的三级缓存介绍下,为啥需要他解决循环依赖?
先说一下 Spring 中的三级缓存是什么,其实就是三个 Map,如下:
// 一级缓存:存储完整的 Bean 对象
private final Map<String, Object> singletonObjects = new ConcurrentHashMap<>(256);
// 二级缓存:存储早期的 Bean 对象,因为有些 Bean 对象可能没有初始化完成,将未初始化完成的 Bean 对象先放在这里
private final Map<String, Object> earlySingletonObjects = new HashMap<>(16);
// 三级缓存:
private final Map<String, ObjectFactory<?>> singletonFactories = new HashMap<>(16);
Spring 中三级缓存的作用是 解决循环依赖
先说一下 循环依赖 问题:比如现在有两个类 A 和 B ,那么如果 A 类中使用到了 B,B 中也使用到了 A,那么这种情况就是循环依赖:
class A {
// 使用了 B
private B b;
}
class B {
// 使用了 A
private A a;
}
循环依赖会出现问题的,当 Spring 去创建 A 的时候,发现 A 依赖了 B,于是去创建 B,发现 B 又依赖了 A,难道还要去创建 A 嘛?
这显然不合理,因此 Spring 通过 三级缓存来解决这个循环依赖 的问题
接下来说一下 Spring 创建 Bean 的流程:
1、先去 一级缓存 singletonObjects 中获取,存在就返回
2、如果不存在或者对象正在创建中,于是去 二级缓存 earlySingletonObjects 中获取
3、如果还没有获取到,就去 三级缓存 singletonFactories 中获取,通过执行 ObjectFacotry 的 getObject() 就可以获取该对象,获取成功之后,从三级缓存移除,并将该对象加入到二级缓存中
在三级缓存中存储的是 ObjectFacoty ,定义为:
public interface ObjectFactory<T> {
T getObject() throws BeansException;
}
Spring 在创建 Bean 的时候,如果允许循环依赖的话,Spring 就会将刚刚实例化完成,但是属性还没有初始化完的 Bean 对象给提前暴露出去,这里通过 addSingletonFactory 方法,向三级缓存中添加一个 ObjectFactory 对象:
// AbstractAutowireCapableBeanFactory # doCreateBean #
public abstract class AbstractAutowireCapableBeanFactory ... {
protected Object doCreateBean(...) {
//...
// 支撑循环依赖:将 ()->getEarlyBeanReference 作为一个 ObjectFactory 对象的 getObject() 方法加入到三级缓存中
addSingletonFactory(beanName, () -> getEarlyBeanReference(beanName, mbd, bean));
}
}
那么上边在说 Spring 创建 Bean 的流程时说了,如果一级缓存、二级缓存都取不到对象时,会去三级缓存中通过 ObjectFactory 的 getObject 方法获取对象
整个解决循环依赖的流程如下:
- 当 Spring 创建 A 之后,发现 A 依赖了 B ,又去创建 B,B 依赖了 A ,又去创建 A
- 在 B 创建 A 的时候,那么此时 A 就发生了循环依赖,由于 A 此时还没有初始化完成,因此在 一二级缓存 中肯定没有 A
- 那么此时就去三级缓存中调用 getObject() 方法去获取 A 的 前期暴露的对象 ,也就是调用上边加入的 getEarlyBeanReference() 方法,生成一个 A 的 前期暴露对象
- 然后就将这个 ObjectFactory 从三级缓存中移除,并且将前期暴露对象放入到二级缓存中,那么 B 就将这个前期暴露对象注入到依赖,来支持循环依赖!
最后总结一下 Spring 如何解决三级缓存
在三级缓存这一块,主要记一下 Spring 是如何支持循环依赖的即可,也就是如果发生循环依赖的话,就去 三级缓存 singletonFactories 中拿到三级缓存中存储的 ObjectFactory 并调用它的 getObject() 方法来获取这个循环依赖对象的前期暴露对象(虽然还没初始化完成,但是可以拿到该对象在堆中的存储地址了),并且将这个前期暴露对象放到二级缓存中,这样在循环依赖时,就不会重复初始化了!
# 6、@Lazy能解决循环依赖吗?
@Lazy 注解可以配置在指定的 Bean 上,也可以配置在 SpringBootApplication 上表示全局配置
@Lazy 注解的作用 就是缩短 Spring IOC 容器的初始化时间,并且在发现循环依赖的时候,也可以通过 @Lazy 来解决
@Lazy 解决循环依赖就是靠 代理 来解决的,使用 @Lazy 标注的对象会被延迟加载
这里举一个例子,比如说有两个 Bean,A 和 B,他们之间发生了循环依赖,那么 A 的构造器上添加 @Lazy 注解之后,加载的流程如下:
- 首先 Spring 会去创建 A 的 Bean,创建时需要注入 B 的属性
- 由于在 A 上标注了 @Lazy 注解,因此 Spring 会去创建一个 B 的代理对象,将这个代理对象注入到 A 中的 B 属性
- 之后开始执行 B 的实例化、初始化,在注入 B 中的 A 属性时,此时 A 已经创建完毕了,就可以将 A 给注入进去
通过 @Lazy 就解决了循环依赖的注入, 关键点 就在于对 A 中的属性 B 进行注入时,注入的是 B 的代理对象,因此不会循环依赖
之前说的发生循环依赖是因为在对 A 中的属性 B 进行注入时,注入的是 B 对象,此时又会去初始化 B 对象,发现 B 又依赖了 A,因此才导致的循环依赖
一般是不建议使用循环依赖的,但是如果项目比较复杂,可以使用 @Lazy 解决一部分循环依赖的问题
# 7、设计模式了解吗?Spring 中用到了哪些?你项目中用到了哪些?
设计模式是 很重要 的,因此一定要好好掌握一下,但是设计模式有很多,我们可以将常用的设计模式给先掌握熟练了
常用的设计模式有: 单例 、 工厂 、 模板 、 代理 、责任链 、 策略 、 适配器 ,这里会总结一下常用设计模式的优点,以及对应的 UML 类图,可以重点掌握一下,并且尝试加入到自己的项目中
Spring 中使用到的设计模式有单例、工厂、代理、模板、观察者、适配器这几种,那么我们介绍的话,可以从最基本的单例、工厂、代理来介绍,模板、观察者、适配器可以根据自己的情况进行掌握,掌握了当然更好(面试官考察应届生一般考察单例比较多, 手写一个单例模式 )
这里具体的设计模式就先不介绍了,之后会把设计模式专栏给更新出来,可以根据设计模式的 UML 图来记忆(面试可能还考察设计模式的 UML 图)
关于面试官问: 项目中使用到了哪些设计模式?
这个就如实介绍一下就好了,如果没有用到的话,可以考虑一下代理模式、模板模式、责任链模式、策略模式这几种,来对项目的代码结构进行优化
这里举一个例子 :比如使用 代理模式 ,在进行 RPC 远程调用的时候,本地肯定是没有远程的实例对象的,因此本地会创建代理对象,并且将 RPC 调用的流程给封装在代理对象中,这样就可以屏蔽掉远程调用的细节,也就是我们直接执行实例对象的方法,其实底层会走到代理对象的拦截器中,执行一系列的远程调用操作
通过 模板模式 可以规定好方法执行的框架,并且将一些方法的实现延迟到子类执行,子类只可以实现这些方法的细节,不可以修改整体的执行流程
通过 责任链模式 可以把对请求处理的多个处理器解耦开,每个处理器负责自己的任务,比如限流处理器只做限流、鉴权处理器只做鉴权,这样如果后续要添加新的功能,只需要添加处理器即可
通过 策略模式 可以灵活添加不同的实现策略,降低代码的耦合性,比如后续需要新增一个策略,那么只需要新增一个策略实现类,并且在策略工厂中稍作修改即可,不过这样违反了设计模式中的开闭原则,可以基于 Spring 来满足开闭原则
通过 适配器模式 可以将使用者和目标类之间进行解耦,通过适配器类来完成中间复杂的适配操作
# 8、项目中布隆过滤器怎么用的?为啥要用,不用行不行?
一般是项目中用到了布隆过滤器,面试官提问的概率会大一些,如果项目中没有使用的话,可能不会问到,不过也可以将布隆过滤器加入到项目中,作为一个小亮点
使用 布隆过滤器 一般就是用于快速判断某个元素是否在集合中出现了,可以用于解决 缓存穿透 的问题,布隆过滤器提供 一组哈希函数 h1, h2, ..., hk ,对需要存储的数据使用哈希函数计算得到 k 个哈希值,将 BitMap 中这 k 个位置都设置为 1,如果这 k 个位置都是1,则 可能 在集合中,但是如果都不是1,则 一定不在 集合中
因此布隆过滤器会出现 误判 ,可能将不在集合中的元素判断为在集合中,可以通过 增加数组长度 来降低误判率
缓存穿透: 请求的数据在数据库中不存在,因此数据也不会在缓存中,每次请求都不会命中缓存,而是打到数据库上,也就是直接穿过缓存打到数据库中,导致数据库压力很大甚至崩溃,这就是缓存穿透
那么缓存穿透的话,可以使用 Redis 的 布隆过滤器 来解决:下载 RedisBloom 插件,该插件是 Redis 的布隆过滤器模块,下载之后在 Redis 的 conf 文件中配置之后即可使用
具体解决缓存穿透的场景 的话,这里举一个例子: 用户注册场景 ,如果系统用户量很大,在用户注册的时候,需要判断用户的用户名是否重复,初始将用户名的信息都初始化在布隆过滤器中,那么在用户注册时,先去布隆过滤器中快速进行判断用户名是否已经被使用,如果经过 k 次哈希计算发现这 k 次哈希值的位置上都为 1,说明 该用户名可能已经被使用了 ,用户注册用户名重复的话,大不了就换一个用户名就好了,这种情况是可以容忍的,之后用户注册成功之后,再将注册成功的用户名也放入的布隆过滤器中,这样在 用户注册时可以通过布隆过滤器快速判断用户名是否重复
上边说了布隆过滤器可能存在 误判 的情况,误判是可以容忍的,但是布隆过滤器解决缓存穿透还存在另外一个缺点:无法删除元素
无法删除元素 会导致如果用户注销帐号了,那么该用户名是无法从布隆过滤器中删除的,因此会导致其他用户也无法注册这个用户名,可以考虑再添加一层 Redis 缓存 来存储已经注销的用户名,同时如果注销的用户名较多的话,可能存在 大 key 问题 ,可以考虑分片存储来解决
这里总结一下如何通过 布隆过滤器解决缓存穿透:
首先将用户名都初始化在布隆过滤器中,用户注册的时候通过 布隆过滤器 快速判断该用户名是否已经被使用了,系统可以容忍一定的误判率,对于布隆过滤器无法删除元素这个缺点,添加一层 Redis 缓存,将已经注销的用户名放在这个 Redis 中的 set 里,这样就可以解决布隆过滤器无法删除元素的缺点了,不过如果注销用户名多了,可能会存在大 key 的问题,因此要考虑 分片存储 解决大 key 问题,也可以从业务角度上,限制每个用户注销的次数
最后再说一下布隆过滤器中容量的计算:
先说一下各个参数的含义:
- m: 布隆过滤器中二进制 bit 数组的长度
- n :需要对多少个元素进行存储,比如说我们要存储 1000 万个用户名,那么 n = 1000 万
- p: 期望的误判率,可以设置 p = 0.001(%0.1) 或者 p = 0.0001(%0.01)
将 n、p 带入上述公式即可计算出来理想情况下布隆过滤器的二进制数组的长度,也可以根据此公式算出来存储这么多元素大概需要占用多少内存空间,比如需要存储 10 亿个用户名,期望误判率为 0.001,也就是将 n = 10亿、p = 0.001 带入,得到 m 约为 1.67GB ,因此这个布隆过滤器大约占用 1.67GB 的空间(可以搜索在线布隆过滤器容量计算)
# 9、Redis用过吗,和本地缓存有啥区别?
Redis 是 分布式缓存 ,本地缓存是 单机缓存 ,那么在分布式系统中,如果将数据放在本地缓存中,其他节点肯定是无法进行访问了
其次就是 本地缓存相对于 Redis 缓存来说会更快 ,因为去 Redis 中查询数据虽然 Redis 基于内存操作比较快,但是应用还需要和 Redis 发起网络 IO ,而使用本地缓存就不需要网络 IO 了,因此本地缓存更快
# 10、Redis 的数据结构
Redis 有 5 种 基本数据结构 :String、List、Hash、Set(无序集合、值唯一)、Zset(有序集合)
我们都知道 Redis 速度是比较快的,因为他是基于 内存 操作,并且利用 IO 多路复用 来提升处理客户端连接上请求的速度,使用 单线程 处理客户端请求,速度很快
其次呢,Redis 对基本数据结构也做了许多优化 :
- String 底层的 SDS 数据结构优化
字符串底层封装了 SDS 数据结构来实现,没有使用 C 语言默认的字符数组来实现,SDS 内部直接 存储了字符串的长度 len ,因此获取长度的时间复杂度为 O(1),而 C 语言字符串获取长度需要遍历字符串,时间复杂度为 O(N),并且 C 语言以 \0
表示字符串结尾,因此无法存储二进制数据,SDS 中记录字符串长度就不需要结尾标识符了,因此 可以存储二进制数据
SDS 中还采用了 空间预分配策略 ,在 SDS 进行空间扩展时,会同时分配 所需空间 和 额外的未使用空间 ,以减少内存再分配次数
- listPack 优化
Redis 7.0 之后,zipList 被废弃,转而使用 listPack 来代替,因为 zipList 中存在 级联更新 的问题:
假如 zipList 中每一个 entry 都是很接近但又不到 254B,此时每个 entry 的 prevlength 使用 1 个字节就可以保存上个节点的长度,但是此时如果向 zipList 中间插入一个新的节点,长度大于 254B,那么新插入节点后边的节点需要把 prevlength 扩展为 5B 来存储新插入节点的长度,那么扩展后该节点长度又大于 254B,因此后边节点需要再次扩展 prevlength 来存储该节点的长度,导致了插入节点后边的所有节点都需要更新 prevlength 的值,这属于是极端情况下才会发生
- 跳表优化
并且使用 跳表 来优化 Zset 的性能,Zset 底层是通过 压缩列表 + 跳表 来实现的,跳表可以支持平均时间复杂度为 O(logN) 的查询操作
# 11、Redis的持久化机制
Redis 持久化机制有 AOF、RDB 和 混合持久化三种方式,这就是常规八股文了
要知道这三种持久化方式的区别:
使用 RDB 会进行全量备份,RDB 持久化文件时压缩后的二进制文件,因此加载 RDB 文件的速度是远超 AOF 的,不过缺点就是持久化不及时,可能丢失数据
RDB 持久化有两种方式 save 和 bgsave,save 会导致 Redis 阻塞,bgsave 会利用子进程对数据进行持久化,利用到了 写时复制技术 ,写时复制的优点就是利用单独子进程持久化,不会阻塞
AOF 会进行实时备份,AOF 文件中存储的是 RESP 协议数据格式,会存储 Redis 执行的每一条命令,因此实时性比较强,缺点就是速度比较慢(因为需要将命令刷入磁盘中),AOF 肯定不会对每一条命令都写入到磁盘中,而是会先写入到内存中,再统一刷入到磁盘中,刷盘策略分为三种:always(每条命令都立即刷入磁盘)、everysec( 默认 ,每秒同步一次)、no(Redis 不调用文件同步,而是交给操作系统自己判断同步时机)
AOF 文件过大的话,会进行 重写(Rewrite) 压缩体积,使用重写之后,就会将最新数据记录到 AOF 文件中,比如之前对于 name 属性设置了好多次,AOF 文件中记录了 set name n1,set name n2 ...,那么在 重写 之后,AOF 文件中就直接记录了 set name nn,nn 就是 name 的最新值,通过这样来减小 AOF 文件体积是
混合持久化 (Redis4.0 提出)的话,结合了 RDB 和 AOF,既保证了 Redis 性能,又降低了数据丢失的风险,缺点就是 AOF 文件中包含了 RDB 格式的数据,可读性较差
# 12、Redis的内存如果满了,会发生什么?
Redis 的内存如果达到阈值,就会触发 内存淘汰机制 来选择一些数据进行淘汰,如果淘汰之后还没有内存的话,就会返回写操作 error 提示(Redis 内存阈值通过 redis.conf 的 maxmemory 参数来设置)
Redis 中提供了 8 种内存淘汰策略:
noeviction (默认策略):不删除键,返回错误 OOM ,只能读取不能写入
volatile-lru :针对设置了过期时间的 key,使用 LRU 算法进行淘汰
allkeys-lru :针对所有 key 使用 LRU 算法进行淘汰
volatile-lfu :针对设置了过期时间的 key,使用 LFU 算法进行淘汰
allkeys-lfu :针对所有 key 使用LFU 算法进行淘汰
volatile-random :从设置了过期时间的 key 中随机删除
allkeys-random : 从所有 key 中随机删除
volatile-ttl :删除生存时间最近的一个键
# 13、Redis如何实现事务?
Redis 自身提供了 事务功能 ,但是并没有 回滚机制 ,Redis 的事务可以顺序执行队列中的命令,保证其他客户端提交的命令不会插入到事务执行的命令序列中
Redis 提供了 MULTI、EXEC、DISCARD 和 WATCH 这些命令来支持事务
Redis 事务的缺点:
- 不保证原子性: 事务执行过程中,如果所有命令入队时未报错,但是在事务提交之后,在执行的时候报错了,此时正确的命令仍然可以正常执行,因此 Redis 事务在该情况下不保证原子性
- 事务中的每个命令都需要与 Redis 进行网络通信
因此 Redis 自身的事务使用的比较少,而是更多的使用 Lua 脚本 来保证命令执行原子性,使用 Lua 脚本的 好处 :
- 减少网络开销: 多个请求以脚本的形式通过一次网络 IO 即可发送到 Redis
- 原子操作: Redis 会原子性执行整个 Lua 脚本
- 复用: 客户端的 Lua 脚本会永久存在Redis,之后可以复用
这里 Redis Lua 脚本的原子性指的是保证在执行 Lua 脚本的时候,不会被其他操作打断,从而保证了原子性,但是在 Lua 脚本中如果发生了异常,异常前的命令还是会被正常执行,并且无法进行回滚, 因此要注意 Lua 中保证的原子性是指在 Lua 脚本执行过程中不会被其他操作打断
Lua 脚本在执行过程中不会被打断,因此注意不要在 Lua 脚本中执行比较耗时的操作,导致 Redis 阻塞时间过长!
接下来还问了有 Redis 如何实现滑动窗口限流、常见的限流算法、漏桶和令牌桶限流有什么区别、如何进行选择,这一块限流的内容放在下一篇文章说明!
# 14、如何用 Redis 实现滑动窗口限流?
这里基于 Redis 实现一个滑动窗口限流算法
- 什么是滑动窗口限流?
滑动窗口限流指在一定时间窗口内限制请求的数量,并且随着时间的推移,新增的请求会被记入新的时间段内,通过滑动窗口限流可以使得限流更加平滑
- 如何基于 Redis 实现滑动窗口限流?
可以基于 Redis 的 Zset 数据结构实现,每当有一个请求进来时,将当前请求加入到 Zset 中,并且通过 zremrangebyscore
将当前窗口之前的请求给移除掉
移除之后,此时 Zset 中的请求就是当前时间窗口中的请求了,此时通过 zcard
命令查询 Zset 集合中的请求数量,如果大于请求阈值,就拒绝之后的请求;否则,就可以接受之后的请求,使用的命令如下:
# 将当前请求加入到 slideWindows 滑动窗口限流中,将当前时间作为 score 加入
zadd slideWindows [currentTime] [currentTime]
# 移除当前窗口之前的请求,currentTime - windowsDuartion * 1000 为当前时间窗口开始节点,windowsDuration 单位这里是 s,因此乘 1000 转为毫秒
zremrangebyscore slideWindows 0 currentTime - windowsDuartion * 1000
# 统计当前窗口请求数量
zcard slideWindows
Java 中判断逻辑如下(伪代码):
// 最后判断窗口内的请求数量是否大于阈值
if (redisCache.zcard("slideWindows") >= WINDOWS_THRESHOLD) {
// 达到阈值,返回 true,表示触发限流
return true;
}
// 未达到阈值,返回 false,表示不触发限流
return false;
# 15、还了解哪些限流算法?
还有两种常用的限流算法: 漏桶算法 和 令牌桶算法
Google Guava 包下的 RateLimiter 是基于令牌桶算法的思想实现,这里说一下这两种限流算法以及它们的区别
# 漏桶算法
漏桶算法的原理就是 将请求加入漏桶中,漏桶以固定速率出水,如果请求在漏桶中溢出就拒绝请求
那么这个漏桶就可以使用一定长度的队列来实现,长度就是这个漏桶所能容纳请求的数量,再通过另一个线程从队列的另一端去不断取出任务执行就可以了
- 漏桶算法存在的问题
漏桶算法存在的问题就是只能以固定速率处理到来的请求,无法处理突发请求 ,也就是一瞬间如果有超过漏桶大小的请求数量过来的话,超出的那部分请求就会被无情的抛弃
那么漏桶算法的这个问题在令牌桶算法中得到了解决 ,如果请求一开始数量较少,令牌桶中会积累令牌数量,当有突发流量到来的时候,会去使用已经积累的令牌数量来去处理这些请求,并且 RateLimiter 的实现中 还可以对未来令牌数量透支 ,这样 RateLimiter 实现的令牌桶算法就可以很好的应对突发流量了,不过这样带来的缺点就是如果一直并发量比较高,导致对未来的令牌数量一直透支,会导致后边请求的阻塞等待时间逐渐变长,解决方法就是适当的加一些请求拒绝策略就可以缓解这种现象
在高并发的场景中,突发流量还是比较常见的,因此在 RateLimiter 基于令牌桶算法实现中为了应对突发流量,做出了透支令牌的优化
漏桶算法如下图所示:
漏桶算法和令牌桶算法还有一点区别就是:
漏桶算法是需要将请求给存储在队列中,而在令牌桶算法中,并没有真正去产生令牌,而是根据时间差来计算这段时间应该产生的令牌数, 所以令牌桶算法的性能相对于漏桶算法来说是比较高的!
# 令牌桶算法
令牌桶算法的原理就是 系统使用恒定速率往桶中放入令牌,如果请求需要被处理,就从桶中获取令牌,如果没有令牌的话,请求被拒绝
RateLimiter 就是基于令牌桶算法实现的,在他里边并没有真正的去创建令牌实体,而是根据时间差来计算这一段时间产生的令牌数,这样做的好处就是 性能比较高
如果真正要去创建令牌实体的话,肯定需要再启动一个任务,以固定速率向令牌桶中生成令牌,那么启动一个新的任务是会带来一定的系统开销的,可能会加重系统的负担,那么通过时间差来计算令牌数的话,通过简单的计算就可以拿到产生的令牌数量,开销大大减少
# 两种算法的区别
漏桶算法 :主要用于平滑流量,对于突发流量的应对不好,如果突发流量过大超出了队列长度就会被无情抛弃;并且需要将请求存储在队列中
令牌桶算法 :为了更好应对突发流量引入了透支令牌的优化,但是如果一直透支对后来的请求也很不友好,有利有弊;并且 RateLimiter 中对令牌桶算法还做出了优化,并不真正去生成令牌实体,而是根据时间去计算应该生成的令牌数,降低系统开销
# 两种算法改如何选择呢?
在系统中,一般来说突发流量会比较多一些,因此限流算法如果应对突发流量更好一些的话,用户体验会更好,因此可以选择 令牌桶算法 ,基于令牌的方式,可以很好应对突发流量,并且还可以对未来令牌进行透支,不过透支未来令牌会导致后边的请求阻塞时间过长,因此可以考虑加入一定的拒绝策略,不要透支太多的令牌(详细见下方 RateLimiter 缺陷)
并且令牌桶算法基于时间来计算令牌,不需要生成令牌实体,而漏桶算法还要生成请求存储在队列中,因此令牌桶算法性能相对来说更好
# 扩展:RateLimiter 的缺陷
RateLimiter 是存在缺陷的,如果系统的并发量逐步升高,通过 acquire()
方法是一定会去获取令牌的,而由于 RateLimiter 中 透支未来令牌 的设计,这就会导致后边的请求等待时间会逐步升高,下边代码模拟了并发量逐步升高的场景,从输出结果看可以发现后边的请求等待的时间越来越长,这显然对后来的请求很不友好
public static void main(String[] args) throws InterruptedException {
RateLimiter rateLimiter = RateLimiter.create(2);
for (int i = 1; i < 20; i ++) {
double acquire = rateLimiter.acquire(i);
System.out.println("获取了" + i + "个令牌,等待时间为" + acquire);
}
/**
* 输出:
* 获取了1个令牌,等待时间为0.0
* 获取了2个令牌,等待时间为0.499337
* 获取了3个令牌,等待时间为0.998667
* 获取了4个令牌,等待时间为1.499843
* 获取了5个令牌,等待时间为1.996169
* 获取了6个令牌,等待时间为2.499906
* 获取了7个令牌,等待时间为2.993976
* 获取了8个令牌,等待时间为3.499379
* 获取了9个令牌,等待时间为3.999501
* 获取了10个令牌,等待时间为4.490265
*/
}
- 怎么来解决这个问题呢?
这个问题的原因就是 acquire() 方法一定会获取令牌,那么我们在获取令牌之前可以先使用 tryAcquired 检测:
1、如果可行再去 acquire()
2、如果令牌不足,适当拒绝请求
因此解决策略就是我们去 定义一个拒绝策略 ,当发现等待的时间远远超出了可以接受的范围,就将该请求给拒绝掉,这样就不会导致一致透支未来的令牌,导致后边的请求越来越慢
- acquire 包装代码解析
如下代码(来源于 xjjdog 作者的 Github),我们将 acquire 方法给包装一下,先通过 tryAcquire() 尝试获取令牌,如果获取不到返回 false,我们再将请求数量给记录到原子类中,再通过 acquire() 开始阻塞等待获取令牌,当发现等待的请求数量超过指定的最大请求数量之后,就将之后的请求给拒绝掉!
public class FollowController {
private final RateLimiter rateLimiter;
private int maxPermits;
private Object mutex = new Object();
//等待获取permits的请求个数,原则上可以通过maxPermits推算
private int maxWaitingRequests;
private AtomicInteger waitingRequests = new AtomicInteger(0);
public FollowController(int maxPermits,int maxWaitingRequests) {
this.maxPermits = maxPermits;
this.maxWaitingRequests = maxWaitingRequests;
rateLimiter = RateLimiter.create(maxPermits);
}
public FollowController(int permits,long warmUpPeriodAsSecond,int maxWaitingRequests) {
this.maxPermits = maxPermits;
this.maxWaitingRequests = maxWaitingRequests;
rateLimiter = RateLimiter.create(permits,warmUpPeriodAsSecond, TimeUnit.SECONDS);
}
public boolean acquire() {
return acquire(1);
}
public boolean acquire(int permits) {
boolean success = rateLimiter.tryAcquire(permits);
if (success) {
rateLimiter.acquire(permits);//可能有出入
return true;
}
if (waitingRequests.get() > maxWaitingRequests) {
return false;
}
waitingRequests.getAndAdd(permits);
rateLimiter.acquire(permits);
waitingRequests.getAndAdd(0 - permits);
return true;
}
}