一,缓存穿透
缓存穿透是指当缓存系统中无法命中需要的数据时,会直接请求底层存储系统(如数据库),但是如果请求的数据根本不存在,那么大量的请求就会直接穿透缓存层,直接访问底层存储系统,导致底层系统压力过大,甚至崩溃。这也是缓存系统面临的一种常见攻击。
说白了就是查询大量不传在的key 绕过缓存 直接查询数据库 导致缓存跟不不存在一样 这就是缓存穿透。
二,如何解决
那么如果有一个用户就是无限基于这个不存在的key来请求查询,那该怎么办呢?
这种情况下,你可以将在数据库查询不到的数据放在缓存中,并存入 NULL 值,这样就能减少数据库的性能消耗。
但是如果此时有恶意攻击者已经发现你系统的这个漏洞,频繁地用不存在的数据的Key来进行请求,这样你又该怎么去防范呢?
如果仍使用存入 NULL 值的方法,就会导致缓存中有大量的Key - NULL这样的无用键值对。
如果我们能通过一个过滤器,来判断这个值是否存在,再去查询缓存或者数据库,这样,就能过滤掉很多没有意义的请求。
三,Bloom Filter原理
当一个元素加入布隆过滤器中的时候,会进行如下操作:
使用布隆过滤器中的哈希函数对元素值进行计算,得到哈希值(有几个哈希函数得到几个哈希值)。
根据得到的哈希值,在位数组中把对应下标的值置为 1。
当我们需要判断一个元素是否存在于布隆过滤器的时候,会进行如下操作:
对给定元素再次进行相同的哈希计算;
得到值之后判断位数组中的每个元素是否都为 1,如果值都为 1,那么说明这个值在布隆过滤器中,
如果存在一个值不为 1,说明该元素不在布隆过滤器中。
有8个hash位,且默认值为0,
当对tencent进行散列hash之后,2,5,7位被置为1,
继续对cloud进行散列hash后,3,4,6位被置为1。
这时候我们再对其它词汇other进行hash,假设散列为1、2、3,1位为0,可以判断该单词不存在集合中。
四,实现
public class BloomFilter {
private int n;
private double p;
private int m;
private int k;
private BitSet bitMap;
public BloomFilter(int n, double p){
this.n = n;
this.p = p;
}
public void addElement(String element){
if (bitMap == null){
init();
}
int[] posArr = getIndexes(element);
for (int tempos: posArr) {
bitMap.set(tempos, true);
}
}
private synchronized void init(){
if (this.m == 0) {
this.m = (int) ((-n * Math.log(this.p)) / (Math.log(2) * Math.log(2)));
}
if (this.k == 0) {
this.k = Math.max(1, (int) Math.round(this.m / this.n * Math.log(2)));
}
if (bitMap == null) {
bitMap = new BitSet(this.m);
}
System.out.println("this.m : " + this.m);
System.out.println("this.k : " + this.k);
}
private int[] getIndexes(String element){
int[] retArr = new int[this.k];
for (int i = 0; i < this.k; i++) {
retArr[i] = MD5Hash(element + i) % this.m;
}
return retArr;
}
private int MD5Hash(String key) {
MessageDigest md5 = null;
try {
md5 = MessageDigest.getInstance("md5");
byte[] bytes = key.getBytes();
md5.update(bytes);
BigInteger bigInteger = new BigInteger(md5.digest());
return Math.abs(bigInteger.intValue());
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
return -1;
}
public boolean isExist(String element) {
int[] posArr = getIndexes(element);
boolean flag = true;
for (int temPos : posArr) {
flag = flag && bitMap.get(temPos);
}
return flag;
}
public static void main(String[] args) {
BloomFilter filter = new BloomFilter(1000000, 0.0003);
for (int i = 0; i < 1000000; i++) {
filter.addElement("abc"+i);
}
int count = 0;
for (int i = 0; i < 2000000; i++) {
if (filter.isExist("abc"+i)){
count++;
}
}
System.out.println("count: "+count);
System.out.println(1.0*(count-1000000)/1000000);
}
}
五,springboot项目改造
1.交给spring管理
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Bean
public BloomFilter bloomFilter(){
return new BloomFilter(2000000,0.003);
}
}
2.在保存数据的时候加入Bloom Filter
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
boolean phoneInvalid = RegexUtils.isPhoneInvalid(loginForm.getPhone());
if (phoneInvalid) return Result.fail(ErrorMessageConstants.PHONE_ERROR_RESP);
String code = stringRedisTemplate.opsForValue().get(USER_LOGIN_CODE + loginForm.getPhone());
if (code == null) {
log.info("验证码{}已经失效",code);
return Result.fail(ErrorMessageConstants.CHECK_CODE_EXPIRE);
}
User user = query().eq("phone", loginForm.getPhone()).one();
if (user == null) {
user = createUser(loginForm);
bloomFilter.addElement(USER_CHECK_KEY + user.getId());
log.info("新用户创建成功{}",user.getId());
}
String token = UUID.randomUUID().toString();
String userKey = USER_TOKEN_HEAD + token;
log.info("添加的userKey为{}",userKey);
log.info("颁发的token为{}",token);
Map<String, Object> userMap = getStringObjectMap(user);
stringRedisTemplate.opsForHash().putAll(userKey,userMap);
stringRedisTemplate.expire(userKey,LOGIN_USER_TTL, TimeUnit.MINUTES);
return Result.ok(token);
}
3.查询增加判断
@GetMapping("/{id}")
public Result queryUserById(@PathVariable("id") Long userId){
boolean exist = bloomFilter.isExist(USER_CHECK_KEY + userId);
if (!exist) return Result.fail(ErrorMessageConstants.USER_NOT_EXISTS);
User user = userService.getById(userId);
if (user == null) {
return Result.ok();
}
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
return Result.ok(userDTO);
}
六,如何选择哈希函数个数和布隆过滤器长度
布隆过滤器的长度会直接影响其误报率,一般来说,布隆过滤器越长,其误报率越小。这是因为较长的布隆过滤器提供了更多的位供哈希函数映射,从而减少了哈希冲突的可能性,进而降低了误报率。然而,较长的布隆过滤器也会增加存储空间的消耗,因此在选择长度时需要考虑到实际应用对误报率和存储空间的权衡。
哈希函数的个数同样重要,因为它们决定了布隆过滤器的性能和误报率。哈希函数的数量越多,布隆过滤器中bit位置位的速度越快,这有助于提高过滤效率,但同时也会增加误报率。这是因为多个哈希函数可能会将不同的键映射到同一个位上,从而导致误判。因此,选择合适的哈希函数个数需要在过滤效率和误报率之间找到平衡。
增加哈希函数k的数量可以大大降低错误率p。这意味着通过增加哈希函数的数量,可以减少布隆过滤器误报的可能性,尽管这可能会以降低过滤效率为代价。因此,在设计和调整布隆过滤器时,需要根据具体的应用场景和需求来决定最佳的哈希函数个数和布隆过滤器长度。
总的来说,选择哈希函数个数和布隆过滤器长度时,应考虑到误报率、存储空间的消耗、过滤效率等多个因素,并根据实际应用的需求进行权衡和调整。
七,应用场景
- 大数据判断是否存在:这就可以实现出上述的去重功能,如果你的服务器内存足够大的话,那么使用 HashMap 可
能是一个不错的解决方案,理论上时间复杂度可以达到 O(1)的级别,但是当数据量起来之后,还是只能考虑布隆过滤器。 - 解决缓存穿透(背景中提到的问题):利用布隆过滤器我们可以预先把数据查询的主键,比如用户 ID 或文章 ID
缓存到过滤器中。当根据 ID 进行数据查询的时候,我们先判断该 ID 是否存在,若存在的话,则进行下一步处 理。若不存在的话,直接返回,这样就不会触发后续的数据库查询。需要注意的是缓存穿透不能完全解决,我们只 能将其控制在一个可以容忍的范围内。 - 爬虫/ 邮箱等系统的过滤:平时不知道你有没有注意到有一些正常的邮件也会被放进垃圾邮件目录中,这就是使用布隆过滤器 误判 导致的。
- Google Chrome 使用布隆过滤器识别恶意 URL。