1. Data sources
MySQL – dữ liệu vận hành
Ta dùng docker-compose.yml để dựng MySQL service
mysql:
image: debezium/example-mysql:1.6
container_name: mysql
volumes:
- ./mysql/data:/var/lib/mysql
ports:
- "3306:3306"
env_file:
- .env
cùng với file .env như bên dưới
MYSQL_ROOT_PASSWORD="debezium"
MYSQL_USER="admin"
MYSQL_PASSWORD="admin123"
Ở đây, ta dùng docker image tên là debezium/example-mysql:1.6 đã có sẵn những thiết lập về CDC để debezium có thể truy cập vào đồng bộ dữ liệu. Khi muốn cài đặt CDC cho MySQL các bạn có thể tham khảo ở link này.
Do không muốn mất công gõ nhiều dòng lệnh, tôi đã chuẩn bị một file Makefile gồm shortcut các command tương tác với hệ thống.
include .env
build:
docker-compose build
up:
docker-compose --env-file .env up -d
down:
docker-compose --env-file .env down
ps:
docker ps --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}"
to_redpanda:
open http://localhost:8080/topics
to_minio:
open http://localhost:9001/buckets
to_mysql:
docker exec -it mysql mysql -u"root" -p"${MYSQL_ROOT_PASSWORD}" ${MYSQL_DATABASE}
to_data_generator:
docker exec -it data_generator /bin/bash
Ta tiến hành tạo database brazillian_ecommerce với table olist_orders_dataset để chứa dữ liệu giao dịch đặt hàng. Toàn bộ dữ liệu các bạn có thể vào Kaggle đăng ký và download về. Để thuận tiện, tôi đã chuẩn bị sẵn trên Github. Tiếp theo, ta thực hiện lệnh make up
để dựng MySQL service và make to_mysql
để truy cập vào MySQL và thực thi các dòng lệnh như bên dưới.
mysql> create database brazillian_ecommerce;
Query OK, 1 row affected (0.00 sec)
mysql> use brazillian_ecommerce;
Database changed
mysql> CREATE TABLE olist_orders_dataset (
-> order_id varchar(32),
-> customer_id varchar(32),
-> order_status varchar(16),
-> order_purchase_timestamp varchar(32),
-> order_approved_at varchar(32),
-> order_delivered_carrier_date varchar(32),
-> order_delivered_customer_date varchar(32),
-> order_estimated_delivery_date varchar(32),
-> PRIMARY KEY (order_id)
-> );
Query OK, 0 rows affected (0.01 sec)
mysql> show tables;
+--------------------------------+
| Tables_in_brazillian_ecommerce |
+--------------------------------+
| olist_orders_dataset |
+--------------------------------+
1 row in set (0.00 sec)
Tiếp đến, ta tạo src/ chứa các script generate dữ liệu transaction có cấu trúc files/folder như sau:
src
├── 01_generate_orders.py
├── 02_generate_clickstream.py
├── data
│ └── olist_orders_dataset.csv
├── Dockerfile
├── requirements.txt
└── setup_connectors.sh
- 01_generate_orders.py: dùng để generate dữ liệu giao dịch cho MySQL.
- 02_generate_clickstream.py: dùng để generate dữ liệu clickstream events.
- data/olist_orders_dataset.csv: chứa dữ liệu giao dịch.
- Dockerfile: dùng để đóng gói toàn bộ src/ code và những requirements cần thiết.
- requirements.txt: chứa danh sách các thư viện cài đặt.
- setup_connectors.sh: chứa các requests dùng để tạo kết nối source/sink cho Kafka connect.
Để build docker image cho src/ ta thêm các khai báo sau vào docker-compose.yml
data-generator:
build:
context: ./src
dockerfile: ./Dockerfile
container_name: data_generator
volumes:
- ./src:/opt/src
env_file:
- .env
Sau đó, ta dùng make để tiến hành phát sinh dữ liệu giao dịch cho MySQL
make build
make down
make up
make to_data_generator
Khi vào được data_generator container. Ta dùng python để chạy script 01_generate_orders.py. Nếu thành công, ta sẽ thấy output tương tự như bên dưới.
(74254, 9)
NO. DATES: 366
Writing data on: 2017-08-01
-Records: 165
Writing data on: 2017-08-02
-Records: 157
Writing data on: 2017-08-03
-Records: 148
Ta quay lại MySQL để kiểm tra dữ liệu đã được phát sinh

