はじめに

「APIが落ちた。原因は1人のユーザーが秒間1000リクエスト投げてきたから」

これ、笑い話じゃない。実際に起きる。

悪意がなくても起きる。

  • バグで無限ループになったクライアント
  • リトライを連打する実装ミス
  • スクレイピングボット

Rate Limitingは、システムを守る最後の防衛線だ。

この記事では、Rate Limitingの本質から、アルゴリズム、実装、分散環境での運用までを解説する。


なぜRate Limitingが必要か

守るべきもの

対象 脅威
サーバーリソース 過負荷によるダウン
データベース 大量クエリによる性能低下
外部API 課金上限、利用規約違反
他のユーザー 一部ユーザーによるリソース独占

Rate Limitingの効果

    graph TB
    subgraph before["Rate Limiting なし"]
        B1["User A<br/>1000 req/s"]
        B2["サーバー処理"]
        B3["DB過負荷"]
        B4["全員に影響"]

        B1 --> B2 --> B3 --> B4
    end

    subgraph after["Rate Limiting あり"]
        A1["User A<br/>1000 req/s"]
        A2["Rate Limiter<br/>(10 req/s まで)"]
        A3["サーバー処理<br/>10 req/s"]
        A4["990 req/s<br/>429 Too Many Requests"]

        A1 --> A2
        A2 -->|"10 req/s"| A3
        A2 -->|"990 req/s"| A4
    end

    style before fill:#ffebee,stroke:#f44336,stroke-width:2px
    style after fill:#e8f5e9,stroke:#4caf50,stroke-width:2px
    style B3 fill:#ffcdd2,stroke:#c62828
    style B4 fill:#ffcdd2,stroke:#c62828
    style A3 fill:#c8e6c9,stroke:#2e7d32
    style A4 fill:#fff3e0,stroke:#f57c00
  

Rate Limitingアルゴリズム

1. 固定ウィンドウ(Fixed Window)

    gantt
    title 固定ウィンドウ (1分ウィンドウ、100リクエストまで)
    dateFormat mm:ss
    axisFormat %M:%S

    section Window 1
    100 req まで OK :w1, 00:00, 60s

    section Window 2
    100 req まで OK :w2, 01:00, 60s

    section Window 3
    100 req まで OK :w3, 02:00, 60s
  

実装:

    import time

