
如何在 Java 中通过 Bucket4j 提供速率限制?

如何通过 Bucket4j 为您的项目提供基于令牌桶算法的速率限制?

时不时地,我们所有人都面临着限制我们的外部 API 的问题——出于多种原因,我们应该限制对我们 API 的调用的某些功能。



  1. 欺诈检测(保护机器人):例如,我们有一个论坛,当有人试图发送消息或发布超出限制的帖子时,我们希望防止来自客户的垃圾邮件。为了我们自己的安全,我们必须防止这种行为。

  2. 从业务逻辑来看,通常是用来实现“API业务模型”的:比如我们需要为我们的外部API引入关税功能,我们要创建一些关税,例如START、STANDARD、BUSINESS . 对于每种资费,我们设置了每小时的通话次数限制(但您也可以将通话次数限制为每分钟、每秒、毫秒一次,也可以设置为每分钟一次。此外,您甚至可以设置多个限制限制 – 这称为“带宽管理”)。

  • START – 每小时最多 100 个电话

  • 标准 – 每小时最多 10000 个

  • 商业 – 高达每小时 100000




  • 令牌桶

  • 漏桶


  • 固定窗口计数器

  • 滑动窗口日志

  • 滑动窗口计数器





  • Bucket:如你所见,他的token数量是固定的(如果你在我们的bucket中设置了1000个token,这就是volume的最大值)。

  • Refiller:根据带宽管理定期将丢失的token填充到Bucket(每次Consume前调用)。

  • Consume:从我们的Bucket中取出tokens(取出1个token或多个token——通常取决于调用consume方法的权重,它是一个可定制且灵活的变量,但在99%的情况下,我们只需要消费一个令牌)。




Token Bucket 算法有固定的内存用于存储 Bucket,它由以下变量组成:

  • Bucket 的体积(最大可能的令牌数) – 8 个字节

  • 桶中令牌的当前计数 – 8 个字节

  • 生成新令牌的纳秒计数 – 8 个字节

  • 对象头:16字节

总共:40 字节

例如,在 1 GB 中,我们可以存储 2500 万个桶。了解这一点非常重要,因为通常我们会将有关存储桶的信息存储在缓存中,然后存储到 RAM(随机存取存储器)中。



不幸的是,该算法并不完美。Token Bucket 算法的主要问题被称为“Burst”。


  1. 在某些时候,我们的存储桶包含 100 个令牌。

  2. 同时,我们消耗了 100 个代币。

  3. 一秒钟后,填充器再次填充 100 个令牌。

  4. 同时,我们消耗了 100 个代币。

大约 1 秒,我们消耗了 200 个代币,因此,我们超过了限制 x2 倍!

但是,有问题吗?没有!如果我们要使用 Bucket 进行长期距离,问题就不是问题了。

如果我们只使用我们的 Bucket 1 秒,我们会过度消耗令牌 x2 倍(200 个令牌),但是如果我们使用我们的 Bucket 60 秒,则该桶的消耗大约等于 6100 秒,因为 Burst 问题只发生了一次. 您使用铲斗的次数越多,其准确性就越好。当准确性在速率限制中很重要时,这是一种非常罕见的情况。 

最重要的是消耗内存,因为我们有一个与“Burst”相关的问题。一个bucket有固定内存大小的要求(在Token Bucket算法的情况下 – 40字节),我们面临“Burst”的问题,因为要创建Bucket 我们需要 2 个变量:生成新令牌的纳秒计数(refill)和 bucket 的体积(容量)——因此,我们无法实现 Token Bucket 的准确性合约


通过 Bucket4j 实现 Rate-Limiter

让我们考虑一下 Bucket4j 库实现的 Token Bucket 算法。

Bucket4j 是 Java 世界中用于实现速率限制功能的最流行的库。每个月,Bucket4j 从 Maven Central 下载多达 200,000 次,并包含在 GitHub 上的 3500 个依赖项中。

让我们考虑几个简单的例子(我们将使用 Maven 作为软件项目管理和理解工具)。

对于第一个,我们需要在 pom.xml 中添加一个依赖项:


创建 Example.java:

import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.Bucket4j;
import io.github.bucket4j.ConsumptionProbe;
import java.time.Duration;

