この記事の対象読者

  • 「メッセージキュー」を聞いたことはあるけど、使ったことがない人
  • 同期処理の限界を感じている人
  • レガシーなキュー(Q4M等)を運用していて、モダン化を考えている人
  • Kafka、RabbitMQ、SQSの違いがよく分からない人

この記事では、メッセージキューの基本概念から、各ミドルウェアの比較実装パターン障害対応レガシーからの移行まで、体系的に解説します。


なぜメッセージキューが必要なのか

同期処理の限界

    【同期処理】
ユーザー → API → 重い処理(メール送信、画像変換、外部API)→ レスポンス
              ユーザーは待ち続ける(5秒、10秒...)
  

問題点:

  • ユーザー体験が悪い(待たされる)
  • タイムアウトのリスク
  • 1つの処理が詰まると全体に影響
  • スケールしにくい

非同期処理で解決

    【非同期処理(メッセージキュー)】
ユーザー → API → キューに投入 → 即座にレスポンス("受け付けました")
              ワーカーが後から処理(メール送信、画像変換等)
  

メリット:

  • ユーザーを待たせない
  • 処理を分散できる
  • リトライ機構を組み込みやすい(※実装は別途必要)
  • スケールしやすい

メッセージキューの基本概念

アーキテクチャ

    graph LR
    A["Producer<br/>(送信側)"]
    B["Message Queue<br/>(キュー)"]
    C["Consumer<br/>(受信側)"]

    A -->|"メッセージを<br/>キューに投入"| B
    B -->|"メッセージを<br/>一時的に保存"| C
    C -.->|"メッセージを<br/>取り出して処理"| C

    style A fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style B fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    style C fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
  

用語解説

用語 説明
Producer メッセージを送信する側
Consumer メッセージを受信・処理する側
Message キューに入るデータ(JSON等)
Queue メッセージを保持するバッファ
Broker キューを管理するサーバー
Ack 処理完了の確認応答

2つのメッセージングパターン

1. Point-to-Point(1対1)

    Producer → Queue → Consumer

- 1つのメッセージは1つのConsumerだけが処理
- 処理が完了したらメッセージは削除
- 用途:タスク処理、ジョブキュー
  

2. Pub/Sub(1対多)

    Publisher → Topic → Subscriber A
                  → Subscriber B
                  → Subscriber C

- 1つのメッセージを複数のSubscriberが受信
- 用途:イベント通知、ログ配信、リアルタイム更新
  

メッセージキューの比較

主要なメッセージキュー一覧

名前 特徴 用途
Redis (List/Stream) シンプル、高速 軽量なジョブキュー
RabbitMQ 柔軟なルーティング 複雑なメッセージング
Apache Kafka 高スループット、永続化 ログ収集、イベント駆動
Amazon SQS フルマネージド AWSインフラ
Q4M MySQL上で動作 レガシーシステム

詳細比較表

項目 Redis RabbitMQ Kafka SQS Q4M
スループット 非常に高 低〜中
永続化 オプション あり あり あり あり(MySQL)
順序保証 あり あり パーティション内 なし(FIFO除く) あり
Pub/Sub あり あり あり SNS連携 なし
運用難易度 低(マネージド)
遅延配信 手動実装 あり なし あり 手動実装
現在の推奨 ❌(レガシー)

第1部:Q4M(レガシーだが現役)

Q4Mとは

Q4M(Queue for MySQL) は、MySQLのストレージエンジンとして実装されたメッセージキューです。

    特徴:
- MySQLのテーブルがそのままキューになる
- SQLでキュー操作ができる
- 2008年頃に開発、現在は開発停止
- レガシーシステムではまだ現役
  

なぜQ4Mが使われたのか

    2008年頃の状況:
- RabbitMQはまだ発展途上
- Kafkaは存在しない(2011年〜)
- Redisもキューとしては使われていなかった
- MySQLは既に多くの会社で運用されていた

→ 「MySQLに追加するだけでキューが使える」Q4Mは魅力的だった
  

Q4Mの仕組み