class FixedWindowRateLimiter:
    def __init__(self, limit, window_size):
        self.limit = limit          # 100
        self.window_size = window_size  # 60秒
        self.counters = {}  # {user_id: (window_start, count)}

    def is_allowed(self, user_id):
        now = time.time()
        window_start = int(now // self.window_size) * self.window_size

        if user_id not in self.counters:
            self.counters[user_id] = (window_start, 0)

        stored_window, count = self.counters[user_id]

        # 新しいウィンドウなら、カウントをリセット
        if stored_window != window_start:
            self.counters[user_id] = (window_start, 1)
            return True

        # 制限内なら許可
        if count < self.limit:
            self.counters[user_id] = (window_start, count + 1)
            return True

        return False
  

メリット:

  • 実装がシンプル
  • メモリ効率が良い

デメリット:

  • バースト問題: ウィンドウの境界で2倍のリクエストが通る
    gantt
    title Window境界でのバースト問題
    dateFormat mm:ss
    axisFormat %M:%S

    section Window 1
    リクエスト受付中 :w1, 00:00, 59s
    100 req到達 :crit, burst1, 00:59, 1s

    section Window 2
    100 req到達 :crit, burst2, 01:00, 1s
    リクエスト受付中 :w2, 01:01, 59s

    section 問題
    1秒間に200 req :milestone, critical, 00:59, 2s
  

2. スライディングウィンドウログ(Sliding Window Log)

    現在時刻: 1:30

過去1分間のリクエストをカウント
[1:00, 1:05, 1:10, 1:15, 1:20, 1:25, 1:29]
↑ 0:30 より前は除外

|<-------- 1分間 -------->|
0:30                   1:30 (現在)
     ↑ この範囲のリクエストをカウント
  

実装:

    import time
from collections import deque

class SlidingWindowLogRateLimiter:
    def __init__(self, limit, window_size):
        self.limit = limit          # 100
        self.window_size = window_size  # 60秒
        self.logs = {}  # {user_id: deque([timestamp, ...])}

    def is_allowed(self, user_id):
        now = time.time()
        window_start = now - self.window_size

        if user_id not in self.logs:
            self.logs[user_id] = deque()

        log = self.logs[user_id]

        # 古いログを削除
        while log and log[0] < window_start:
            log.popleft()

        # 制限内なら許可
        if len(log) < self.limit:
            log.append(now)
            return True

        return False
  

メリット:

  • 正確(バースト問題なし)

デメリット:

  • メモリ使用量が多い(全リクエストのタイムスタンプを保持)

3. スライディングウィンドウカウンター(Sliding Window Counter)

固定ウィンドウとスライディングログの中間。

    現在時刻: 1:30 (ウィンドウの50%地点)

前のウィンドウ (0:00-0:59): 80 req
現在のウィンドウ (1:00-1:59): 30 req

推定カウント = 80 × 0.5 + 30 × 1.0 = 40 + 30 = 70 req
  

実装:

    import time

class SlidingWindowCounterRateLimiter:
    def __init__(self, limit, window_size):
        self.limit = limit
        self.window_size = window_size
        self.counters = {}  # {user_id: {prev_window: count, curr_window: count}}

    def is_allowed(self, user_id):
        now = time.time()
        curr_window = int(now // self.window_size)
        prev_window = curr_window - 1

        # ウィンドウ内の位置(0.0 ~ 1.0)
        window_progress = (now % self.window_size) / self.window_size

        if user_id not in self.counters:
            self.counters[user_id] = {}

        counter = self.counters[user_id]
        prev_count = counter.get(prev_window, 0)
        curr_count = counter.get(curr_window, 0)

        # 加重平均でカウント推定
        estimated_count = prev_count * (1 - window_progress) + curr_count

        if estimated_count < self.limit:
            counter[curr_window] = curr_count + 1
            # 古いウィンドウを削除
            counter.pop(prev_window - 1, None)
            return True

        return False
  

メリット:

  • メモリ効率が良い(2ウィンドウ分のカウンターのみ)
  • バースト問題を軽減

デメリット:

  • 近似値(100%正確ではない)

4. トークンバケット(Token Bucket)

    graph TB
    subgraph bucket["トークンバケット"]
        direction TB
        State["現在: 7トークン"]
        Capacity["容量: 10トークン"]
        Refill["補充レート: 1トークン/秒"]
    end

    Request["リクエスト"]
    Check{トークンあり?}
    Allow["許可<br/>(トークン-1)"]
    Deny["拒否<br/>(429)"]
    Time["時間経過<br/>(1秒)"]
    Add["トークン+1<br/>(最大10まで)"]

    Request --> Check
    Check -->|"Yes (7 → 6)"| Allow
    Check -->|"No (0)"| Deny
    Time --> Add --> bucket

    style bucket fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style Allow fill:#c8e6c9,stroke:#2e7d32
    style Deny fill:#ffcdd2,stroke:#c62828
    style Add fill:#fff3e0,stroke:#f57c00
  

実装:

    import time

class TokenBucketRateLimiter:
    def __init__(self, capacity, refill_rate):
        self.capacity = capacity      # バケット容量: 10
        self.refill_rate = refill_rate  # 補充レート: 1 token/秒
        self.buckets = {}  # {user_id: (tokens, last_update)}

    def is_allowed(self, user_id):
        now = time.time()

        if user_id not in self.buckets:
            self.buckets[user_id] = (self.capacity, now)

        tokens, last_update = self.buckets[user_id]

        # 経過時間に応じてトークン補充
        elapsed = now - last_update
        tokens = min(self.capacity, tokens + elapsed * self.refill_rate)

        if tokens >= 1:
            self.buckets[user_id] = (tokens - 1, now)
            return True

        self.buckets[user_id] = (tokens, now)
        return False
  

メリット:

  • バーストを許容(溜まったトークン分だけ)
  • 平均レートを制御しつつ、一時的なスパイクを許容

デメリット:

  • パラメータ調整が必要(容量とレート)

ユースケース:

  • 一時的なバーストを許容したい場合
  • AWS API Gateway のデフォルト

5. リーキーバケット(Leaky Bucket)

    graph TB
    Request["新しいリクエスト"]
    Check{バケットに空き?}
    Queue["バケットに追加<br/>(7リクエスト待機中)"]
    Deny["拒否<br/>(429)"]
    Process["1 req/秒で<br/>一定レートで処理"]

    subgraph bucket["リーキーバケット"]
        direction TB
        Waiting["待機中: 7リクエスト"]
        Capacity["容量: 10リクエスト"]
        LeakRate["流出レート: 1 req/秒"]
    end

    Request --> Check
    Check -->|"Yes (< 10)"| Queue
    Check -->|"No (= 10)"| Deny
    Queue --> bucket
    bucket --> Process

    style bucket fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style Queue fill:#fff3e0,stroke:#f57c00
    style Deny fill:#ffcdd2,stroke:#c62828
    style Process fill:#c8e6c9,stroke:#2e7d32
  

実装:

    import time
from collections import deque

class LeakyBucketRateLimiter:
    def __init__(self, capacity, leak_rate):
        self.capacity = capacity      # バケット容量: 10
        self.leak_rate = leak_rate    # 流出レート: 1 req/秒
        self.buckets = {}  # {user_id: (queue, last_leak)}

    def is_allowed(self, user_id):
        now = time.time()

        if user_id not in self.buckets:
            self.buckets[user_id] = (deque(), now)

        queue, last_leak = self.buckets[user_id]

        # 経過時間に応じてバケットから流出
        elapsed = now - last_leak
        leak_count = int(elapsed * self.leak_rate)
        for _ in range(min(leak_count, len(queue))):
            queue.popleft()

        # バケットに空きがあれば追加
        if len(queue) < self.capacity:
            queue.append(now)
            self.buckets[user_id] = (queue, now)
            return True

        self.buckets[user_id] = (queue, now)
        return False
  

メリット:

  • 出力レートが一定(スムーズ)
  • バースト時のシステム負荷を平準化

デメリット:

  • バーストが即座に処理されない(キューで待機)

ユースケース:

  • 一定レートで処理したい場合(バッチ処理への投入など)

アルゴリズム比較

アルゴリズム バースト許容 メモリ効率 実装難易度 正確性
固定ウィンドウ ❌ 境界で2倍 ✅ 良い ✅ 簡単 ⚪ 普通
スライディングログ ✅ なし ❌ 悪い ⚪ 普通 ✅ 高い
スライディングカウンター ⚪ 軽減 ✅ 良い ⚪ 普通 ⚪ 近似
トークンバケット ✅ 制御可能 ✅ 良い ⚪ 普通 ⚪ 普通
リーキーバケット ❌ キュー待機 ⚪ 普通 ⚪ 普通 ✅ 高い

【実装】Redisでの実装

固定ウィンドウ(INCR + EXPIRE)

    import redis
import time

class RedisFixedWindowRateLimiter:
    def __init__(self, redis_client, limit, window_size):
        self.redis = redis_client
        self.limit = limit
        self.window_size = window_size

    def is_allowed(self, user_id):
        window = int(time.time() // self.window_size)
        key = f"ratelimit:{user_id}:{window}"

        # INCR + EXPIRE をアトミックに
        pipe = self.redis.pipeline()
        pipe.incr(key)
        pipe.expire(key, self.window_size)
        results = pipe.execute()

        count = results[0]
        return count <= self.limit

# 使用例
redis_client = redis.Redis()
limiter = RedisFixedWindowRateLimiter(redis_client, limit=100, window_size=60)

if limiter.is_allowed('user123'):
    # 処理実行
    pass
else:
    # 429 Too Many Requests
    pass
  

スライディングウィンドウログ(Sorted Set)

    import redis
import time
import uuid

class RedisSlidingWindowLogRateLimiter:
    def __init__(self, redis_client, limit, window_size):
        self.redis = redis_client
        self.limit = limit
        self.window_size = window_size

    def is_allowed(self, user_id):
        now = time.time()
        window_start = now - self.window_size
        key = f"ratelimit:{user_id}"

        pipe = self.redis.pipeline()
        # 古いエントリを削除
        pipe.zremrangebyscore(key, 0, window_start)
        # 現在のカウント取得
        pipe.zcard(key)
        # 新しいエントリ追加
        pipe.zadd(key, {f"{now}:{uuid.uuid4()}": now})
        # TTL設定
        pipe.expire(key, self.window_size)
        results = pipe.execute()

        count = results[1]
        return count < self.limit  # 追加前のカウントで判断
  

トークンバケット(Lua Script)

    import redis
import time

TOKEN_BUCKET_SCRIPT = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local data = redis.call('HMGET', key, 'tokens', 'last_update')
local tokens = tonumber(data[1]) or capacity
local last_update = tonumber(data[2]) or now

-- トークン補充
local elapsed = now - last_update
tokens = math.min(capacity, tokens + elapsed * refill_rate)

-- トークン消費
if tokens >= requested then
    tokens = tokens - requested
    redis.call('HMSET', key, 'tokens', tokens, 'last_update', now)
    redis.call('EXPIRE', key, 3600)
    return 1
else
    redis.call('HMSET', key, 'tokens', tokens, 'last_update', now)
    redis.call('EXPIRE', key, 3600)
    return 0
end
"""

class RedisTokenBucketRateLimiter:
    def __init__(self, redis_client, capacity, refill_rate):
        self.redis = redis_client
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.script = self.redis.register_script(TOKEN_BUCKET_SCRIPT)

    def is_allowed(self, user_id, tokens=1):
        key = f"ratelimit:{user_id}"
        now = time.time()
        result = self.script(
            keys=[key],
            args=[self.capacity, self.refill_rate, now, tokens]
        )
        return result == 1
  

【実装】Nginxでの実装

limit_req_zone(リーキーバケット)

    http {
    # Zone定義: 10MBの共有メモリ、キーはIPアドレス
    limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;

    server {
        location /api/ {
            # バースト許容: 20リクエストまでキューイング
            # nodelay: キューイングせず即座にレスポンス
            limit_req zone=api_limit burst=20 nodelay;

            # 429を返すステータスコード
            limit_req_status 429;

            proxy_pass http://backend;
        }
    }
}
  

limit_conn(同時接続数制限)

    http {
    limit_conn_zone $binary_remote_addr zone=conn_limit:10m;

    server {
        location /api/ {
            # IPあたり最大10接続
            limit_conn conn_limit 10;
            limit_conn_status 429;

            proxy_pass http://backend;
        }
    }
}
  

複合的な制限

    http {
    # IP単位の制限
    limit_req_zone $binary_remote_addr zone=ip_limit:10m rate=10r/s;

    # APIキー単位の制限
    map $http_x_api_key $api_key_limit_key {
        default $http_x_api_key;
        ""      $binary_remote_addr;
    }
    limit_req_zone $api_key_limit_key zone=apikey_limit:10m rate=100r/s;

    server {
        location /api/ {
            # 両方の制限を適用
            limit_req zone=ip_limit burst=20 nodelay;
            limit_req zone=apikey_limit burst=200 nodelay;

            proxy_pass http://backend;
        }
    }
}
  

【実装】アプリケーション層での実装

Python(Flask + Redis)

    from flask import Flask, request, jsonify
from functools import wraps
import redis

app = Flask(__name__)
redis_client = redis.Redis()

def rate_limit(limit, window):
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            # キー: IPアドレス + エンドポイント
            key = f"ratelimit:{request.remote_addr}:{request.endpoint}"

            # Redisでカウント
            pipe = redis_client.pipeline()
            pipe.incr(key)
            pipe.expire(key, window)
            results = pipe.execute()

            count = results[0]
            remaining = max(0, limit - count)

            # レスポンスヘッダー
            headers = {
                'X-RateLimit-Limit': str(limit),
                'X-RateLimit-Remaining': str(remaining),
                'X-RateLimit-Reset': str(int(time.time()) + window)
            }

            if count > limit:
                response = jsonify({'error': 'Too Many Requests'})
                response.status_code = 429
                response.headers.extend(headers)
                response.headers['Retry-After'] = str(window)
                return response

            response = f(*args, **kwargs)
            if isinstance(response, tuple):
                response, status = response
                response = jsonify(response)
                response.status_code = status
            response.headers.extend(headers)
            return response
        return wrapper
    return decorator

@app.route('/api/users')
@rate_limit(limit=100, window=60)
def get_users():
    return jsonify({'users': []})
  

Express.js

    const express = require('express');
const Redis = require('ioredis');

const app = express();
const redis = new Redis();

function rateLimit(options) {
  const { limit, window } = options;

  return async (req, res, next) => {
    const key = `ratelimit:${req.ip}:${req.path}`;

    const multi = redis.multi();
    multi.incr(key);
    multi.expire(key, window);
    const results = await multi.exec();

    const count = results[0][1];
    const remaining = Math.max(0, limit - count);

    res.set({
      'X-RateLimit-Limit': limit,
      'X-RateLimit-Remaining': remaining,
      'X-RateLimit-Reset': Math.floor(Date.now() / 1000) + window
    });

    if (count > limit) {
      res.set('Retry-After', window);
      return res.status(429).json({ error: 'Too Many Requests' });
    }

    next();
  };
}

app.get('/api/users', rateLimit({ limit: 100, window: 60 }), (req, res) => {
  res.json({ users: [] });
});
  

【実務】レスポンスヘッダーの設計

標準的なヘッダー

    HTTP/1.1 200 OK
X-RateLimit-Limit: 100           # 制限値
X-RateLimit-Remaining: 95        # 残りリクエスト数
X-RateLimit-Reset: 1702540800    # リセット時刻(Unix timestamp)

HTTP/1.1 429 Too Many Requests
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1702540800
Retry-After: 30                  # 何秒後にリトライすべきか
  

クライアント側の対応

    async function fetchWithRateLimit(url) {
  const response = await fetch(url);

  if (response.status === 429) {
    const retryAfter = response.headers.get('Retry-After');
    const delay = retryAfter ? parseInt(retryAfter) * 1000 : 60000;

    console.log(`Rate limited. Retrying after ${delay}ms`);
    await sleep(delay);
    return fetchWithRateLimit(url);
  }

  // 残りリクエスト数を確認
  const remaining = response.headers.get('X-RateLimit-Remaining');
  if (remaining && parseInt(remaining) < 10) {
    console.warn(`Rate limit warning: ${remaining} requests remaining`);
  }

  return response;
}
  

【実務】分散環境での課題

問題: 複数サーバーでのカウント

    graph TB
    LB["Load Balancer"]
    ServerA["Server A<br/>count: 50"]
    ServerB["Server B<br/>count: 50"]
    Problem["❌ 問題<br/>User の実際のリクエスト数: 100<br/>各サーバーが認識: 50<br/>制限が効いていない!"]

    LB --> ServerA
    LB --> ServerB
    ServerA --> Problem
    ServerB --> Problem

    style LB fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style ServerA fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    style ServerB fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    style Problem fill:#ffcdd2,stroke:#c62828,stroke-width:2px
  

解決策: 中央集権的なストア

    graph TB
    LB["Load Balancer"]
    ServerA["Server A"]
    ServerB["Server B"]
    Redis["Redis<br/>count: 100<br/>✅ 中央でカウント管理"]

    LB --> ServerA
    LB --> ServerB
    ServerA --> Redis
    ServerB --> Redis

    style LB fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style ServerA fill:#c8e6c9,stroke:#2e7d32,stroke-width:2px
    style ServerB fill:#c8e6c9,stroke:#2e7d32,stroke-width:2px
    style Redis fill:#fff3e0,stroke:#f57c00,stroke-width:3px
  

Redis Cluster での注意点

    # キーが同じスロットに配置されるようにする
# {} 内の文字列でハッシュスロットが決まる

key = f"ratelimit:{{user123}}:{window}"  # user123 でスロット決定
  

レース条件への対策

    # 問題のあるコード
count = redis.get(key)
if count < limit:
    redis.incr(key)  # ← この間に他のリクエストが来る可能性

# 解決策1: INCR の結果で判断
count = redis.incr(key)
if count == 1:
    redis.expire(key, window)
if count > limit:
    return False

# 解決策2: Lua Script(アトミック実行)
SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

local current = redis.call('INCR', key)
if current == 1 then
    redis.call('EXPIRE', key, window)
end
return current <= limit and 1 or 0
"""
  

【実務】多層Rate Limiting

レイヤーごとの制限

    graph TB
    Client["クライアント"]
    L1["L1: CDN/WAF<br/>(Cloudflare, AWS WAF)<br/>IP単位、粗い制限<br/>例: 1000 req/min"]
    L2["L2: Load Balancer<br/>(Nginx)<br/>例: 100 req/s"]
    L3["L3: Application<br/>(Flask + Redis)<br/>ユーザー/API単位<br/>例: プランに応じた制限"]
    Backend["バックエンド処理"]

    Client --> L1
    L1 --> L2
    L2 --> L3
    L3 --> Backend

    style L1 fill:#ffebee,stroke:#f44336,stroke-width:2px
    style L2 fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    style L3 fill:#e8f5e9,stroke:#4caf50,stroke-width:2px
    style Backend fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
  

プラン別の制限

    RATE_LIMITS = {
    'free': {'limit': 100, 'window': 3600},       # 100 req/hour
    'basic': {'limit': 1000, 'window': 3600},     # 1000 req/hour
    'pro': {'limit': 10000, 'window': 3600},      # 10000 req/hour
    'enterprise': {'limit': 100000, 'window': 3600},  # 100000 req/hour
}

def get_rate_limit(user):
    plan = user.subscription_plan
    return RATE_LIMITS.get(plan, RATE_LIMITS['free'])
  

エンドポイント別の制限

    ENDPOINT_LIMITS = {
    'POST /api/auth/login': {'limit': 5, 'window': 60},       # ブルートフォース対策
    'POST /api/auth/register': {'limit': 3, 'window': 3600},  # スパム対策
    'GET /api/search': {'limit': 30, 'window': 60},           # 重いクエリ
    'default': {'limit': 100, 'window': 60},
}

def get_endpoint_limit(method, path):
    key = f'{method} {path}'
    return ENDPOINT_LIMITS.get(key, ENDPOINT_LIMITS['default'])
  

【実務】監視とアラート

監視すべきメトリクス

メトリクス 説明 アラート閾値(例)
429 レスポンス率 Rate Limited されたリクエストの割合 > 5%
制限到達ユーザー数 制限に達したユーザーの数 急増時
平均リクエストレート ユーザーあたりの平均レート 異常値検出

Prometheus メトリクス

    from prometheus_client import Counter, Histogram

rate_limit_hits = Counter(
    'rate_limit_hits_total',
    'Total number of rate limited requests',
    ['endpoint', 'user_tier']
)

rate_limit_remaining = Histogram(
    'rate_limit_remaining',
    'Remaining rate limit when request was made',
    ['endpoint']
)

@app.after_request
def record_metrics(response):
    if response.status_code == 429:
        rate_limit_hits.labels(
            endpoint=request.endpoint,
            user_tier=g.user.tier if hasattr(g, 'user') else 'anonymous'
        ).inc()

    remaining = response.headers.get('X-RateLimit-Remaining')
    if remaining:
        rate_limit_remaining.labels(endpoint=request.endpoint).observe(int(remaining))

    return response
  

アラート例(Alertmanager)

    groups:
  - name: rate_limiting
    rules:
      - alert: HighRateLimitRate
        expr: |
          sum(rate(rate_limit_hits_total[5m]))
          / sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Rate limiting is affecting more than 5% of requests"

      - alert: RateLimitAbuse
        expr: |
          rate(rate_limit_hits_total{user_tier="free"}[1h]) > 1000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Potential rate limit abuse detected"
  

実務チェックリスト

設計時

  • どのアルゴリズムを使うか決めたか
  • 制限の粒度は適切か(IP/ユーザー/APIキー)
  • エンドポイントごとの制限は必要か
  • プラン別の制限は必要か

実装時

  • 分散環境での整合性は確保されているか
  • レスポンスヘッダーは適切か
  • 429 レスポンスの内容は親切か
  • Retry-After ヘッダーは返しているか

運用時

  • 429 レスポンス率は監視されているか
  • アラートは設定されているか
  • 制限値の調整は容易か
  • 特定ユーザーの制限緩和は可能か

まとめ

Rate Limitingの本質は、リソースの公平な分配とシステムの保護だ。

アルゴリズムの選択

要件 アルゴリズム
シンプルさ重視 固定ウィンドウ
正確性重視 スライディングログ
バランス型 スライディングカウンター
バースト許容 トークンバケット
安定出力 リーキーバケット

実装の選択

要件 実装
シンプル、単一サーバー メモリ内
分散環境 Redis
インフラ層 Nginx / API Gateway
アプリ層 ミドルウェア

運用のポイント

  • 適切なエラーレスポンス(429 + Retry-After)
  • 多層防御(CDN → LB → App)
  • 監視とアラート

Rate Limitingは保険。平時は目立たないが、いざという時にシステムを救う。