2. Ingestion layer
Redpanda – fast storage dùng stream dữ liệu real-time
Sau khi đã có dữ liệu CDC, ta tiếp tục cài đặt Redpanda để đọc streaming event này ra. Ta tiếp tục thêm các khai báo như bên dưới vào docker-compose.yml
redpanda:
image: vectorized/redpanda
container_name: redpanda
ports:
- "9092:9092"
- "29092:29092"
command:
- redpanda
- start
- --overprovisioned
- --smp
- "1"
- --memory
- "1G"
- --reserve-memory
- "0M"
- --node-id
- "0"
- --kafka-addr
- PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr
- PLAINTEXT://redpanda:29092,OUTSIDE://redpanda:9092
- --check=false
redpanda-console:
image: vectorized/console
container_name: redpanda_console
depends_on:
- redpanda
ports:
- "8080:8080"
env_file:
- .env
Ta restart các service bằng cách make down && make up
. Khi thành công, ta truy cập vào http://localhost:8080/topics sẽ thấy giao diện redpanda-console như bên dưới

Kafka connect – transfer dữ liệu từ source sang sink
Ta hoàn toàn có thể tự code từ đầu các quá trình như tạo topic trên Redpanda để chứa streaming events, viết script đọc dữ liệu từ topic này ra và lưu lại trên MinIO. Thay vào đó, ta đã có Kafka connect với rất nhiều các connector hỗ trợ kết nói từ source sang sink giúp ta tiết kiệm được thời gian cài đặt phức tạp vừa rồi mà chỉ cần tập trung vào việc thiết lập up and run là hoàn tất công việc.
Docker image mặc định của Kafka connect sẽ chưa có io.confluent.connect.s3.S3SinkConnector
nên ta cần tạo một folder kafka/ chưa Dockerfile để download thêm connector này cho Kafka connect
FROM debezium/connect
RUN curl -O https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/10.3.1/confluentinc-kafka-connect-s3-10.3.1.zip \
&& unzip confluentinc-kafka-connect-s3-10.3.1.zip \
&& mv confluentinc-kafka-connect-s3-10.3.1 /kafka/connect/ \
&& rm confluentinc-kafka-connect-s3-10.3.1.zip
Ta thêm vào docker-compose.yml với các khai báo như bên dưới
kafka-connect:
build:
context: ./kafka
dockerfile: ./Dockerfile
container_name: kafka_connect
depends_on:
- redpanda
ports:
- "8083:8083"
env_file:
- .env
Sau khi make build && make down && make up
ta kiểm tra service Kafka connect bằng cách request đến localhost:8083/connector-plugins/
# health check Kafka connect
curl -H "Accept:application/json" localhost:8083/connector-plugins/
[
{
"class": "io.confluent.connect.s3.S3SinkConnector",
"type": "sink",
"version": "10.3.1"
},
{
"class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
"type": "source",
"version": "3.3.1"
},
{
"class": "io.debezium.connector.db2.Db2Connector",
"type": "source",
"version": "2.0.1.Final"
},
{
"class": "io.debezium.connector.mongodb.MongoDbConnector",
"type": "source",
"version": "2.0.1.Final"
},
{
"class": "io.debezium.connector.mysql.MySqlConnector",
"type": "source",
"version": "2.0.1.Final"
},
{
"class": "io.debezium.connector.oracle.OracleConnector",
"type": "source",
"version": "2.0.1.Final"
},
{
"class": "io.debezium.connector.postgresql.PostgresConnector",
"type": "source",
"version": "2.0.1.Final"
},
{
"class": "io.debezium.connector.sqlserver.SqlServerConnector",
"type": "source",
"version": "2.0.1.Final"
},
{
"class": "io.debezium.connector.vitess.VitessConnector",
"type": "source",
"version": "2.0.1.Final"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "3.3.1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "3.3.1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "3.3.1"
}
]
Debezium – đọc Change Data Capture từ MySQL
Để đọc dữ liệu ra, ta chỉ cần tạo một connector cho Kafka connect. Connector này tên io.debezium.connector.mysql.MySqlConnector
. Thông tin thiết lập chi tiết, các bạn có thể tìm thấy ở đây.
curl --request POST \
--url http://localhost:8083/connectors \
--header 'Content-Type: application/json' \
--data '{
"name": "src-brazillian-ecommerce",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.include.list": "brazillian_ecommerce",
"topic.prefix": "dbserver1",
"schema.history.internal.kafka.bootstrap.servers": "redpanda:9092",
"schema.history.internal.kafka.topic": "schema-changes.brazillian_ecommerce"
}
}'
Khi request thành công, ta quay lại Redpanda console sẽ thấy các topics liên quan đến debezium đã được tạo ra tự động.