テーブル作成

    -- Q4Mエンジンでテーブルを作成
CREATE TABLE job_queue (
    id INT AUTO_INCREMENT PRIMARY KEY,
    job_type VARCHAR(50) NOT NULL,
    payload TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=QUEUE;
  

メッセージの投入(Producer)

    -- 通常のINSERTでキューに投入
INSERT INTO job_queue (job_type, payload)
VALUES ('send_email', '{"to":"user@example.com","subject":"Welcome"}');
  

メッセージの取得(Consumer)

    -- queue_wait()でメッセージを待機・取得
SELECT * FROM job_queue WHERE queue_wait('job_queue', 10);
-- 10秒間待機、メッセージがあれば返す

-- 処理が完了したらqueue_end()で確定
SELECT queue_end();

-- 処理に失敗したらqueue_abort()でキューに戻す
SELECT queue_abort();
  

Q4Mのワーカー実装(PHP)

    <?php
// worker.php - Q4Mワーカーの実装例

class Q4MWorker
{
    private PDO $pdo;
    private bool $running = true;

    public function __construct(PDO $pdo)
    {
        $this->pdo = $pdo;
        // シグナルハンドラ設定
        pcntl_signal(SIGTERM, [$this, 'handleSignal']);
        pcntl_signal(SIGINT, [$this, 'handleSignal']);
    }

    public function handleSignal(int $signal): void
    {
        echo "Received signal {$signal}, shutting down...\n";
        $this->running = false;
    }

    public function run(): void
    {
        echo "Worker started\n";

        while ($this->running) {
            pcntl_signal_dispatch();

            try {
                $this->processJob();
            } catch (Exception $e) {
                echo "Error: " . $e->getMessage() . "\n";
                sleep(1);
            }
        }

        echo "Worker stopped\n";
    }

    private function processJob(): void
    {
        // queue_wait()でジョブを取得(10秒タイムアウト)
        $stmt = $this->pdo->query(
            "SELECT * FROM job_queue WHERE queue_wait('job_queue', 10)"
        );
        $job = $stmt->fetch(PDO::FETCH_ASSOC);

        if (!$job) {
            // タイムアウト(ジョブなし)
            return;
        }

        echo "Processing job: {$job['id']} - {$job['job_type']}\n";

        try {
            $this->handleJob($job);

            // 処理成功 → queue_end()で削除
            $this->pdo->query("SELECT queue_end()");
            echo "Job {$job['id']} completed\n";

        } catch (Exception $e) {
            // 処理失敗 → queue_abort()でキューに戻す
            $this->pdo->query("SELECT queue_abort()");
            echo "Job {$job['id']} failed: " . $e->getMessage() . "\n";
        }
    }

    private function handleJob(array $job): void
    {
        $payload = json_decode($job['payload'], true);

        switch ($job['job_type']) {
            case 'send_email':
                $this->sendEmail($payload);
                break;
            case 'process_image':
                $this->processImage($payload);
                break;
            default:
                throw new Exception("Unknown job type: {$job['job_type']}");
        }
    }

    private function sendEmail(array $payload): void
    {
        // メール送信処理
        mail($payload['to'], $payload['subject'], $payload['body'] ?? '');
    }

    private function processImage(array $payload): void
    {
        // 画像処理
        // ...
    }
}

// 実行
$pdo = new PDO('mysql:host=localhost;dbname=myapp', 'user', 'pass');
$worker = new Q4MWorker($pdo);
$worker->run();
  

Q4Mの監視

    -- キューの状態を確認
SELECT queue_stats('job_queue');

-- 滞留しているジョブ数
SELECT COUNT(*) FROM job_queue;

-- 古いジョブの確認
SELECT * FROM job_queue
WHERE created_at < NOW() - INTERVAL 1 HOUR
ORDER BY created_at
LIMIT 10;
  

Q4Mの問題点

問題 説明
開発停止 2012年頃から更新なし
MySQL依存 MySQLのバージョンアップで動かなくなる可能性
スケール限界 MySQLの性能がボトルネック
機能不足 遅延配信、優先度、Pub/Subなし
運用ノウハウ ドキュメントが少ない

Q4Mからの移行

    Q4Mを使い続けるリスク:
- MySQLバージョンアップ時の互換性問題
- 障害時の対応が難しい(ノウハウがない)
- エンジニア採用時に説明が必要

移行先の候補:
1. Redis(シンプルな置き換え)
2. Amazon SQS(マネージドで運用楽)
3. RabbitMQ(機能豊富)
  

第2部:Redis でのキュー実装

Redisがキューに適している理由

    ✅ インメモリで高速
✅ 導入が簡単(多くの環境で既に使っている)
✅ List / Stream で柔軟な実装が可能
✅ Pub/Subもサポート
  

方式1:List を使ったシンプルなキュー

基本操作

    # キューに投入(右から追加)
RPUSH job_queue '{"type":"send_email","to":"user@example.com"}'

# キューから取得(左から取得、ブロッキング)
BLPOP job_queue 10
# 10秒待機、メッセージがあれば返す

# キューの長さ
LLEN job_queue
  

Pythonでの実装

    import redis
import json
import signal
import sys

class RedisQueueWorker:
    def __init__(self, redis_client, queue_name):
        self.redis = redis_client
        self.queue_name = queue_name
        self.running = True

        signal.signal(signal.SIGTERM, self.handle_signal)
        signal.signal(signal.SIGINT, self.handle_signal)

    def handle_signal(self, signum, frame):
        print(f"Received signal {signum}, shutting down...")
        self.running = False

    def enqueue(self, job_type: str, payload: dict):
        """ジョブをキューに投入"""
        message = json.dumps({
            "type": job_type,
            "payload": payload
        })
        self.redis.rpush(self.queue_name, message)

    def run(self):
        """ワーカーループ"""
        print(f"Worker started, listening on {self.queue_name}")

        while self.running:
            try:
                # BLPOP: ブロッキングで取得(10秒タイムアウト)
                result = self.redis.blpop(self.queue_name, timeout=10)

                if result is None:
                    continue  # タイムアウト

                _, message = result
                job = json.loads(message)

                print(f"Processing: {job['type']}")
                self.process_job(job)
                print(f"Completed: {job['type']}")

            except Exception as e:
                print(f"Error: {e}")
                # エラー時はリトライキューに入れる等の処理

        print("Worker stopped")

    def process_job(self, job: dict):
        job_type = job["type"]
        payload = job["payload"]

        if job_type == "send_email":
            self.send_email(payload)
        elif job_type == "process_image":
            self.process_image(payload)
        else:
            raise ValueError(f"Unknown job type: {job_type}")

    def send_email(self, payload):
        print(f"Sending email to {payload['to']}")
        # 実際のメール送信処理

    def process_image(self, payload):
        print(f"Processing image: {payload['path']}")
        # 実際の画像処理

# 使用例
if __name__ == "__main__":
    r = redis.Redis(host='localhost', port=6379, db=0)
    worker = RedisQueueWorker(r, "job_queue")

    # ジョブを投入
    worker.enqueue("send_email", {"to": "user@example.com", "subject": "Hello"})

    # ワーカー起動
    worker.run()
  

方式2:Reliable Queue パターン

シンプルなBLPOPには問題があります:

    問題:処理中にワーカーが死ぬとメッセージが消える

BLPOP → メッセージ取得 → ワーカーがクラッシュ → メッセージ消失
  

解決策:BRPOPLPUSH(または BLMOVE)

    class ReliableQueue:
    def __init__(self, redis_client, queue_name):
        self.redis = redis_client
        self.queue_name = queue_name
        self.processing_queue = f"{queue_name}:processing"

    def enqueue(self, message: str):
        self.redis.rpush(self.queue_name, message)

    def dequeue(self, timeout=10):
        """取得と同時に処理中キューに移動"""
        # BRPOPLPUSH: 取得しつつprocessing_queueに移動
        message = self.redis.brpoplpush(
            self.queue_name,
            self.processing_queue,
            timeout=timeout
        )
        return message

    def complete(self, message: str):
        """処理完了 → processing_queueから削除"""
        self.redis.lrem(self.processing_queue, 1, message)

    def fail(self, message: str):
        """処理失敗 → 元のキューに戻す"""
        self.redis.lrem(self.processing_queue, 1, message)
        self.redis.rpush(self.queue_name, message)

    def recover_stale_jobs(self, max_age_seconds=300):
        """処理中のまま放置されたジョブを回復"""
        # 実装では、各メッセージにタイムスタンプを含め、
        # 古いものをqueue_nameに戻す
        pass
  

方式3:Redis Streams(推奨)

Redis 5.0以降で使えるStreamsは、Kafkaライクな機能を持ちます。

    # メッセージを追加
XADD job_stream * type send_email to user@example.com

# Consumer Groupを作成
XGROUP CREATE job_stream workers $ MKSTREAM

# メッセージを読み取り(Consumer Group経由)
XREADGROUP GROUP workers worker1 COUNT 1 BLOCK 10000 STREAMS job_stream >

# 処理完了を通知(ACK)
XACK job_stream workers 1234567890123-0
  

Pythonでの実装

    import redis
import json

class RedisStreamWorker:
    def __init__(self, redis_client, stream_name, group_name, consumer_name):
        self.redis = redis_client
        self.stream = stream_name
        self.group = group_name
        self.consumer = consumer_name
        self.running = True

        # Consumer Groupを作成(既に存在すればスキップ)
        try:
            self.redis.xgroup_create(stream_name, group_name, id='0', mkstream=True)
        except redis.ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise

    def enqueue(self, job_type: str, payload: dict):
        """Streamにメッセージを追加"""
        self.redis.xadd(self.stream, {
            "type": job_type,
            "payload": json.dumps(payload)
        })

    def run(self):
        print(f"Worker {self.consumer} started")

        while self.running:
            try:
                # XREADGROUPでメッセージを取得
                messages = self.redis.xreadgroup(
                    self.group,
                    self.consumer,
                    {self.stream: '>'},  # 新しいメッセージのみ
                    count=1,
                    block=10000  # 10秒ブロック
                )

                if not messages:
                    continue

                for stream_name, stream_messages in messages:
                    for message_id, data in stream_messages:
                        self.process_message(message_id, data)

            except Exception as e:
                print(f"Error: {e}")

    def process_message(self, message_id, data):
        try:
            job_type = data[b'type'].decode()
            payload = json.loads(data[b'payload'].decode())

            print(f"Processing {message_id}: {job_type}")

            # ジョブを処理
            self.handle_job(job_type, payload)

            # 処理完了 → ACK
            self.redis.xack(self.stream, self.group, message_id)
            print(f"Completed {message_id}")

        except Exception as e:
            print(f"Failed {message_id}: {e}")
            # 失敗したメッセージはACKしない → 再処理される

    def handle_job(self, job_type, payload):
        if job_type == "send_email":
            print(f"Sending email to {payload['to']}")
        # ...

    def recover_pending(self):
        """処理中のまま残っているメッセージを再処理"""
        pending = self.redis.xpending_range(
            self.stream, self.group, '-', '+', count=100
        )
        for item in pending:
            message_id = item['message_id']
            idle_time = item['time_since_delivered']

            if idle_time > 300000:  # 5分以上経過
                # 自分のConsumerに割り当て直す
                self.redis.xclaim(
                    self.stream, self.group, self.consumer,
                    min_idle_time=300000,
                    message_ids=[message_id]
                )

# 使用例
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=False)
worker = RedisStreamWorker(r, "job_stream", "workers", "worker1")

