本文主要是介绍聊聊redisson的RRateLimiter,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
序
本文主要研究一下redisson的RRateLimiter
RRateLimiter
redisson/src/main/java/org/redisson/api/RRateLimiter.java
public interface RRateLimiter extends RRateLimiterAsync, RExpirable {/*** Initializes RateLimiter's state and stores config to Redis server.* * @param mode - rate mode* @param rate - rate* @param rateInterval - rate time interval* @param rateIntervalUnit - rate time interval unit* @return {@code true} if rate was set and {@code false}* otherwise*/boolean trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);/*** Updates RateLimiter's state and stores config to Redis server.** @param mode - rate mode* @param rate - rate* @param rateInterval - rate time interval* @param rateIntervalUnit - rate time interval unit*/void setRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);/*** Acquires a permit only if one is available at the* time of invocation.** <p>Acquires a permit, if one is available and returns immediately,* with the value {@code true},* reducing the number of available permits by one.** <p>If no permit is available then this method will return* immediately with the value {@code false}.** @return {@code true} if a permit was acquired and {@code false}* otherwise*/boolean tryAcquire();/*** Acquires the given number of <code>permits</code> only if all are available at the* time of invocation.** <p>Acquires a permits, if all are available and returns immediately,* with the value {@code true},* reducing the number of available permits by given number of permits.** <p>If no permits are available then this method will return* immediately with the value {@code false}.** @param permits the number of permits to acquire* @return {@code true} if a permit was acquired and {@code false}* otherwise*/boolean tryAcquire(long permits);/*** Acquires a permit from this RateLimiter, blocking until one is available.** <p>Acquires a permit, if one is available and returns immediately,* reducing the number of available permits by one.* */void acquire();/*** Acquires a specified <code>permits</code> from this RateLimiter, * blocking until one is available.** <p>Acquires the given number of permits, if they are available * and returns immediately, reducing the number of available permits * by the given amount.* * @param permits the number of permits to acquire*/void acquire(long permits);/*** Acquires a permit from this RateLimiter, if one becomes available* within the given waiting time.** <p>Acquires a permit, if one is available and returns immediately,* with the value {@code true},* reducing the number of available permits by one.** <p>If no permit is available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* specified waiting time elapses.** <p>If a permit is acquired then the value {@code true} is returned.** <p>If the specified waiting time elapses then the value {@code false}* is returned. If the time is less than or equal to zero, the method* will not wait at all.** @param timeout the maximum time to wait for a permit* @param unit the time unit of the {@code timeout} argument* @return {@code true} if a permit was acquired and {@code false}* if the waiting time elapsed before a permit was acquired*/boolean tryAcquire(long timeout, TimeUnit unit);/*** Acquires the given number of <code>permits</code> only if all are available* within the given waiting time.** <p>Acquires the given number of permits, if all are available and returns immediately,* with the value {@code true}, reducing the number of available permits by one.** <p>If no permit is available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* the specified waiting time elapses.** <p>If a permits is acquired then the value {@code true} is returned.** <p>If the specified waiting time elapses then the value {@code false}* is returned. If the time is less than or equal to zero, the method* will not wait at all.** @param permits amount* @param timeout the maximum time to wait for a permit* @param unit the time unit of the {@code timeout} argument* @return {@code true} if a permit was acquired and {@code false}* if the waiting time elapsed before a permit was acquired*/boolean tryAcquire(long permits, long timeout, TimeUnit unit);/*** Returns current configuration of this RateLimiter object.* * @return config object*/RateLimiterConfig getConfig();/*** Returns amount of available permits.** @return number of permits*/long availablePermits();}
RRateLimiter继承了RRateLimiterAsync、RExpirable接口,它主要定义了trySetRate、setRate、tryAcquire、acquire、getConfig、availablePermits方法
RRateLimiterAsync
redisson/src/main/java/org/redisson/api/RRateLimiterAsync.java
public interface RRateLimiterAsync extends RExpirableAsync {/*** Initializes RateLimiter's state and stores config to Redis server.* * @param mode - rate mode* @param rate - rate* @param rateInterval - rate time interval* @param rateIntervalUnit - rate time interval unit* @return {@code true} if rate was set and {@code false}* otherwise*/RFuture<Boolean> trySetRateAsync(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);/*** Acquires a permit only if one is available at the* time of invocation.** <p>Acquires a permit, if one is available and returns immediately,* with the value {@code true},* reducing the number of available permits by one.** <p>If no permit is available then this method will return* immediately with the value {@code false}.** @return {@code true} if a permit was acquired and {@code false}* otherwise*/RFuture<Boolean> tryAcquireAsync();/*** Acquires the given number of <code>permits</code> only if all are available at the* time of invocation.** <p>Acquires a permits, if all are available and returns immediately,* with the value {@code true},* reducing the number of available permits by given number of permits.** <p>If no permits are available then this method will return* immediately with the value {@code false}.** @param permits the number of permits to acquire* @return {@code true} if a permit was acquired and {@code false}* otherwise*/RFuture<Boolean> tryAcquireAsync(long permits);/*** Acquires a permit from this RateLimiter, blocking until one is available.** <p>Acquires a permit, if one is available and returns immediately,* reducing the number of available permits by one.* * @return void*/RFuture<Void> acquireAsync();/*** Acquires a specified <code>permits</code> from this RateLimiter, * blocking until one is available.** <p>Acquires the given number of permits, if they are available * and returns immediately, reducing the number of available permits * by the given amount.* * @param permits the number of permits to acquire* @return void*/RFuture<Void> acquireAsync(long permits);/*** Acquires a permit from this RateLimiter, if one becomes available* within the given waiting time.** <p>Acquires a permit, if one is available and returns immediately,* with the value {@code true},* reducing the number of available permits by one.** <p>If no permit is available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* specified waiting time elapses.** <p>If a permit is acquired then the value {@code true} is returned.** <p>If the specified waiting time elapses then the value {@code false}* is returned. If the time is less than or equal to zero, the method* will not wait at all.** @param timeout the maximum time to wait for a permit* @param unit the time unit of the {@code timeout} argument* @return {@code true} if a permit was acquired and {@code false}* if the waiting time elapsed before a permit was acquired*/RFuture<Boolean> tryAcquireAsync(long timeout, TimeUnit unit);/*** Acquires the given number of <code>permits</code> only if all are available* within the given waiting time.** <p>Acquires the given number of permits, if all are available and returns immediately,* with the value {@code true}, reducing the number of available permits by one.** <p>If no permit is available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* the specified waiting time elapses.** <p>If a permits is acquired then the value {@code true} is returned.** <p>If the specified waiting time elapses then the value {@code false}* is returned. If the time is less than or equal to zero, the method* will not wait at all.** @param permits amount* @param timeout the maximum time to wait for a permit* @param unit the time unit of the {@code timeout} argument* @return {@code true} if a permit was acquired and {@code false}* if the waiting time elapsed before a permit was acquired*/RFuture<Boolean> tryAcquireAsync(long permits, long timeout, TimeUnit unit);/*** Updates RateLimiter's state and stores config to Redis server.*** @param mode - rate mode* @param rate - rate* @param rateInterval - rate time interval* @param rateIntervalUnit - rate time interval unit* @return {@code true} if rate was set and {@code false}* otherwise*/RFuture<Void> setRateAsync(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);/*** Returns current configuration of this RateLimiter object.* * @return config object*/RFuture<RateLimiterConfig> getConfigAsync();/*** Returns amount of available permits.** @return number of permits*/RFuture<Long> availablePermitsAsync();}
RRateLimiterAsync继承了RExpirableAsync,它是async版本的RRateLimiter,它主要定义了trySetRateAsync、setRateAsync、tryAcquireAsync、acquireAsync、getConfigAsync、availablePermitsAsync方法
RedissonRateLimiter
redisson/src/main/java/org/redisson/RedissonRateLimiter.java
public class RedissonRateLimiter extends RedissonExpirable implements RRateLimiter {//......@Overridepublic boolean tryAcquire() {return tryAcquire(1);}@Overridepublic RFuture<Boolean> tryAcquireAsync() {return tryAcquireAsync(1L);}@Overridepublic boolean tryAcquire(long permits) {return get(tryAcquireAsync(RedisCommands.EVAL_NULL_BOOLEAN, permits));}@Overridepublic RFuture<Boolean> tryAcquireAsync(long permits) {return tryAcquireAsync(RedisCommands.EVAL_NULL_BOOLEAN, permits);}@Overridepublic void acquire() {get(acquireAsync());}@Overridepublic RFuture<Void> acquireAsync() {return acquireAsync(1);}@Overridepublic void acquire(long permits) {get(acquireAsync(permits));}@Overridepublic RFuture<Void> acquireAsync(long permits) {CompletionStage<Void> f = tryAcquireAsync(permits, -1, null).thenApply(res -> null);return new CompletableFutureWrapper<>(f);}@Overridepublic boolean tryAcquire(long timeout, TimeUnit unit) {return get(tryAcquireAsync(timeout, unit));}@Overridepublic RFuture<Boolean> tryAcquireAsync(long timeout, TimeUnit unit) {return tryAcquireAsync(1, timeout, unit);}@Overridepublic boolean tryAcquire(long permits, long timeout, TimeUnit unit) {return get(tryAcquireAsync(permits, timeout, unit));}
}
RedissonRateLimiter继承了RedissonExpirable,实现了RRateLimiter接口
trySetRate
public boolean trySetRate(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {return get(trySetRateAsync(type, rate, rateInterval, unit));}public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"+ "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"+ "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);",Collections.singletonList(getRawName()), rate, unit.toMillis(rateInterval), type.ordinal());}
trySetRate委托给了trySetRateAsync,这里主要是使用hsetnx来设置rate、interval、type三个值
setRate
public void setRate(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {get(setRateAsync(type, rate, rateInterval, unit));}public RFuture<Void> setRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"local valueName = KEYS[2];"+ "local permitsName = KEYS[4];"+ "if ARGV[3] == '1' then "+ " valueName = KEYS[3];"+ " permitsName = KEYS[5];"+ "end "+"redis.call('hset', KEYS[1], 'rate', ARGV[1]);"+ "redis.call('hset', KEYS[1], 'interval', ARGV[2]);"+ "redis.call('hset', KEYS[1], 'type', ARGV[3]);"+ "redis.call('del', valueName, permitsName);",Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()), rate, unit.toMillis(rateInterval), type.ordinal());}
setRate委托给了setRateAsync,这里使用hset来写入rate、interval、type三个值,如果存在则覆盖;另外这里删除了valueName、permitsName这两个key
tryAcquire
public boolean tryAcquire(long permits) {return get(tryAcquireAsync(RedisCommands.EVAL_NULL_BOOLEAN, permits));}private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {byte[] random = getServiceManager().generateIdArray();return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"local rate = redis.call('hget', KEYS[1], 'rate');"+ "local interval = redis.call('hget', KEYS[1], 'interval');"+ "local type = redis.call('hget', KEYS[1], 'type');"+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"+ "local valueName = KEYS[2];"+ "local permitsName = KEYS[4];"+ "if type == '1' then "+ "valueName = KEYS[3];"+ "permitsName = KEYS[5];"+ "end;"+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "+ "local currentValue = redis.call('get', valueName); "+ "local res;"+ "if currentValue ~= false then "+ "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "+ "local released = 0; "+ "for i, v in ipairs(expiredValues) do "+ "local random, permits = struct.unpack('Bc0I', v);"+ "released = released + permits;"+ "end; "+ "if released > 0 then "+ "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "+ "if tonumber(currentValue) + released > tonumber(rate) then "+ "currentValue = tonumber(rate) - redis.call('zcard', permitsName); "+ "else "+ "currentValue = tonumber(currentValue) + released; "+ "end; "+ "redis.call('set', valueName, currentValue);"+ "end;"+ "if tonumber(currentValue) < tonumber(ARGV[1]) then "+ "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); "+ "res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));"+ "else "+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "+ "redis.call('decrby', valueName, ARGV[1]); "+ "res = nil; "+ "end; "+ "else "+ "redis.call('set', valueName, rate); "+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "+ "redis.call('decrby', valueName, ARGV[1]); "+ "res = nil; "+ "end;"+ "local ttl = redis.call('pttl', KEYS[1]); "+ "if ttl > 0 then "+ "redis.call('pexpire', valueName, ttl); "+ "redis.call('pexpire', permitsName, ttl); "+ "end; "+ "return res;",Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),value, System.currentTimeMillis(), random);}
tryAcquire委托给了tryAcquireAsync,它通过一个lua脚本来执行,首先通过hget获取rate、interval、type的值,然后根据type来确定valueName、permitsName,如果type为0则valueName是getValueName(),permitsName是getPermitsName(),如果type=1则valueName是getClientValueName(),permitsName是getClientPermitsName();之后获取valueName的值,若为false则直接用设置rate、permits,并递减valueName;若为true则获取expiredValues计算released值,再计算出currentValue,若不够扣则计算返回值,若够扣则通过zadd添加当前permit(
System.currentTimeMillis()
),然后递减valueName
acquire
public void acquire() {get(acquireAsync());}public RFuture<Void> acquireAsync() {return acquireAsync(1);}public RFuture<Void> acquireAsync(long permits) {CompletionStage<Void> f = tryAcquireAsync(permits, -1, null).thenApply(res -> null);return new CompletableFutureWrapper<>(f);}public RFuture<Boolean> tryAcquireAsync(long permits, long timeout, TimeUnit unit) {long timeoutInMillis = -1;if (timeout >= 0) {timeoutInMillis = unit.toMillis(timeout);}CompletableFuture<Boolean> f = tryAcquireAsync(permits, timeoutInMillis);return new CompletableFutureWrapper<>(f);}private CompletableFuture<Boolean> tryAcquireAsync(long permits, long timeoutInMillis) {long s = System.currentTimeMillis();RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);return future.thenCompose(delay -> {if (delay == null) {return CompletableFuture.completedFuture(true);}if (timeoutInMillis == -1) {CompletableFuture<Boolean> f = new CompletableFuture<>();getServiceManager().getGroup().schedule(() -> {CompletableFuture<Boolean> r = tryAcquireAsync(permits, timeoutInMillis);commandExecutor.transfer(r, f);}, delay, TimeUnit.MILLISECONDS);return f;}long el = System.currentTimeMillis() - s;long remains = timeoutInMillis - el;if (remains <= 0) {return CompletableFuture.completedFuture(false);}CompletableFuture<Boolean> f = new CompletableFuture<>();if (remains < delay) {getServiceManager().getGroup().schedule(() -> {f.complete(false);}, remains, TimeUnit.MILLISECONDS);} else {long start = System.currentTimeMillis();getServiceManager().getGroup().schedule(() -> {long elapsed = System.currentTimeMillis() - start;if (remains <= elapsed) {f.complete(false);return;}CompletableFuture<Boolean> r = tryAcquireAsync(permits, remains - elapsed);commandExecutor.transfer(r, f);}, delay, TimeUnit.MILLISECONDS);}return f;}).toCompletableFuture();}
acquire也是复用了tryAcquireAsync方法,只获取不到时会根据返回的delay进行重新调度,若timeoutInMillis不为-1则会根据超时时间进行计算和重新调度
availablePermits
public long availablePermits() {return get(availablePermitsAsync());}public RFuture<Long> availablePermitsAsync() {return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,"local rate = redis.call('hget', KEYS[1], 'rate');"+ "local interval = redis.call('hget', KEYS[1], 'interval');"+ "local type = redis.call('hget', KEYS[1], 'type');"+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"+ "local valueName = KEYS[2];"+ "local permitsName = KEYS[4];"+ "if type == '1' then "+ "valueName = KEYS[3];"+ "permitsName = KEYS[5];"+ "end;"+ "local currentValue = redis.call('get', valueName); "+ "if currentValue == false then "+ "redis.call('set', valueName, rate); "+ "return rate; "+ "else "+ "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[1]) - interval); "+ "local released = 0; "+ "for i, v in ipairs(expiredValues) do "+ "local random, permits = struct.unpack('Bc0I', v);"+ "released = released + permits;"+ "end; "+ "if released > 0 then "+ "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[1]) - interval); "+ "currentValue = tonumber(currentValue) + released; "+ "redis.call('set', valueName, currentValue);"+ "end;"+ "return currentValue; "+ "end;",Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),System.currentTimeMillis());}
availablePermits委托给了availablePermitsAsync,它执行lua脚本,先通过hget获取rate、interval、type的值,然后根据type来确定valueName、permitsName,如果type为0则valueName是getValueName(),permitsName是getPermitsName(),如果type=1则valueName是getClientValueName(),permitsName是getClientPermitsName();之后获取valueName对应的值currentValue,若值为false则重新设置rate,否则通过expiredValues重新计算released,若released大于0则更新到currentValue,最后返回currentValue
小结
redisson的RRateLimiter提供了trySetRate、setRate、tryAcquire、acquire、getConfig、availablePermits方法
- 其RateType有OVERALL(
值为0
)、PER_CLIENT(值为1
)两个类型,如果type为0则valueName是getValueName(),permitsName是getPermitsName(),如果type=1则valueName是getClientValueName(),permitsName是getClientPermitsName() - 它主要定义了几个key,一个是getRawName,类型为hash,其key有rate、interval、type;一个是key为valueName,存储了当前的permits;一个是key为permitsName,类型是sorted set,其score为System.currentTimeMillis(),value通过struct.pack了随机数长度、随机数、此次permit的value
- trySetRate委托给了trySetRateAsync,这里主要是使用hsetnx来设置rate、interval、type三个值;setRate委托给了setRateAsync,这里使用hset来写入rate、interval、type三个值,如果存在则覆盖;另外这里删除了valueName、permitsName这两个key
- tryAcquire委托给了tryAcquireAsync,它通过一个lua脚本来执行,首先通过hget获取rate、interval、type的值,之后获取valueName的值,若为false则直接用设置rate、permits,并递减valueName;若为true则获取expiredValues计算released值,再计算出currentValue,若不够扣则计算返回值(告诉调用方可以延时多长时间再重试),若够扣则通过zadd添加当前permit(
System.currentTimeMillis()
),然后递减valueName - acquire是也是复用了tryAcquireAsync方法,只获取不到时会根据返回的delay进行重新调度,若timeoutInMillis不为-1则会根据超时时间进行计算和重新调度
这篇关于聊聊redisson的RRateLimiter的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!