Kiểm tra topic dbserver1.brazillian_ecommerce.olist_orders_dataset
ta thấy được toàn bộ nội dung của từng message CDC như thế nào. Dựa trên những nội dung thu thập được, ta có thể replicate ra nhiều target sink khác nhau như Database PostgreSQL, Data warehouse Redshift hay Data lakehouse.


3. Target sink
MinIO – data lake lưu trữ dữ liệu phân tán
Cuối cùng, đích đến của dữ liệu sẽ là MinIO. Ta thêm vào các khai báo cho MinIO như bên dưới vào docker-compose.yml
minio:
hostname: minio
image: "minio/minio"
container_name: minio
ports:
- "9001:9001"
- "9000:9000"
command: [ "server", "/data", "--console-address", ":9001" ]
volumes:
- ./minio/data:/data
env_file:
- .env
mc:
image: minio/mc
container_name: mc
hostname: mc
environment:
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=minio123
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; exit 0; "
depends_on:
- minio
cùng với các biến môi trường được thêm vào file .env
# MinIO
MINIO_ROOT_USER="minio"
MINIO_ROOT_PASSWORD="minio123"
MINIO_ACCESS_KEY="minio"
MINIO_SECRET_KEY="minio123"
Sau khi make down && make up
. Ta truy cập vào http://localhost:9001/buckets sẽ thấy giao diện trình bày như hình dưới

Sink dữ liệu CDC vào MinIO
Ta dùng Kafka connect để sync trực tiếp dữ liệu về MinIO. Thông tin thiết lập chi tiết của io.confluent.connect.s3.S3SinkConnector
, các bạn có thể tìm thấy ở đây.
curl --request POST \
--url http://localhost:8083/connectors \
--header 'Content-Type: application/json' \
--data '{
"name": "sink-s3-brazillian-ecommerce",
"config": {
"topics.regex": "dbserver1.brazillian_ecommerce.*",
"topics.dir": "brazillian_ecommerce",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"flush.size": "1000",
"store.url": "http://minio:9000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.region": "us-east-1",
"s3.bucket.name": "warehouse",
"aws.access.key.id": "minio",
"aws.secret.access.key": "minio123"
}
}'
Kiểm tra thông tin trên Redpanda console, ta sẽ thấy một consumer group mới được tạo ra

Khi quá trình đồng bộ dữ liệu hoàn tất, ta sẽ thấy các file đã được sync về trong MinIO


