Featured image of post Task Queue & Message Broker: Celery, RabbitMQ và Kafka — Phân biệt rõ ràng

Task Queue & Message Broker: Celery, RabbitMQ và Kafka — Phân biệt rõ ràng

Tại sao gửi email làm đứng API? Hướng dẫn chuyên sâu về xử lý bất đồng bộ: từ Celery/Django-Q (hàng đợi tác vụ) đến RabbitMQ (môi giới) và Kafka (luồng sự kiện).

User bấm “Gửi Email”. API của bạn đứng hình 3 giây trong khi nó đang trao đổi với SMTP server. Rồi mới trả về “OK.”

Cái 3 giây chờ đó là tội ác.

Mọi thao tác không cần xảy ra ngay bây giờ thì nên xảy ra sau đó, trong nền. Đây là toàn bộ triết lý đằng sau task queue và message broker.

Đây là Hướng dẫn chuyên sâu về xử lý bất đồng bộ — từ cái task queue Django đơn giản nhất đến Kafka quy mô hành tinh.


Phần 1: Nền tảng (Mô hình tư duy)

Ý tưởng cốt lõi rất đơn giản: Tách “Yêu cầu” khỏi “Công việc”.

1
2
3
4
5
6
7
KHÔNG CÓ QUEUE:
Request → API → "Gửi Email"... chờ 3 giây... → Response

CÓ QUEUE:
Request → API → "Đẩy task vào Queue" → Response (TỨC THỜI!)
                        ↓ (Chạy ngầm, vài giây sau)
                   Celery Worker → "Gửi Email"

Phân cấp “Bưu điện”

Hãy tưởng tượng như ba tầng của hệ thống thư tín:

  • Task Queue (Celery / Django-Q) = Hòm thư trước cửa nhà Đơn giản, nội địa. Bạn nhét thư vào, người đưa thư tự đến lấy. Không cần quan tâm nó đi đâu.

  • Message Broker (RabbitMQ) = Chi nhánh Bưu điện Định tuyến thông minh. Bạn mang một gói hàng “KHẨN” và một gói “Thường”. Nhân viên biết gửi Khẩn theo xe chuyển phát nhanh và Thường theo xe hàng tuần. Nhiều người gửi, nhiều người nhận.

  • Event Streaming (Kafka) = Tờ báo Quốc gia Mọi người đăng bài (events). Mọi người đăng ký đọc theo tốc độ riêng. Bài cũ được lưu trữ mãi. Bạn có thể đọc lại (replay) tờ báo thứ Ba tuần trước bất cứ lúc nào.


Phần 2: Bộ công cụ (Ai làm gì?)

Hiểu các tầng là cực kỳ quan trọng. Chúng giải quyết những vấn đề khác nhau.

Tầng 1: Task Queue (Celery / Django-Q)

Đây là điểm khởi đầu. Bạn có một app Django, cần chạy thứ gì đó ở nền.

Là gì: Một thư viện Python chạy hàm bất đồng bộ trong một process riêng (worker).

Cần gì: Một Broker để lưu trữ các task. (Mặc định dùng Redis hoặc RabbitMQ).

1
[Django App] --đẩy task--> [Broker: Redis/RabbitMQ] <--lấy task-- [Celery Worker]

Dùng Celery khi: Gửi email, resize ảnh, tạo PDF, chạy import hàng đêm.

Tầng 2: Message Broker (RabbitMQ)

Đây là phần “trung gian”. Nó là một server chuyên dụng để nhận, lưu trữ và định tuyến messages.

Khái niệm chính:

  • Producer: Gửi messages vào broker.
  • Queue: Một bộ đệm (buffer) có tên, lưu trữ messages.
  • Consumer: Lấy messages từ queue để xử lý.
  • Exchange: Bộ “Router”. Dựa trên routing key, nó quyết định message đi vào queue nào.

Dùng RabbitMQ khi: Có nhiều service cần nói chuyện với nhau bất đồng bộ. “Khi có Đơn hàng mới, thông báo cho Service Kho hàng VÀ Service Thông báo.”

Tầng 3: Event Streaming Platform (Kafka)

Kafka là một con thú hoàn toàn khác. Nó không phải là queue — nó là một log phân tán, chỉ ghi thêm (append-only).

Khái niệm chính:

  • Topic: Như một bảng trong database, hoặc một kênh tin tức.
  • Partition: Một topic được chia thành nhiều partition để tăng tốc xử lý song song.
  • Offset: Số thứ tự tuần tự của mỗi message. Consumer X đang ở offset 1500, Consumer Y ở offset 2300. Messages không bao giờ bị xóa (cho đến khi hết thời gian lưu trữ).
  • Consumer Group: Nhiều consumer cùng xử lý song song một topic. Kafka chia partition cho từng người.

Dùng Kafka khi: Bạn cần nhật ký kiểm toán (audit log) vĩnh viễn, có thể phát lại. “Mọi sự kiện thanh toán, mãi mãi. Bất kỳ service nào cũng có thể đăng ký đọc ngay bây giờ hoặc bắt kịp từ 3 tháng trước.”


Phần 3: Điều tra (Debug như chuyên gia)

1. Theo dõi Celery Workers

1
2
3
4
5
# Xem tất cả task đang chạy và worker nào đang active
celery -A myapp inspect active

# Xem độ dài queue hiện tại (qua Redis)
redis-cli llen celery