# ジョブ投入
worker.enqueue("send_email", {"to": "user@example.com"})

# ワーカー起動
worker.run()
  

第3部:RabbitMQ

RabbitMQの特徴

    ✅ 柔軟なルーティング(Exchange)
✅ 複数のメッセージングパターン
✅ 永続化、クラスタリング対応
✅ 管理UI付き
  

RabbitMQのアーキテクチャ

    Producer → Exchange → Binding → Queue → Consumer
      ルーティングルール
  

Exchange の種類

種類 ルーティング
Direct ルーティングキーが完全一致
Fanout 全てのキューにブロードキャスト
Topic パターンマッチング(*.error, log.#)
Headers ヘッダー属性でマッチング

Pythonでの実装

    import pika
import json

class RabbitMQProducer:
    def __init__(self, host='localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host)
        )
        self.channel = self.connection.channel()

        # キューを宣言(存在しなければ作成)
        self.channel.queue_declare(queue='job_queue', durable=True)

    def publish(self, job_type: str, payload: dict):
        message = json.dumps({
            "type": job_type,
            "payload": payload
        })

        self.channel.basic_publish(
            exchange='',
            routing_key='job_queue',
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # 永続化
            )
        )
        print(f"Sent: {job_type}")

    def close(self):
        self.connection.close()


