Ở bài trước, bạn thấy mọi hệ thống hiện đại thường có một OLTP DB chính + nhiều “sink”: Elasticsearch cho search, Redis cho cache, pgvector cho semantic search, ClickHouse cho analytics, Snowflake cho warehouse. Câu hỏi kinh điển: làm sao đồng bộ một cách đáng tin cậy mà không mất event và không phát sinh dual-write bug?

Câu trả lời 2026: CDC (Change Data Capture) + Outbox Pattern.

1. Tại sao “dual write” là bug mặc định

// Anti-pattern kinh điển: commit DB rồi publish Kafka
await db.orders.create(order); // ✅ commit DB
await kafka.publish("orders", order); // ❌ có thể fail → Kafka không có event

4 cách nó hỏng:

  1. Kafka down: DB đã commit, event mất → search/cache sai vĩnh viễn.
  2. App crash giữa 2 dòng: commit rồi nhưng chưa publish.
  3. Publish trước commit: nếu commit fail, event “ma” đã gửi đi.
  4. Order không đảm bảo: 2 request song song, commit order đúng nhưng publish order đảo — downstream thấy update cũ hơn override update mới.

Kết luận: không có cách nào “atomic” giữa transaction DB và một broker khác — trừ 2PC (phức tạp, slow). Thay vào đó, ghi event vào chính DB như một phần của transaction, rồi dùng CDC / polling để lấy ra.

2. Transactional Outbox Pattern

Ý tưởng: trong cùng một transaction với business data, INSERT thêm một record vào bảng outbox. Vì nằm chung transaction → atomic. Một process tách riêng đọc outbox → publish Kafka → mark processed.


  flowchart LR
  App[App] -->|BEGIN TX| DB[(PostgreSQL)]
  DB -->|INSERT orders + INSERT outbox| DB
  DB -->|COMMIT| App
  Worker[Outbox Worker] -->|SELECT FROM outbox| DB
  Worker -->|publish| K[Kafka]
  Worker -->|UPDATE processed_at| DB

Schema

CREATE TABLE outbox (
    id             BIGSERIAL PRIMARY KEY,
    aggregate_type TEXT NOT NULL,
    aggregate_id   TEXT NOT NULL,
    event_type     TEXT NOT NULL,
    payload        JSONB NOT NULL,
    created_at     TIMESTAMPTZ DEFAULT now(),
    processed_at   TIMESTAMPTZ
);

CREATE INDEX idx_outbox_unprocessed
  ON outbox (id)
  WHERE processed_at IS NULL;

Service code (TypeScript + Prisma hoặc raw SQL)

await prisma.$transaction(async (tx) => {
  const order = await tx.order.create({ data: orderData });

  await tx.outbox.create({
    data: {
      aggregateType: "order",
      aggregateId: order.id.toString(),
      eventType: "OrderCreated",
      payload: { id: order.id, userId: order.userId, total: order.total },
    },
  });
});
// Không publish Kafka ở đây. Outbox worker sẽ làm.

Outbox worker

async function pollOutbox() {
  while (true) {
    const rows = await db.$transaction(async (tx) => {
      // SKIP LOCKED: nhiều worker song song không đụng nhau
      const batch = await tx.$queryRaw`
        SELECT id, aggregate_id, event_type, payload
        FROM outbox
        WHERE processed_at IS NULL
        ORDER BY id
        FOR UPDATE SKIP LOCKED
        LIMIT 500`;

      for (const e of batch) {
        await kafka.publish(e.event_type, {
          key: e.aggregate_id, // đảm bảo order trong 1 aggregate
          value: e.payload,
        });
      }

      if (batch.length > 0) {
        await tx.$executeRaw`
          UPDATE outbox SET processed_at = now()
          WHERE id = ANY(${batch.map((b) => b.id)})`;
      }
      return batch.length;
    });

    if (rows === 0) await sleep(200);
  }
}

Vì sao chọn FOR UPDATE SKIP LOCKED?

  • Chạy nhiều worker song song vẫn an toàn, không deadlock.
  • Nếu 1 batch fail (Kafka down), transaction rollback → event quay lại processed_at IS NULL cho batch sau.
  • Không cần phân shard thủ công.

At-least-once + idempotency

Outbox cho bạn at-least-once: event có thể duplicate (publish rồi process crash trước UPDATE). Consumer bắt buộc idempotent — có thể dùng event id làm dedup key trong downstream store (Redis/Postgres unique index).

3. CDC với Debezium — không cần thay đổi app code

Nếu bạn không control được application code (legacy, commercial software), hoặc không muốn đụng vào transaction, thì CDC đọc trực tiếp WAL/binlog là cách ít xâm lấn nhất:


  flowchart LR
  DB[(Primary DB<br/>PG/MySQL/Mongo)] -- WAL/binlog --> Deb[Debezium Connector]
  Deb --> KC[Kafka Connect]
  KC --> K[Kafka topic]
  K --> ES[Elasticsearch sink]
  K --> CH[ClickHouse sink]
  K --> PGV[pgvector updater]

Debezium PostgreSQL connector — config

{
  "name": "pg-orders",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "pg.internal",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "***",
    "database.dbname": "shop",
    "topic.prefix": "shop",
    "plugin.name": "pgoutput",
    "publication.name": "dbz_publication",
    "slot.name": "debezium_slot",
    "table.include.list": "public.orders,public.order_items",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "string",
    "time.precision.mode": "adaptive",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}

Yêu cầu phía PostgreSQL

-- postgresql.conf
-- wal_level = logical
-- max_replication_slots = 4
-- max_wal_senders = 4