public class Example {

   public static void main(String args[]) {

       //Create the Bandwidth to set the rule - one token per minute
       Bandwidth oneCosumePerMinuteLimit = Bandwidth.simple(1, Duration.ofMinutes(1));

       //Create the Bucket and set the Bandwidth which we created above
       Bucket bucket = Bucket.builder()

       //Call method tryConsume to set count of Tokens to take from the Bucket,
       //returns boolean, if true - consume successful and the Bucket had enough Tokens inside Bucket to execute method tryConsume
       System.out.println(bucket.tryConsume(1)); //return true

       //Call method tryConsumeAndReturnRemaining and set count of Tokens to take from the Bucket
       //Returns ConsumptionProbe, which include much more information than tryConsume, such as the
       //isConsumed - is method consume successful performed or not, if true - is successful
       //getRemainingTokens - count of remaining Tokens
       //getNanosToWaitForRefill - Time in nanoseconds to refill Tokens in our Bucket
       ConsumptionProbe consumptionProbe = bucket.tryConsumeAndReturnRemaining(1);
       System.out.println(consumptionProbe.isConsumed()); //return false since we have already called method tryConsume, but Bandwidth has a limit with rule - one token per one minute
       System.out.println(consumptionProbe.getRemainingTokens()); //return 0, since we have already consumed all of the Tokens
       System.out.println(consumptionProbe.getNanosToWaitForRefill()); //Return around 60000000000 nanoseconds


让我们考虑一个更困难的例子。让我们想象一种情况,您需要考虑通过对某个 RESTful API 方法的请求计数来限制(需要通过来自某个用户对某个控制器的请求调用计数来限制,每个 Y 周期不超过 X 次)。但是,我们的系统是分布式的,我们在一个集群中有很多笔记;我们使用 Hazelcast(但它可以是任何 JSR107 缓存、DynamoDB、Redis 或其他东西)。

让我们基于 Spring 框架来实现我们的示例。

首先,我们需要在 pom.xml 中添加一些依赖项:






public @interface RateLimiter {

   TimeUnit timeUnit() default TimeUnit.MINUTES;

   long timeValue();

   long restriction();


此外,注释将分组 RateLimiter 注释(如果我们需要为每个控制器使用多个带宽)。

import java.lang.annotation.*;

public @interface RateLimiters {

   RateLimiter[] value();



public enum TimeUnit {

而且,现在,我们需要创建一个类,它将进行注释处理。由于将在控制器级别设置注释,因此该类应从 HandlerInterceptorAdapter 扩展:

public class RateLimiterAnnotationHandlerInterceptorAdapter extends HandlerInterceptorAdapter {

   //You should have already realized class, which returns Authentication context to getting userId
   private AuthenticationUtil authenticationUtil;

   private final ProxyManager<RateLimiterKey> proxyManager;

   public RateLimiterAnnotationHandlerInterceptorAdapter(AuthenticationUtil authenticationUtil, HazelcastInstance hazelcastInstance) {
       this.authenticationUtil = authenticationUtil;

       //To start work with Hazelcast, you also should create HazelcastInstance bean 
       IMap<RateLimiterKey, byte[]> bucketsMap = hazelcastInstance.getMap(HazelcastFrontConfiguration.RATE_LIMITER_BUCKET);
       proxyManager = new HazelcastProxyManager<>(bucketsMap);

   public boolean preHandle(HttpServletRequest request,
                            HttpServletResponse response,
                            Object handler)
 throws Exception

       if (handler instanceof HandlerMethod) {
           HandlerMethod handlerMethod = (HandlerMethod) handler;

           //if into handlerMethod is present RateLimiter or RateLimiters annotation, we get it, if not, we get empty Optional 
           Optional<List<RateLimiter>> rateLimiters = RateLimiterUtils.getRateLimiters(handlerMethod);

           if (rateLimiters.isPresent()) {
               //Get path from RequestMapping annotation(respectively we can get annotations such: GetMapping, PostMapping, PutMapping, DeleteMapping, because all of than annotations are extended from RequestMapping)
               RequestMapping requestMapping = handlerMethod.getMethodAnnotation(RequestMapping.class);

               //To get unique key we use bundle of 2-x values: path from RequestMapping and user id
               RateLimiterKey key = new RateLimiterKey(authenticationUtil.getPersonId(), requestMapping.value());

               //Further we set key in proxy to get Bucket from cache or create a new Bucket
               Bucket bucket = proxyManager.builder().build(key, () -> RateLimiterUtils.rateLimiterAnnotationsToBucketConfiguration(rateLimiters.get()));

               //Try to consume token, if we don’t do that, we return 429 HTTP code
               if (!bucket.tryConsume(1)) {
                   return false;
       return true;

要使用 Hazelcast,我们需要创建一个必须可序列化的自定义键:

public class RateLimiterKey implements Serializable {
   private String userId;
   private String[] uri;

此外,不要忘记名为 RateLimiterUtils 的特殊实用程序类,用于与 RateLimiterAnnotationHandlerInterceptorAdapter 一起工作(Spring 名称约定样式 – 将您的类或方法命名为必须易于理解,即使以您的名义包含 10 个单词。这是我的目标风格)。

public final class RateLimiterUtils {

   public static BucketConfiguration rateLimiterAnnotationsToBucketConfiguration(List<RateLimiter> rateLimiters) {
       ConfigurationBuilder configBuilder = Bucket4j.configurationBuilder();
       rateLimiters.stream().forEach(limiter -> configBuilder.addLimit(buildBandwidth(limiter)));
       return configBuilder.build();


   public static Optional<List<RateLimiter>> getRateLimiters(HandlerMethod handlerMethod) {
       RateLimiters rateLimitersAnnotation = handlerMethod.getMethodAnnotation(RateLimiters.class);

       if(rateLimitersAnnotation != null) {
           return Optional.of(Arrays.asList(rateLimitersAnnotation.value()));

       RateLimiter rateLimiterAnnotation = handlerMethod.getMethodAnnotation(RateLimiter.class);
       if(rateLimiterAnnotation != null) {
           return Optional.of(Arrays.asList(rateLimiterAnnotation));
       return Optional.empty();

   private static final Bandwidth buildBandwidth(RateLimiter rateLimiter) {

       TimeUnit timeUnit = rateLimiter.timeUnit();
       long timeValue = rateLimiter.timeValue();
       long restriction = rateLimiter.restriction();

       if (TimeUnit.MINUTES.equals(timeUnit)) {
           return Bandwidth.simple(restriction, Duration.ofMinutes(timeValue));
       } else if (TimeUnit.HOURS.equals(timeUnit)) {
           return Bandwidth.simple(restriction, Duration.ofHours(timeValue));
       } else {
           return Bandwidth.simple(5000, Duration.ofHours(1));

还有一件事; 我们需要在扩展自 WebMvcConfigurerAdapter 的 Context 中注册我们的自定义拦截器:

import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;

public class ContextConfig extends WebMvcConfigurerAdapter {

   public void addInterceptors(InterceptorRegistry registry) {
       registry.addInterceptor(new RateLimiterAnnotationHandlerInterceptorAdapter());

现在,为了测试我们的机制,我们将创建 ExampleController 并在控制器的方法上方设置 RateLimiter 以检查它是否正常工作:

import com.nibado.example.customargumentspring.component.RateLimiter;
import com.nibado.example.customargumentspring.component.RateLimiters;
import com.nibado.example.customargumentspring.component.TimeUnit;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

public class ExampleController {

   @RateLimiters({@RateLimiter(timeUnit = TimeUnit.MINUTES, timeValue = 1, restriction = 2), @RateLimiter(timeUnit = TimeUnit.HOURS, timeValue = 1, restriction = 5)})
   public String example(@PathVariable("id") String id) {
       return "ok";

在@RateLimiters 中,我们设置了两个限制:

  • @RateLimiter(timeUnit = TimeUnit.MINUTES, timeValue = 1,restriction = 2) — 每分钟不超过 2 个请求。

  • @RateLimiter(timeUnit = TimeUnit.HOURS, timeValue = 1,restriction = 5) — 每小时不超过 5 个请求。

这只是 Bucket4j 库的一小部分。如果你觉得这个库不错的话,可以去学习更多API。