class RabbitMQConsumer:
    def __init__(self, host='localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host)
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='job_queue', durable=True)

        # 1つずつ処理(prefetch=1)
        self.channel.basic_qos(prefetch_count=1)

    def callback(self, ch, method, properties, body):
        job = json.loads(body)
        print(f"Processing: {job['type']}")

        try:
            self.process_job(job)
            # 処理成功 → ACK
            ch.basic_ack(delivery_tag=method.delivery_tag)
            print(f"Completed: {job['type']}")

        except Exception as e:
            print(f"Failed: {e}")
            # 処理失敗 → NACK(リキュー)
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

    def process_job(self, job):
        job_type = job["type"]
        payload = job["payload"]

        if job_type == "send_email":
            print(f"Sending email to {payload['to']}")
        # ...

    def run(self):
        self.channel.basic_consume(
            queue='job_queue',
            on_message_callback=self.callback
        )
        print("Waiting for messages...")
        self.channel.start_consuming()

# Producer
producer = RabbitMQProducer()
producer.publish("send_email", {"to": "user@example.com"})
producer.close()

# Consumer
consumer = RabbitMQConsumer()
consumer.run()
  

遅延キュー(Dead Letter Exchange)

    # 遅延キューの設定
channel.exchange_declare(exchange='delayed_exchange', exchange_type='direct')