-- Tạo user và publication
CREATE USER debezium WITH REPLICATION PASSWORD '...';
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;

CREATE PUBLICATION dbz_publication FOR TABLE orders, order_items;

Yêu cầu phía MySQL

# my.cnf
[mysqld]
server_id = 223344
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 7
gtid_mode = ON
enforce_gtid_consistency = ON
CREATE USER 'debezium'@'%' IDENTIFIED BY '...';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium'@'%';

Giám sát Debezium

  • LSN lag (PG): SELECT slot_name, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) FROM pg_replication_slots;
  • Snapshot progress: Debezium expose Prometheus metric debezium_metrics_snapshotcompleted, rowscount, rowscompleted.
  • Replication slot bloat: slot dừng → WAL không dọn được → disk fill. Monitor slot lag và có alert nghiêm ngặt.

4. Outbox vs CDC thuần — khi nào chọn cái nào?

Tiêu chíOutboxCDC Debezium
Xâm lấn app codeCó (phải INSERT outbox)Không (đọc WAL)
Event có semantic domain✅ (tự bạn định nghĩa event_type, payload)❌ (chỉ là row delta)
Giữ nguyên toàn bộ old/new valuesTuỳ design✅ tự động
Hỗ trợ mọi ngôn ngữ / ORM
Infra thêmMột bảng + workerKafka, Kafka Connect, Debezium
Performance overhead~10-15% trên DB (extra insert)Gần 0 trên primary
Khi schema DB đổiKhông ảnh hưởng eventPhải cẩn thận schema evolution

Quy tắc: dùng Outbox khi có quyền đụng app code và muốn event mang ngữ nghĩa domain (event-driven architecture). Dùng CDC khi muốn mirror raw data sang analytics/search/replication (ETL/ELT).

Kết hợp: nhiều team có cả 2 — Outbox cho domain events (OrderPlaced, UserRegistered), CDC cho data mirroring (sync sang warehouse).

5. Các cạm bẫy sản xuất

5.1 Outbox bloating

Bảng outbox tăng vô hạn nếu không dọn. Thiết lập retention:

-- Xoá event đã process xong > 7 ngày
DELETE FROM outbox
WHERE processed_at IS NOT NULL
  AND processed_at < now() - INTERVAL '7 days';

-- Chạy qua pg_cron hoặc external scheduler
SELECT cron.schedule('outbox_cleanup', '0 3 * * *',
  $$DELETE FROM outbox WHERE processed_at < now() - INTERVAL '7 days'$$);

5.2 WAL không dọn (Debezium)

Nếu consumer down vài ngày, WAL tích luỹ có thể fill disk primary. Luôn có:

  • Alert confirmed_flush_lsn lag > 1 GB
  • max_slot_wal_keep_size (PG 13+) để giới hạn WAL per slot
  • Có runbook: drop slot trong emergency (chấp nhận mất event, restart snapshot)

5.3 Schema migration

  • Outbox: thêm field mới trong payload JSONB → consumer phải handle old/new.
  • CDC: thay đổi cột → Debezium gửi schema-change event; consumer parse schema registry (Avro/Protobuf) mới.

5.4 Event ordering

  • Mọi event cho cùng aggregate phải đi vào cùng partition Kafka → dùng aggregate_id làm partition key.
  • Với CDC PG logical replication, ordering được đảm bảo trong cùng một publication (transactional order).

5.5 Kafka alternatives

Không nhất thiết phải Kafka. Alternative nhẹ hơn:

  • Redis Streams + consumer group — đủ cho 10-100k event/s.
  • NATS JetStream — đơn giản hơn Kafka, at-least-once, durable.
  • AWS SNS/SQS, GCP Pub/Sub — managed, không vận hành.
  • PostgreSQL LISTEN/NOTIFY — cho volume thấp, không persist.

6. Case study ngắn: đồng bộ orders → ES + pgvector


  flowchart LR
  App --> PG[(PostgreSQL<br/>orders + outbox)]
  Worker[Outbox Worker] --> K[Kafka<br/>topic: order-events]
  K --> ES[ES Sink:<br/>index fulltext]
  K --> VectorWorker
  VectorWorker --> Emb[Embedding API]
  Emb --> PGV[(pgvector<br/>order_embeddings)]

Pipeline bảo đảm:

  • Transactional Outbox → không mất event.
  • Consumer group Kafka → parallel + retry.
  • Mỗi consumer idempotent (unique key = order_id + event_type).
  • Fail → DLQ (dead letter queue), alert, manual replay.

7. Checklist triển khai

  • Chọn Outbox (owned app) hoặc CDC (mirror DB)
  • Schema outbox có index cho WHERE processed_at IS NULL
  • Worker dùng FOR UPDATE SKIP LOCKED, batch 100-500
  • Consumer idempotent (dedup theo event id)
  • Cron cleanup outbox cũ > 7 ngày
  • Debezium: set max_slot_wal_keep_size, alert slot lag
  • Schema registry (Avro/Protobuf) cho long-term contract
  • Dead letter queue + runbook xử lý
  • Giám sát end-to-end latency (commit DB → consumer process)

Kết luận

Mỗi khi bạn định viết await publishEvent() sau await saveToDb() — dừng lại. Đó là cái bug 30% production nhất của kiến trúc event-driven. Dùng Outbox khi bạn có app, CDC khi bạn có DB. Và nhớ rằng: at-least-once + consumer idempotent là công thức tin cậy nhất — exactly-once là ảo tưởng trong hệ phân tán.

Bài tiếp theo: Distributed SQL và sharding thực chiến — khi PostgreSQL/MySQL một instance hết khả năng, đi đến đâu.