Ở bài trước, bạn thấy mọi hệ thống hiện đại thường có một OLTP DB chính + nhiều “sink”: Elasticsearch cho search, Redis cho cache, pgvector cho semantic search, ClickHouse cho analytics, Snowflake cho warehouse. Câu hỏi kinh điển: làm sao đồng bộ một cách đáng tin cậy mà không mất event và không phát sinh dual-write bug?
Câu trả lời 2026: CDC (Change Data Capture) + Outbox Pattern.
1. Tại sao “dual write” là bug mặc định
// Anti-pattern kinh điển: commit DB rồi publish Kafka
await db.orders.create(order); // ✅ commit DB
await kafka.publish("orders", order); // ❌ có thể fail → Kafka không có event
4 cách nó hỏng:
- Kafka down: DB đã commit, event mất → search/cache sai vĩnh viễn.
- App crash giữa 2 dòng: commit rồi nhưng chưa publish.
- Publish trước commit: nếu commit fail, event “ma” đã gửi đi.
- Order không đảm bảo: 2 request song song, commit order đúng nhưng publish order đảo — downstream thấy update cũ hơn override update mới.
Kết luận: không có cách nào “atomic” giữa transaction DB và một broker khác — trừ 2PC (phức tạp, slow). Thay vào đó, ghi event vào chính DB như một phần của transaction, rồi dùng CDC / polling để lấy ra.
2. Transactional Outbox Pattern
Ý tưởng: trong cùng một transaction với business data, INSERT thêm một record vào bảng outbox. Vì nằm chung transaction → atomic. Một process tách riêng đọc outbox → publish Kafka → mark processed.
flowchart LR
App[App] -->|BEGIN TX| DB[(PostgreSQL)]
DB -->|INSERT orders + INSERT outbox| DB
DB -->|COMMIT| App
Worker[Outbox Worker] -->|SELECT FROM outbox| DB
Worker -->|publish| K[Kafka]
Worker -->|UPDATE processed_at| DB
Schema
CREATE TABLE outbox (
id BIGSERIAL PRIMARY KEY,
aggregate_type TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT now(),
processed_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_unprocessed
ON outbox (id)
WHERE processed_at IS NULL;
Service code (TypeScript + Prisma hoặc raw SQL)
await prisma.$transaction(async (tx) => {
const order = await tx.order.create({ data: orderData });
await tx.outbox.create({
data: {
aggregateType: "order",
aggregateId: order.id.toString(),
eventType: "OrderCreated",
payload: { id: order.id, userId: order.userId, total: order.total },
},
});
});
// Không publish Kafka ở đây. Outbox worker sẽ làm.
Outbox worker
async function pollOutbox() {
while (true) {
const rows = await db.$transaction(async (tx) => {
// SKIP LOCKED: nhiều worker song song không đụng nhau
const batch = await tx.$queryRaw`
SELECT id, aggregate_id, event_type, payload
FROM outbox
WHERE processed_at IS NULL
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 500`;
for (const e of batch) {
await kafka.publish(e.event_type, {
key: e.aggregate_id, // đảm bảo order trong 1 aggregate
value: e.payload,
});
}
if (batch.length > 0) {
await tx.$executeRaw`
UPDATE outbox SET processed_at = now()
WHERE id = ANY(${batch.map((b) => b.id)})`;
}
return batch.length;
});
if (rows === 0) await sleep(200);
}
}
Vì sao chọn FOR UPDATE SKIP LOCKED?
- Chạy nhiều worker song song vẫn an toàn, không deadlock.
- Nếu 1 batch fail (Kafka down), transaction rollback → event quay lại
processed_at IS NULLcho batch sau. - Không cần phân shard thủ công.
At-least-once + idempotency
Outbox cho bạn at-least-once: event có thể duplicate (publish rồi process crash trước UPDATE). Consumer bắt buộc idempotent — có thể dùng event id làm dedup key trong downstream store (Redis/Postgres unique index).
3. CDC với Debezium — không cần thay đổi app code
Nếu bạn không control được application code (legacy, commercial software), hoặc không muốn đụng vào transaction, thì CDC đọc trực tiếp WAL/binlog là cách ít xâm lấn nhất:
flowchart LR
DB[(Primary DB<br/>PG/MySQL/Mongo)] -- WAL/binlog --> Deb[Debezium Connector]
Deb --> KC[Kafka Connect]
KC --> K[Kafka topic]
K --> ES[Elasticsearch sink]
K --> CH[ClickHouse sink]
K --> PGV[pgvector updater]
Debezium PostgreSQL connector — config
{
"name": "pg-orders",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "pg.internal",
"database.port": "5432",
"database.user": "debezium",
"database.password": "***",
"database.dbname": "shop",
"topic.prefix": "shop",
"plugin.name": "pgoutput",
"publication.name": "dbz_publication",
"slot.name": "debezium_slot",
"table.include.list": "public.orders,public.order_items",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "adaptive",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
}
}
Yêu cầu phía PostgreSQL
-- postgresql.conf
-- wal_level = logical
-- max_replication_slots = 4
-- max_wal_senders = 4
-- Tạo user và publication
CREATE USER debezium WITH REPLICATION PASSWORD '...';
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
CREATE PUBLICATION dbz_publication FOR TABLE orders, order_items;
Yêu cầu phía MySQL
# my.cnf
[mysqld]
server_id = 223344
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 7
gtid_mode = ON
enforce_gtid_consistency = ON
CREATE USER 'debezium'@'%' IDENTIFIED BY '...';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'debezium'@'%';
Giám sát Debezium
- LSN lag (PG):
SELECT slot_name, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) FROM pg_replication_slots; - Snapshot progress: Debezium expose Prometheus metric
debezium_metrics_snapshotcompleted,rowscount,rowscompleted. - Replication slot bloat: slot dừng → WAL không dọn được → disk fill. Monitor slot lag và có alert nghiêm ngặt.
4. Outbox vs CDC thuần — khi nào chọn cái nào?
| Tiêu chí | Outbox | CDC Debezium |
|---|---|---|
| Xâm lấn app code | Có (phải INSERT outbox) | Không (đọc WAL) |
| Event có semantic domain | ✅ (tự bạn định nghĩa event_type, payload) | ❌ (chỉ là row delta) |
| Giữ nguyên toàn bộ old/new values | Tuỳ design | ✅ tự động |
| Hỗ trợ mọi ngôn ngữ / ORM | ✅ | ✅ |
| Infra thêm | Một bảng + worker | Kafka, Kafka Connect, Debezium |
| Performance overhead | ~10-15% trên DB (extra insert) | Gần 0 trên primary |
| Khi schema DB đổi | Không ảnh hưởng event | Phải cẩn thận schema evolution |
Quy tắc: dùng Outbox khi có quyền đụng app code và muốn event mang ngữ nghĩa domain (event-driven architecture). Dùng CDC khi muốn mirror raw data sang analytics/search/replication (ETL/ELT).
Kết hợp: nhiều team có cả 2 — Outbox cho domain events (OrderPlaced, UserRegistered), CDC cho data mirroring (sync sang warehouse).
5. Các cạm bẫy sản xuất
5.1 Outbox bloating
Bảng outbox tăng vô hạn nếu không dọn. Thiết lập retention:
-- Xoá event đã process xong > 7 ngày
DELETE FROM outbox
WHERE processed_at IS NOT NULL
AND processed_at < now() - INTERVAL '7 days';
-- Chạy qua pg_cron hoặc external scheduler
SELECT cron.schedule('outbox_cleanup', '0 3 * * *',
$$DELETE FROM outbox WHERE processed_at < now() - INTERVAL '7 days'$$);
5.2 WAL không dọn (Debezium)
Nếu consumer down vài ngày, WAL tích luỹ có thể fill disk primary. Luôn có:
- Alert
confirmed_flush_lsn lag > 1 GB max_slot_wal_keep_size(PG 13+) để giới hạn WAL per slot- Có runbook: drop slot trong emergency (chấp nhận mất event, restart snapshot)
5.3 Schema migration
- Outbox: thêm field mới trong
payloadJSONB → consumer phải handle old/new. - CDC: thay đổi cột → Debezium gửi
schema-changeevent; consumer parse schema registry (Avro/Protobuf) mới.
5.4 Event ordering
- Mọi event cho cùng aggregate phải đi vào cùng partition Kafka → dùng
aggregate_idlàm partition key. - Với CDC PG logical replication, ordering được đảm bảo trong cùng một publication (transactional order).
5.5 Kafka alternatives
Không nhất thiết phải Kafka. Alternative nhẹ hơn:
- Redis Streams + consumer group — đủ cho 10-100k event/s.
- NATS JetStream — đơn giản hơn Kafka, at-least-once, durable.
- AWS SNS/SQS, GCP Pub/Sub — managed, không vận hành.
- PostgreSQL LISTEN/NOTIFY — cho volume thấp, không persist.
6. Case study ngắn: đồng bộ orders → ES + pgvector
flowchart LR
App --> PG[(PostgreSQL<br/>orders + outbox)]
Worker[Outbox Worker] --> K[Kafka<br/>topic: order-events]
K --> ES[ES Sink:<br/>index fulltext]
K --> VectorWorker
VectorWorker --> Emb[Embedding API]
Emb --> PGV[(pgvector<br/>order_embeddings)]
Pipeline bảo đảm:
- Transactional Outbox → không mất event.
- Consumer group Kafka → parallel + retry.
- Mỗi consumer idempotent (unique key = order_id + event_type).
- Fail → DLQ (dead letter queue), alert, manual replay.
7. Checklist triển khai
- Chọn Outbox (owned app) hoặc CDC (mirror DB)
- Schema outbox có index cho
WHERE processed_at IS NULL - Worker dùng
FOR UPDATE SKIP LOCKED, batch 100-500 - Consumer idempotent (dedup theo event id)
- Cron cleanup outbox cũ > 7 ngày
- Debezium: set
max_slot_wal_keep_size, alert slot lag - Schema registry (Avro/Protobuf) cho long-term contract
- Dead letter queue + runbook xử lý
- Giám sát end-to-end latency (commit DB → consumer process)
Kết luận
Mỗi khi bạn định viết await publishEvent() sau await saveToDb() — dừng lại. Đó là cái bug 30% production nhất của kiến trúc event-driven. Dùng Outbox khi bạn có app, CDC khi bạn có DB. Và nhớ rằng: at-least-once + consumer idempotent là công thức tin cậy nhất — exactly-once là ảo tưởng trong hệ phân tán.
Bài tiếp theo: Distributed SQL và sharding thực chiến — khi PostgreSQL/MySQL một instance hết khả năng, đi đến đâu.