# 本来のキュー
channel.queue_declare(
    queue='job_queue',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'delayed_exchange',
        'x-dead-letter-routing-key': 'delayed'
    }
)

# 遅延用キュー(TTL付き)
channel.queue_declare(
    queue='delayed_queue',
    durable=True,
    arguments={
        'x-message-ttl': 60000,  # 60秒後に配信
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': 'job_queue'
    }
)
  

第4部:Apache Kafka

Kafkaの特徴

    ✅ 超高スループット(100万メッセージ/秒)
✅ 永続化・リプレイ可能
✅ 分散・スケーラブル
✅ イベントソーシングに最適
  

Kafkaのアーキテクチャ

    graph TB
    subgraph cluster["Kafka Cluster"]
        subgraph topic["Topic"]
            p0["Partition 0<br/>[0][1][2]"]
            p1["Partition 1<br/>[0][1]"]
            p2["Partition 2<br/>[0][1][2]"]
        end
    end

    producer["Producer<br/>(書き込み)"]
    consumer["Consumer Group<br/>(読み取り)"]

    producer -->|送信| topic
    topic -->|取得| consumer

    style cluster fill:#f5f5f5,stroke:#424242,stroke-width:2px
    style topic fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    style p0 fill:#e3f2fd,stroke:#1976d2,stroke-width:1px
    style p1 fill:#e3f2fd,stroke:#1976d2,stroke-width:1px
    style p2 fill:#e3f2fd,stroke:#1976d2,stroke-width:1px
    style producer fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
    style consumer fill:#ffccbc,stroke:#d84315,stroke-width:2px
  

Kafkaの概念

概念 説明
Topic メッセージのカテゴリ
Partition Topicの分割単位(並列処理用)
Offset Partition内のメッセージ位置
Consumer Group Consumerの論理グループ
Broker Kafkaサーバー