2. Dead Letter Queue (DLQ) — Hàng đợi xác chết

Pattern quan trọng nhất bạn phải implement. Nếu một task thất bại vĩnh viễn (thử lại 3 lần, vẫn lỗi), nó đi đâu?

  • Không có DLQ: Message bị nuốt vào hư không. 🔥 Mất dữ liệu.
  • Có DLQ: Message thất bại được chuyển vào một queue đặc biệt “Xác chết”. Bạn có thể soi nó, fix bug, rồi đẩy lại.

Trong RabbitMQ: Cấu hình qua x-dead-letter-exchange. Trong Celery: Dùng hook on_failure hoặc xử lý DLQ ở cấp task.

3. Consumer Lag trong Kafka

Chỉ số quan trọng nhất trong Kafka. Consumer Lag = Consumer đang trễ bao nhiêu message so với message mới nhất.

1
2
3
4
5
6
7
# Kiểm tra consumer group lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-consumer-group

# Output:
# TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# payments    0          1500            1850            350  <-- Consumer đang trễ 350 messages!

Nếu lag tăng, consumer của bạn không xử lý kịp. Cần thêm instance consumer (tối đa bằng số partition).


Phần 4: Chẩn đoán (Lỗi thường gặp)

Triệu chứngNguyên nhânCách sửa
Queue cứ phình to mãiWorker quá chậmThêm Celery worker. Tối ưu task.
Cùng một task chạy hai lầnKhông có idempotency. Worker crash giữa chừng, broker thử lại.Làm task idempotent: “Nếu đơn hàng #123 đã xử lý rồi thì bỏ qua.”
Django-Q vs CeleryDjango-Q không cần broker riêng (dùng DB). Celery mạnh hơn nhưng cần Redis/RabbitMQ.Dùng Django-Q cho dự án nhỏ. Dùng Celery cho production.
Kafka mất messageacks=1 nghĩa là chỉ leader xác nhận. Leader crash trước khi replica kịp sao chép.Set acks=allProducer.
Kafka không scale đượcTopic chỉ có 1 partition. Không thể có nhiều consumer hơn số partition.Tăng số partition khi tạo Topic. (Không giảm được sau khi tạo!)

Phần 5: Giải pháp (Sách nấu ăn Python)

1. Celery với Django (Bộ chuẩn)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"

# tasks.py
from celery import shared_task
import logging

logger = logging.getLogger(__name__)

@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60  # Chờ 60 giây trước khi thử lại
)
def send_welcome_email(self, user_id: int):
    try:
        user = User.objects.get(id=user_id)
        # ... gửi email ...
        logger.info(f"Gửi email tới {user.email} thành công")
    except Exception as exc:
        # Exponential backoff (Chờ lâu hơn sau mỗi lần thất bại)
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

# views.py (Gọi task)
def register(request):
    user = create_user(request)
    send_welcome_email.delay(user.id)  # .delay() → Bất đồng bộ.
    return JsonResponse({"status": "ok"})  # Trả về NGAY LẬP TỨC!

2. Django-Q (Không cần hạ tầng)

Khi bạn không muốn setup Redis/RabbitMQ cho dự án nhỏ.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# settings.py (Dùng DB của bạn làm broker)
Q_CLUSTER = {
    "name": "DjangoQ",
    "workers": 4,
    "orm": "default"  # Dùng Postgres/MySQL của dự án
}

# Cách dùng
from django_q.tasks import async_task

async_task("myapp.tasks.send_welcome_email", user_id=user.id)

3. Kafka với Python (Luồng sự kiện)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from kafka import KafkaProducer, KafkaConsumer
import json

# --- PRODUCER (ví dụ: trong Payment Service) ---
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    acks="all",  # Chờ tất cả replica xác nhận
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

producer.send("payments", {
    "order_id": 1234,
    "amount": 99.99,
    "status": "paid"
})

# --- CONSUMER (ví dụ: trong Notification Service) ---
consumer = KafkaConsumer(
    "payments",
    bootstrap_servers="localhost:9092",
    group_id="notification-service",  # Consumer Group
    auto_offset_reset="earliest"  # Bắt đầu từ đầu nếu là subscriber mới
)

for message in consumer:
    payload = json.loads(message.value)
    send_payment_receipt(payload["order_id"])

Mô hình tư duy chốt hạ

1
2
3
4
5
6
7
8
Django-Q   → Hòm thư trước cửa nhà. (Đơn giản, không cần hạ tầng, dùng DB).
Celery     → Nhân viên giao hàng chuyên nghiệp. (Nhanh, mạnh, cần Redis/RabbitMQ).
RabbitMQ   → Bưu điện trung gian. (Định tuyến message giữa các service).
Kafka      → Kho lưu trữ Quốc gia + Báo phát sóng trực tiếp. (Log vĩnh viễn, replay được).

Task Queue → "Làm cái này sau."
Broker     → "Chuyển cái này đến đúng người."
Kafka      → "Ghi lại mọi thứ đã xảy ra, mãi mãi."

Hướng dẫn chọn lựa:

  1. App Django nhỏ, chạy ngầm?Django-Q (không cần hạ tầng thêm).
  2. App Django production?Celery + Redis.
  3. Nhiều microservice cần nói chuyện?RabbitMQ.
  4. Event sourcing, audit log, pipeline dữ liệu liên team?Kafka.
Được tạo với sự lười biếng tình yêu 🦥

Subscribe to My Newsletter