4. Tương tự cho dữ liệu clickstream
Clickstream – dữ liệu user event
Ta làm tương tự bằng cách truy cập vào data_generator container thông qua make to_data_generator
. Sau đó dùng python để chạy script 02_generate_clickstream.py
khi thành công ta sẽ thấy logs như bên dưới.
root@e478fba851b1:/opt/src# python 02_generate_clickstream.py
0.129.2. Sent data to Redpanda topic clickstream_events: b'9272c9373449fa586ea14f425b5497a7' - b'{"timestamp": "2017-08-01", "event_name": "video_play", "event_value": 0}', sleeping for 1 second
0.130.0. Sent data to Redpanda topic clickstream_events: b'bef3b7e0d09c81ece65cc174475bb2f8' - b'{"timestamp": "2017-08-01", "event_name": "link_2_click", "event_value": 1}', sleeping for 1 second
0.130.1. Sent data to Redpanda topic clickstream_events: b'bef3b7e0d09c81ece65cc174475bb2f8' - b'{"timestamp": "2017-08-01", "event_name": "pdf_download", "event_value": 0}', sleeping for 1 second
.
.
.
.
2.147.0. Sent data to Redpanda topic clickstream_events: b'97b02cd154ba83c7a105d797003cbff4' - b'{"timestamp": "2017-08-03", "event_name": "link_1_click", "event_value": 1}', sleeping for 3 second
2.147.1. Sent data to Redpanda topic clickstream_events: b'97b02cd154ba83c7a105d797003cbff4' - b'{"timestamp": "2017-08-03", "event_name": "link_2_click", "event_value": 0}', sleeping for 3 second
2.147.2. Sent data to Redpanda topic clickstream_events: b'97b02cd154ba83c7a105d797003cbff4' - b'{"timestamp": "2017-08-03", "event_name": "link_2_click", "event_value": 1}', sleeping for 3 second
Lúc này, khi kiểm tra bằng Redpanda console, ta sẽ thấy có một topic tên clickstream_events
được tạo ra, kèm theo đó là những events được capture lại trên hệ thống.

Sau cùng, để sink dữ liệu về MinIO, tạo đăng ký một connector trên Kafka connect bằng cách request như bên dưới
curl --request POST \
--url http://localhost:8083/connectors \
--header 'Content-Type: application/json' \
--data '{
"name": "sink-s3-clickstream",
"config": {
"topics": "clickstream_events",
"topics.dir": "clickstream_events",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"key.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"s3.compression.type": "gzip",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"flush.size": "100",
"store.url": "http://minio:9000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.region": "us-east-1",
"s3.bucket.name": "warehouse",
"aws.access.key.id": "minio",
"aws.secret.access.key": "minio123"
}
}'
Vâng, cuối cùng thì clickstream data cũng đã được sync về MinIO

Kết
Việc migrate dữ liệu từ hệ thống này sang hệ thống khác sẽ rất tốn thời gian và gây ra trì trệ cho hệ thống gốc nếu ta đi theo hướng batch ingestion. Vì mỗi lần ta cần thực hiện nhiều request đến database để lấy dữ liệu mới về. Nếu 1-2 table thì không đáng kể nhưng thực tế ta cần nhiều hơn table để có thể làm phân tích báo cáo hay phục vụ cho các Machine laerning model.
Stream data ingestion có nhiều ưu điểm hơn như giảm tải cho hệ thống nguồn vì chỉ đọc dữ liệu logs từ steam, nhờ vào những thay đổi được track thông qua CDC mà ta hoàn toàn có thể dựa vào source-of-truth này để tái tạo lại toàn bộ dữ liệu ở nhiều hệ thống khác.
Quá trình migrate dữ liệu mới chỉ là bước đầu trong công cuộc đem dữ liệu đến cho mọi nhà (data consumers) của Data Engineer. Công việc còn dang dở đó là làm sao có thể tái tạo lại từ CDC để người dùng có thể thấy được trọn vẹn dữ liệu ở đầu cuối? Những bài viết sau, tôi sẽ kể về 2 thuật toán chính thường được sử dụng để đọc dữ liệu CDC này lên như thế nào đó là Copy on write và Merge on read.
References
- When to choose Redpanda instead of Apache Kafka?
- Real-Time Streaming for Mortals: How Redpanda and Materialize Making It a Reality
- The Raft Consensus Algorithm
- Redpanda vs Apache Kafka: A Total Cost of Ownership comparison
- Amazon S3 Sink Connector Configuration Properties
- Federated queries in data lakes with Redpanda and Trino
- Advantages of the event-driven architecture pattern