Pythonでの実装

    from kafka import KafkaProducer, KafkaConsumer
import json

# Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# メッセージ送信
producer.send('job_topic', {
    "type": "send_email",
    "payload": {"to": "user@example.com"}
})
producer.flush()

# Consumer
consumer = KafkaConsumer(
    'job_topic',
    bootstrap_servers=['localhost:9092'],
    group_id='job_workers',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    job = message.value
    print(f"Processing: {job['type']}")
    # 処理
    # Consumer Groupにより自動的にオフセットがコミットされる
  

Kafkaの使いどころ

    ✅ ログ収集・分析パイプライン
✅ イベント駆動アーキテクチャ
✅ マイクロサービス間通信
✅ リアルタイムデータストリーミング
✅ 監査ログ(永続化が必要)

❌ 単純なジョブキュー(オーバースペック)
❌ 小規模システム(運用コストが高い)
  

第5部:Amazon SQS

SQSの特徴

    ✅ フルマネージド(運用不要)
✅ 高い可用性(マルチAZ)
✅ 自動スケーリング
✅ 従量課金
  

2つのキュータイプ

タイプ 特徴 用途
Standard 高スループット、順序保証なし 一般的なジョブ
FIFO 順序保証あり、重複排除 順序が重要な処理

Pythonでの実装(boto3)

    import boto3
import json

sqs = boto3.client('sqs', region_name='ap-northeast-1')
queue_url = 'https://sqs.ap-northeast-1.amazonaws.com/123456789/job-queue'

# Producer
def send_message(job_type: str, payload: dict):
    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({
            "type": job_type,
            "payload": payload
        }),
        MessageAttributes={
            'JobType': {
                'StringValue': job_type,
                'DataType': 'String'
            }
        }
    )

# Consumer
def process_messages():
    while True:
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,  # ロングポーリング
            VisibilityTimeout=60  # 処理時間
        )

        messages = response.get('Messages', [])

        for message in messages:
            try:
                job = json.loads(message['Body'])
                print(f"Processing: {job['type']}")

                # ジョブ処理
                handle_job(job)

                # 処理成功 → 削除
                sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=message['ReceiptHandle']
                )

            except Exception as e:
                print(f"Error: {e}")
                # VisibilityTimeout後に再処理される

def handle_job(job):
    if job["type"] == "send_email":
        print(f"Sending email to {job['payload']['to']}")

# 使用例
send_message("send_email", {"to": "user@example.com"})
process_messages()
  

Dead Letter Queue(DLQ)

    # 失敗したメッセージを別のキューに移動
# AWSコンソールまたはCloudFormationで設定

# DLQからメッセージを取得して分析
dlq_url = 'https://sqs.../job-queue-dlq'

response = sqs.receive_message(
    QueueUrl=dlq_url,
    MaxNumberOfMessages=10
)

for message in response.get('Messages', []):
    print(f"Failed message: {message['Body']}")
    # 原因を調査し、修正後に再投入
  

第6部:実装パターン集

パターン1:リトライ with 指数バックオフ

    import time
import random

def process_with_retry(job, max_retries=3):
    for attempt in range(max_retries):
        try:
            process_job(job)
            return True
        except Exception as e:
            if attempt == max_retries - 1:
                # 最後の試行 → DLQに入れる
                send_to_dlq(job, str(e))
                return False

            # 指数バックオフ + ジッター
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"Retry {attempt + 1} after {wait_time:.2f}s")
            time.sleep(wait_time)
  

パターン2:優先度付きキュー

    # Redisで優先度別のキューを作成
QUEUES = [
    "jobs:critical",  # 最優先
    "jobs:high",
    "jobs:normal",
    "jobs:low"
]

def dequeue_by_priority(redis_client, timeout=10):
    """優先度順にデキュー"""
    result = redis_client.blpop(QUEUES, timeout=timeout)
    if result:
        queue_name, message = result
        priority = queue_name.split(':')[1]
        return priority, message
    return None, None

