分布式限流单位时间多实例多线程访问次数限制 接前面聊一聊redisson及优雅实现和说一说springboot优雅集成redisson,简单以源码的方式给大家介绍了redisson的:可重入性、阻塞、续约、红锁、联锁、加锁解锁流程和集成springboot注意点和优雅实现方式。 接下来在讲一讲平时用的比较多的限流模块RRateLimiter1。简单使用publicstaticvoidmain(String〔〕args)throwsInterruptedException{RRateLimiterrateLimitercreateLimiter();intallThreadNum20;CountDownLatchlatchnewCountDownLatch(allThreadNum);longstartTimeSystem。currentTimeMillis();for(inti0;iallThreadNum;i){newThread((){if(i30)Thread。sleep(1000);booleanpassrateLimiter。tryAcquire();if(pass){log。info(get);}else{log。info(no);}latch。countDown();})。start();}latch。await();System。out。println(Elapsed(System。currentTimeMillis()startTime));}publicstaticRRateLimitercreateLimiter(){ConfigconfignewConfig();config。useSingleServer()。setTimeout(1000000)。setPassword(123456)。setAddress(redis:xxxx:6379);RedissonClientredissonRedisson。create(config);RRateLimiterrateLimiterredisson。getRateLimiter(myRateLimiter3);初始化:PERCLIENT单实例执行,OVERALL全实例执行最大流速每10秒钟产生3个令牌rateLimiter。trySetRate(RateType。OVERALL,3,10,RateIntervalUnit。SECONDS);returnrateLimiter;}复制代码 实际结果:〔2022102914:32:46。261〕〔INFO〕〔main〕〔〕〔〕RedisTestget〔2022102914:32:46。312〕〔INFO〕〔main〕〔〕〔〕RedisTestget〔2022102914:32:46。358〕〔INFO〕〔main〕〔〕〔〕RedisTestget〔2022102914:32:47。416〕〔INFO〕〔main〕〔〕〔〕RedisTestno〔2022102914:32:47。469〕〔INFO〕〔main〕〔〕〔〕RedisTestno〔2022102914:32:47。517〕〔INFO〕〔main〕〔〕〔〕RedisTestno〔2022102914:32:48。577〕〔INFO〕〔main〕〔〕〔〕RedisTestno〔2022102914:32:48。623〕〔INFO〕〔main〕〔〕〔〕RedisTestno复制代码2。实现限流redisson使用了哪些redis数据结构Hash结构限流器结构:参数rate代表速率参数interval代表多少时间内产生的令牌参数type代表单机还是集群ZSET结构记录获取令牌的时间戳,用于时间对比。16670251663122022102914:32:4616670251662622022102914:32:4616670251662152022102914:32:46 3。String结构记录的是当前令牌桶中的令牌数【很明显被我用完了现在是0】 3。超过10s,我再次获取一个令牌,数据结构发生的变化ZSET结构。新生成一个ZSET结构,存放获取令牌的时间戳 String结构当前令牌桶还有2个令牌 4。源码浅析RRateLimiterrateLimiterredisson。getRateLimiter(myRateLimiter3);初始化最大流速每10秒钟产生3个令牌rateLimiter。trySetRate(RateType。PERCLIENT,3,10,RateIntervalUnit。SECONDS);复制代码 初始化定义没有什么好讲的,就是创建HASH结构 主要还是讲讲:rateLimiter。tryAcquire()privateTRFutureTtryAcquireAsync(RedisCommandTcommand,Longvalue){returnthis。commandExecutor。evalWriteAsync(this。getName(),LongCodec。INSTANCE,command,localrateredis。call(hget,KEYS〔1〕,rate);localintervalredis。call(hget,KEYS〔1〕,interval);localtyperedis。call(hget,KEYS〔1〕,type);assert(ratefalseandintervalfalseandtypefalse,RateLimiterisnotinitialized)localvalueNameKEYS〔2〕;localpermitsNameKEYS〔4〕;iftype1thenvalueNameKEYS〔3〕;permitsNameKEYS〔5〕;end;localcurrentValueredis。call(get,valueName);ifcurrentValuefalsethenlocalexpiredValuesredis。call(zrangebyscore,permitsName,0,tonumber(ARGV〔2〕)interval);localreleased0;fori,vinipairs(expiredValues)dolocalrandom,permitsstruct。unpack(fI,v);releasedreleasedpermits;end;ifreleased0thenredis。call(zrem,permitsName,unpack(expiredValues));currentValuetonumber(currentValue)released;redis。call(set,valueName,currentValue);end;iftonumber(currentValue)tonumber(ARGV〔1〕)thenlocalnearestredis。call(zrangebyscore,permitsName,(。。(tonumber(ARGV〔2〕)interval),tonumber(ARGV〔2〕),withscores,limit,0,1);localrandom,permitsstruct。unpack(fI,nearest〔1〕);returntonumber(nearest〔2〕)(tonumber(ARGV〔2〕)interval);elseredis。call(zadd,permitsName,ARGV〔2〕,struct。pack(fI,ARGV〔3〕,ARGV〔1〕));redis。call(decrby,valueName,ARGV〔1〕);returnnil;end;elseassert(tonumber(rate)tonumber(ARGV〔1〕),Requestedpermitsamountcouldnotexceeddefinedrate);redis。call(set,valueName,rate);redis。call(zadd,permitsName,ARGV〔2〕,struct。pack(fI,ARGV〔3〕,ARGV〔1〕));redis。call(decrby,valueName,ARGV〔1〕);returnnil;end;,Arrays。asList(this。getName(),this。getValueName(),this。getClientValueName(),this。getPermitsName(),this。getClientPermitsName()),newObject〔〕{value,System。currentTimeMillis(),ThreadLocalRandom。current()。nextLong()});}复制代码 主要就是这段lua代码,下面我详细过一下 作者目前用的3。16。3版本,刚好遇见redisson的bug,见3197,请大家用最新版本,以下为修复后解析。获取hash结构的速率localrateredis。call(hget,KEYS〔1〕,rate)获取hash结构的时间区间(ms)localintervalredis。call(hget,KEYS〔1〕,interval)获取hash结构的时间类型localtyperedis。call(hget,KEYS〔1〕,type)判断是否初始化限流结构assert(ratefalseandintervalfalseandtypefalse,RateLimiterisnotinitialized){name}:valuestring结构,这个key记录的是当前令牌桶中的令牌数localvalueNameKEYS〔2〕{name}:permitszset结构,记录了请求的令牌数,score则为请求的时间戳localpermitsNameKEYS〔4〕单机限流才会用到,集群模式不用关注iftype1thenvalueNameKEYS〔3〕permitsNameKEYS〔5〕end生产速率rate必须比请求的令牌数大assert(tonumber(rate)tonumber(ARGV〔1〕),Requestedpermitsamountcouldnotexceeddefinedrate)初始化RateLimiter并不会初始化stirng结构,因此第一次获取这里currentValue是nulllocalcurrentValueredis。call(get,valueName)ifcurrentValuefalsethen第二次获取令牌执行获取zset结构:统计之前的请求令牌数范围是0(第二次请求时间戳令牌生产的时间)localexpiredValuesredis。call(zrangebyscore,permitsName,0,tonumber(ARGV〔2〕)interval)localreleased0lua迭代器,遍历expiredValues,如果有值,那么released等于之前所有请求的令牌数之和,表示应该释放多少令牌fori,vinipairs(expiredValues)do获取请求数permitslocalrandom,permitsstruct。unpack(fI,v)releasedreleasedpermitsend之前的请求令牌数0,例如10s产生3个令牌,现在超过10s了,重置周期并计算剩余令牌数ifreleased0then移除zset中所有元素【要求是同一个限流器permitsName,不然就移除不了,尴尬】redis。call(zrem,permitsName,unpack(expiredValues))currentValuetonumber(currentValue)released更新string结构:剩下令牌数释放令牌数redis。call(set,valueName,currentValue)end如果当前令牌数请求的令牌数iftonumber(currentValue)tonumber(ARGV〔1〕)then从zset中找到距离当前时间最近的那个请求,也就是上一次放进去的请求信息localnearestredis。call(zrangebyscore,permitsName,(。。(tonumber(ARGV〔2〕)interval),tonumber(ARGV〔2〕),withscores,limit,0,1);localrandom,permitsstruct。unpack(fI,nearest〔1〕)返回上一次请求的时间戳(当前时间戳令牌生成的时间间隔)这个值表示还需要多久才能生产出足够的令牌returntonumber(nearest〔2〕)(tonumber(ARGV〔2〕)interval)else如果当前令牌数请求的令牌数,表示令牌够多,更新zset更新zset结构redis。call(zadd,permitsName,ARGV〔2〕,struct。pack(fI,ARGV〔3〕,ARGV〔1〕))更新Stringt结构,减少一个剩下的令牌数redis。call(decrby,valueName,ARGV〔1〕)returnnilendelse汀雨笔记初始化Stringt结构,当前限流器的令牌数redis。call(set,valueName,rate)汀雨笔记初始化zset结构redis。call(zadd,permitsName,ARGV〔2〕,struct。pack(fI,ARGV〔3〕,ARGV〔1〕))struct。pack第一个参数表示格式字符串,f是浮点数、I是长整数。所以这个格式字符串表示的是把一个浮点数和长整数拼起来的结构体,ARGV〔2〕就是请求时间戳,ARGV〔1〕是请求的令牌数,统计会用到,ARGV〔3〕是当前时间戳为种子的随机数,具体用处还不知道,知道的网友可以留言更新Stringt结构,因为这是获取令牌操作,减掉一个令牌【本文作者认为,这里可以直接初始化string结构,值为rate1】redis。call(decrby,valueName,ARGV〔1〕)returnnilend复制代码 这段lua代码也并不复杂,令牌桶的数量主要是通过时间窗口来控制,判断上一个请求是否超过了令牌生产周期。 留下一个疑问?移除zset中所有元素【要求是同一个限流器permitsName,不然就移除不了,尴尬】redis。call(zrem,permitsName,unpack(expiredValues))复制代码 我自己在本地测试,只要超过10s,permitsName就不一样,这就导致了这部分数据是不能移除的,就产生了冗余数据,从前面的截图也可以看出,是新生成了一个zset数据结构。 相当于直接走到了这一步:更新zset结构redis。call(zadd,permitsName,ARGV〔2〕,struct。pack(fI,ARGV〔3〕,ARGV〔1〕))复制代码 至于为什么会产生这样的结果,会的小伙伴可以留言,或者过段时间我提个issue。 及时当勉励岁月不待人 能看到这里的人呀,都是菁英。 非常感谢