def enqueue_with_priority(redis_client, priority, message):
    """優先度を指定してエンキュー"""
    queue_name = f"jobs:{priority}"
    redis_client.rpush(queue_name, message)
  

パターン3:バッチ処理

    def process_batch(redis_client, batch_size=100, max_wait=5):
    """複数メッセージをまとめて処理"""
    messages = []
    start_time = time.time()

    while len(messages) < batch_size:
        elapsed = time.time() - start_time
        remaining = max(0, max_wait - elapsed)

        if remaining == 0:
            break

        result = redis_client.blpop("jobs", timeout=int(remaining))
        if result:
            messages.append(result[1])

    if messages:
        # バッチ処理
        process_messages_bulk(messages)
        print(f"Processed {len(messages)} messages")
  

パターン4:遅延キュー

    import time

def enqueue_delayed(redis_client, message, delay_seconds):
    """指定秒後に処理されるジョブを投入"""
    execute_at = time.time() + delay_seconds
    redis_client.zadd("delayed_jobs", {message: execute_at})

def process_delayed_jobs(redis_client):
    """遅延ジョブを処理"""
    while True:
        now = time.time()

        # 実行時刻を過ぎたジョブを取得
        jobs = redis_client.zrangebyscore(
            "delayed_jobs", 0, now, start=0, num=10
        )

        for job in jobs:
            # 遅延キューから削除
            if redis_client.zrem("delayed_jobs", job):
                # 通常キューに投入
                redis_client.rpush("jobs", job)

        time.sleep(1)
  

パターン5:冪等性の確保

    def process_idempotent(redis_client, job):
    """同じジョブを2回処理しない"""
    job_id = job["id"]
    lock_key = f"processed:{job_id}"

    # 既に処理済みかチェック
    if redis_client.exists(lock_key):
        print(f"Job {job_id} already processed, skipping")
        return

    # 処理を実行
    try:
        do_actual_work(job)

        # 処理済みフラグを立てる(24時間で期限切れ)
        redis_client.setex(lock_key, 86400, "1")

    except Exception as e:
        # 失敗時はフラグを立てない → リトライ可能
        raise
  

第7部:障害対応

よくある障害パターン

1. キューが詰まる

    # 症状:キューの長さが増え続ける

# 確認
redis-cli LLEN job_queue
# → 100000 (通常は100程度)

# 原因
# - Consumerが死んでいる
# - 処理が遅い
# - 大量のジョブが一気に投入された
  

対応:

    # 1. Consumerの状態を確認
ps aux | grep worker

# 2. Consumerを増やす
./worker.py &
./worker.py &
./worker.py &

# 3. 緊急時:キューを別の場所に退避
redis-cli RENAME job_queue job_queue_backup
# → 新しいジョブは空のキューへ
# → 後でbackupを処理
  

2. メッセージの重複処理

    症状:同じメールが2通送られた

原因:
- At-least-once配信(ACKが失われた)
- Consumerの再起動
  

対応:

    # 冪等性を確保する
def send_email_idempotent(job_id, to, subject, body):
    # 送信済みかチェック
    if is_email_sent(job_id):
        return  # スキップ

    # メール送信
    send_email(to, subject, body)

    # 送信済みフラグを保存
    mark_email_sent(job_id)
  

3. メッセージの消失

    症状:投入したはずのジョブが処理されない

原因:
- Redisの永続化設定ミス
- ACK後にConsumerがクラッシュ
- キューの誤削除
  

対応:

    # Redis永続化設定の確認
redis-cli CONFIG GET appendonly
# → "yes" であるべき

redis-cli CONFIG GET save
# → 適切なスナップショット間隔

# 永続化を有効にする
redis-cli CONFIG SET appendonly yes
  

4. Consumer が処理中にクラッシュ

    症状:ジョブが消えた or 無限ループ
  

対応:

    # Reliable Queue パターンを使用
# 処理中キューを監視し、放置されたジョブを回復

def recover_orphaned_jobs(redis_client, max_age=300):
    """処理中のまま放置されたジョブを回復"""
    processing_queue = "jobs:processing"
    main_queue = "jobs"

    # 処理中キューの全メッセージを確認
    # (実際にはタイムスタンプ付きで管理)
    orphaned = redis_client.lrange(processing_queue, 0, -1)

    for job in orphaned:
        job_data = json.loads(job)
        if is_stale(job_data, max_age):
            # 元のキューに戻す
            redis_client.lrem(processing_queue, 1, job)
            redis_client.rpush(main_queue, job)
            print(f"Recovered orphaned job: {job_data['id']}")
  

監視すべきメトリクス

    # Prometheus アラート例
groups:
  - name: message_queue
    rules:
      # キューの長さ
      - alert: QueueBacklogHigh
        expr: redis_list_length{list="job_queue"} > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Job queue backlog is high"

      # Consumer数
      - alert: NoActiveConsumers
        expr: count(up{job="queue_worker"}) == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "No active queue workers"

      # 処理レート
      - alert: ProcessingRateLow
        expr: rate(jobs_processed_total[5m]) < 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Job processing rate is low"
  

選び方ガイド

フローチャート

    flowchart TD
    start["メッセージキューが必要"]
    infra{"既存インフラは?"}
    mysql["MySQL系"]
    cloud["クラウド"]
    q4m{"Q4M使用中?"}
    aws{"AWS?"}
    migrate["移行を検討"]
    sqs["SQS"]
    req1{"要件は?"}
    req2{"要件は?"}
    simple["シンプル"]
    advanced["高機能"]
    highload["超高負荷"]
    redis["Redis"]
    rabbitmq["RabbitMQ"]
    kafka["Kafka"]

    start --> infra
    infra -->|MySQL系| mysql
    infra -->|クラウド| cloud
    mysql --> q4m
    cloud --> aws
    q4m -->|Yes| migrate
    q4m -->|No| req1
    aws -->|Yes| sqs
    aws -->|No| req2
    req1 --> simple
    req1 --> advanced
    req1 --> highload
    req2 --> simple
    simple --> redis
    advanced --> rabbitmq
    highload --> kafka

    style start fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    style infra fill:#fff9c4,stroke:#f57f17,stroke-width:2px
    style q4m fill:#fff9c4,stroke:#f57f17,stroke-width:2px
    style aws fill:#fff9c4,stroke:#f57f17,stroke-width:2px
    style req1 fill:#fff9c4,stroke:#f57f17,stroke-width:2px
    style req2 fill:#fff9c4,stroke:#f57f17,stroke-width:2px
    style migrate fill:#ffccbc,stroke:#bf360c,stroke-width:2px
    style sqs fill:#c8e6c9,stroke:#2e7d32,stroke-width:2px
    style redis fill:#c8e6c9,stroke:#2e7d32,stroke-width:2px
    style rabbitmq fill:#c8e6c9,stroke:#2e7d32,stroke-width:2px
    style kafka fill:#c8e6c9,stroke:#2e7d32,stroke-width:2px
  

推奨まとめ

ケース 推奨
小〜中規模、Redisあり Redis Streams
AWS環境 Amazon SQS
複雑なルーティング RabbitMQ
超高スループット、イベント駆動 Kafka
レガシーMySQL環境 Q4M(移行を計画)

まとめ

メッセージキューの本質

    1. 同期→非同期でユーザー体験向上
2. 処理を分離してスケーラビリティ向上
3. リトライ、DLQ等の実装により障害耐性を高められる
4. ピーク負荷の吸収
  

※ 注意:キューを導入しただけで自動的に障害に強くなるわけではありません。リトライ処理、冪等性の確保、DLQの監視など、適切な実装と運用が必要です。

各キューの選び方

キュー 一言
Q4M レガシー、移行を検討
Redis シンプル、軽量
RabbitMQ 柔軟、多機能
Kafka 大規模、永続化
SQS マネージド、楽

実装の心がけ

    1. 冪等性を確保する
2. リトライ設計を入れる
3. DLQで失敗を補足する
4. 監視とアラートを設定する
5. 処理中メッセージの回復機構
  

参考リンク