DataOps 01: Stream data ingestion với Redpanda

1. Data sources

MySQL – dữ liệu vận hành

Ta dùng docker-compose.yml để dựng MySQL service

mysql:
    image: debezium/example-mysql:1.6
    container_name: mysql
    volumes:
      - ./mysql/data:/var/lib/mysql
    ports:
      - "3306:3306"
    env_file:
      - .env

cùng với file .env như bên dưới

MYSQL_ROOT_PASSWORD="debezium"
MYSQL_USER="admin"
MYSQL_PASSWORD="admin123"

Ở đây, ta dùng docker image tên là debezium/example-mysql:1.6 đã có sẵn những thiết lập về CDC để debezium có thể truy cập vào đồng bộ dữ liệu. Khi muốn cài đặt CDC cho MySQL các bạn có thể tham khảo ở link này.

Do không muốn mất công gõ nhiều dòng lệnh, tôi đã chuẩn bị một file Makefile gồm shortcut các command tương tác với hệ thống.

include .env

build:
	docker-compose build

up:
	docker-compose --env-file .env up -d

down:
	docker-compose --env-file .env down	

ps:
	docker ps --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}"

to_redpanda:
	open http://localhost:8080/topics

to_minio:
	open http://localhost:9001/buckets

to_mysql:
	docker exec -it mysql mysql -u"root" -p"${MYSQL_ROOT_PASSWORD}" ${MYSQL_DATABASE}

to_data_generator:
	docker exec -it data_generator /bin/bash

Ta tiến hành tạo database brazillian_ecommerce với table olist_orders_dataset để chứa dữ liệu giao dịch đặt hàng. Toàn bộ dữ liệu các bạn có thể vào Kaggle đăng ký và download về. Để thuận tiện, tôi đã chuẩn bị sẵn trên Github. Tiếp theo, ta thực hiện lệnh make up để dựng MySQL service và make to_mysql để truy cập vào MySQL và thực thi các dòng lệnh như bên dưới.

mysql> create database brazillian_ecommerce;
Query OK, 1 row affected (0.00 sec)
mysql> use brazillian_ecommerce;
Database changed
mysql> CREATE TABLE olist_orders_dataset (
    ->     order_id varchar(32),
    ->     customer_id varchar(32),
    ->     order_status varchar(16),
    ->     order_purchase_timestamp varchar(32),
    ->     order_approved_at varchar(32),
    ->     order_delivered_carrier_date varchar(32),
    ->     order_delivered_customer_date varchar(32),
    ->     order_estimated_delivery_date varchar(32),
    ->     PRIMARY KEY (order_id)
    -> );
Query OK, 0 rows affected (0.01 sec)
mysql> show tables;
+--------------------------------+
| Tables_in_brazillian_ecommerce |
+--------------------------------+
| olist_orders_dataset           |
+--------------------------------+
1 row in set (0.00 sec)

Tiếp đến, ta tạo src/ chứa các script generate dữ liệu transaction có cấu trúc files/folder như sau:

src
├── 01_generate_orders.py
├── 02_generate_clickstream.py
├── data
│   └── olist_orders_dataset.csv
├── Dockerfile
├── requirements.txt
└── setup_connectors.sh
  • 01_generate_orders.py: dùng để generate dữ liệu giao dịch cho MySQL.
  • 02_generate_clickstream.py: dùng để generate dữ liệu clickstream events.
  • data/olist_orders_dataset.csv: chứa dữ liệu giao dịch.
  • Dockerfile: dùng để đóng gói toàn bộ src/ code và những requirements cần thiết.
  • requirements.txt: chứa danh sách các thư viện cài đặt.
  • setup_connectors.sh: chứa các requests dùng để tạo kết nối source/sink cho Kafka connect.

Để build docker image cho src/ ta thêm các khai báo sau vào docker-compose.yml

data-generator:
    build:
      context: ./src
      dockerfile: ./Dockerfile
    container_name: data_generator
    volumes:
      - ./src:/opt/src
    env_file:
      - .env

Sau đó, ta dùng make để tiến hành phát sinh dữ liệu giao dịch cho MySQL

make build
make down
make up
make to_data_generator

Khi vào được data_generator container. Ta dùng python để chạy script 01_generate_orders.py. Nếu thành công, ta sẽ thấy output tương tự như bên dưới.

(74254, 9)
NO. DATES: 366
Writing data on: 2017-08-01
-Records: 165
Writing data on: 2017-08-02
-Records: 157
Writing data on: 2017-08-03
-Records: 148

Ta quay lại MySQL để kiểm tra dữ liệu đã được phát sinh

select * from olist_orders_dataset limit 10;

2. Ingestion layer

Redpanda – fast storage dùng stream dữ liệu real-time

Sau khi đã có dữ liệu CDC, ta tiếp tục cài đặt Redpanda để đọc streaming event này ra. Ta tiếp tục thêm các khai báo như bên dưới vào docker-compose.yml

redpanda:
  image: vectorized/redpanda
  container_name: redpanda
  ports:
    - "9092:9092"
    - "29092:29092"
  command:
    - redpanda
    - start
    - --overprovisioned
    - --smp
    - "1"
    - --memory
    - "1G"
    - --reserve-memory
    - "0M"
    - --node-id
    - "0"
    - --kafka-addr
    - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
    - --advertise-kafka-addr
    - PLAINTEXT://redpanda:29092,OUTSIDE://redpanda:9092
    - --check=false

redpanda-console:
  image: vectorized/console
  container_name: redpanda_console
  depends_on:
    - redpanda
  ports:
    - "8080:8080"
  env_file:
    - .env

Ta restart các service bằng cách make down && make up. Khi thành công, ta truy cập vào http://localhost:8080/topics sẽ thấy giao diện redpanda-console như bên dưới

Redpanda UI

Kafka connect – transfer dữ liệu từ source sang sink

Ta hoàn toàn có thể tự code từ đầu các quá trình như tạo topic trên Redpanda để chứa streaming events, viết script đọc dữ liệu từ topic này ra và lưu lại trên MinIO. Thay vào đó, ta đã có Kafka connect với rất nhiều các connector hỗ trợ kết nói từ source sang sink giúp ta tiết kiệm được thời gian cài đặt phức tạp vừa rồi mà chỉ cần tập trung vào việc thiết lập up and run là hoàn tất công việc.

Docker image mặc định của Kafka connect sẽ chưa có io.confluent.connect.s3.S3SinkConnector nên ta cần tạo một folder kafka/ chưa Dockerfile để download thêm connector này cho Kafka connect

FROM debezium/connect

RUN curl -O https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/10.3.1/confluentinc-kafka-connect-s3-10.3.1.zip \
    && unzip confluentinc-kafka-connect-s3-10.3.1.zip \
    && mv confluentinc-kafka-connect-s3-10.3.1 /kafka/connect/ \
    && rm confluentinc-kafka-connect-s3-10.3.1.zip

Ta thêm vào docker-compose.yml với các khai báo như bên dưới

kafka-connect:
    build:
      context: ./kafka
      dockerfile: ./Dockerfile
    container_name: kafka_connect
    depends_on:
      - redpanda
    ports:
      - "8083:8083"
    env_file:
      - .env

Sau khi make build && make down && make up ta kiểm tra service Kafka connect bằng cách request đến localhost:8083/connector-plugins/

# health check Kafka connect
curl -H "Accept:application/json" localhost:8083/connector-plugins/
[
  {
    "class": "io.confluent.connect.s3.S3SinkConnector",
    "type": "sink",
    "version": "10.3.1"
  },
  {
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
    "type": "source",
    "version": "3.3.1"
  },
  {
    "class": "io.debezium.connector.db2.Db2Connector",
    "type": "source",
    "version": "2.0.1.Final"
  },
  {
    "class": "io.debezium.connector.mongodb.MongoDbConnector",
    "type": "source",
    "version": "2.0.1.Final"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "2.0.1.Final"
  },
  {
    "class": "io.debezium.connector.oracle.OracleConnector",
    "type": "source",
    "version": "2.0.1.Final"
  },
  {
    "class": "io.debezium.connector.postgresql.PostgresConnector",
    "type": "source",
    "version": "2.0.1.Final"
  },
  {
    "class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "type": "source",
    "version": "2.0.1.Final"
  },
  {
    "class": "io.debezium.connector.vitess.VitessConnector",
    "type": "source",
    "version": "2.0.1.Final"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "3.3.1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "3.3.1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "3.3.1"
  }
]

Debezium – đọc Change Data Capture từ MySQL

Để đọc dữ liệu ra, ta chỉ cần tạo một connector cho Kafka connect. Connector này tên io.debezium.connector.mysql.MySqlConnector. Thông tin thiết lập chi tiết, các bạn có thể tìm thấy ở đây.

curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
  "name": "src-brazillian-ecommerce",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.include.list": "brazillian_ecommerce",
    "topic.prefix": "dbserver1",
    "schema.history.internal.kafka.bootstrap.servers": "redpanda:9092",
    "schema.history.internal.kafka.topic": "schema-changes.brazillian_ecommerce"
  }
}'

Khi request thành công, ta quay lại Redpanda console sẽ thấy các topics liên quan đến debezium đã được tạo ra tự động.

Kiểm tra topic dbserver1.brazillian_ecommerce.olist_orders_dataset ta thấy được toàn bộ nội dung của từng message CDC như thế nào. Dựa trên những nội dung thu thập được, ta có thể replicate ra nhiều target sink khác nhau như Database PostgreSQL, Data warehouse Redshift hay Data lakehouse.

Nội dung của key
Nội dung của value

3. Target sink

MinIO – data lake lưu trữ dữ liệu phân tán

Cuối cùng, đích đến của dữ liệu sẽ là MinIO. Ta thêm vào các khai báo cho MinIO như bên dưới vào docker-compose.yml

minio:
    hostname: minio
    image: "minio/minio"
    container_name: minio
    ports:
      - "9001:9001"
      - "9000:9000"
    command: [ "server", "/data", "--console-address", ":9001" ]
    volumes:
      - ./minio/data:/data
    env_file:
      - .env

mc:
  image: minio/mc
  container_name: mc
  hostname: mc
  environment:
    - AWS_ACCESS_KEY_ID=minio
    - AWS_SECRET_ACCESS_KEY=minio123
    - AWS_REGION=us-east-1
  entrypoint: >
    /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; exit 0; "    
  depends_on:
    - minio

cùng với các biến môi trường được thêm vào file .env

# MinIO
MINIO_ROOT_USER="minio"
MINIO_ROOT_PASSWORD="minio123"
MINIO_ACCESS_KEY="minio"
MINIO_SECRET_KEY="minio123"

Sau khi make down && make up. Ta truy cập vào http://localhost:9001/buckets sẽ thấy giao diện trình bày như hình dưới

MinIO UI

Sink dữ liệu CDC vào MinIO

Ta dùng Kafka connect để sync trực tiếp dữ liệu về MinIO. Thông tin thiết lập chi tiết của io.confluent.connect.s3.S3SinkConnector, các bạn có thể tìm thấy ở đây.

curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
  "name": "sink-s3-brazillian-ecommerce",  
  "config": {
    "topics.regex": "dbserver1.brazillian_ecommerce.*",
    "topics.dir": "brazillian_ecommerce",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "1000",
    "store.url": "http://minio:9000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.region": "us-east-1",
    "s3.bucket.name": "warehouse",    
    "aws.access.key.id": "minio",
    "aws.secret.access.key": "minio123"
  }
}'

Kiểm tra thông tin trên Redpanda console, ta sẽ thấy một consumer group mới được tạo ra

Khi quá trình đồng bộ dữ liệu hoàn tất, ta sẽ thấy các file đã được sync về trong MinIO

Folder lưu trữ các topics của brazillian_ecommerce
Dữ liệu được lưu dưới dạng json

4. Tương tự cho dữ liệu clickstream

Clickstream – dữ liệu user event

Ta làm tương tự bằng cách truy cập vào data_generator container thông qua make to_data_generator. Sau đó dùng python để chạy script 02_generate_clickstream.py khi thành công ta sẽ thấy logs như bên dưới.

root@e478fba851b1:/opt/src# python 02_generate_clickstream.py
0.129.2. Sent data to Redpanda topic clickstream_events: b'9272c9373449fa586ea14f425b5497a7' - b'{"timestamp": "2017-08-01", "event_name": "video_play", "event_value": 0}', sleeping for 1 second
0.130.0. Sent data to Redpanda topic clickstream_events: b'bef3b7e0d09c81ece65cc174475bb2f8' - b'{"timestamp": "2017-08-01", "event_name": "link_2_click", "event_value": 1}', sleeping for 1 second
0.130.1. Sent data to Redpanda topic clickstream_events: b'bef3b7e0d09c81ece65cc174475bb2f8' - b'{"timestamp": "2017-08-01", "event_name": "pdf_download", "event_value": 0}', sleeping for 1 second
.
.
.
.
2.147.0. Sent data to Redpanda topic clickstream_events: b'97b02cd154ba83c7a105d797003cbff4' - b'{"timestamp": "2017-08-03", "event_name": "link_1_click", "event_value": 1}', sleeping for 3 second
2.147.1. Sent data to Redpanda topic clickstream_events: b'97b02cd154ba83c7a105d797003cbff4' - b'{"timestamp": "2017-08-03", "event_name": "link_2_click", "event_value": 0}', sleeping for 3 second
2.147.2. Sent data to Redpanda topic clickstream_events: b'97b02cd154ba83c7a105d797003cbff4' - b'{"timestamp": "2017-08-03", "event_name": "link_2_click", "event_value": 1}', sleeping for 3 second

Lúc này, khi kiểm tra bằng Redpanda console, ta sẽ thấy có một topic tên clickstream_events được tạo ra, kèm theo đó là những events được capture lại trên hệ thống.

Sau cùng, để sink dữ liệu về MinIO, tạo đăng ký một connector trên Kafka connect bằng cách request như bên dưới

curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
  "name": "sink-s3-clickstream",
  "config": {
    "topics": "clickstream_events",
    "topics.dir": "clickstream_events",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "key.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "s3.compression.type": "gzip",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "100",
    "store.url": "http://minio:9000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.region": "us-east-1",
    "s3.bucket.name": "warehouse",
    "aws.access.key.id": "minio",
    "aws.secret.access.key": "minio123"
  }
}'

Vâng, cuối cùng thì clickstream data cũng đã được sync về MinIO

Kết

Việc migrate dữ liệu từ hệ thống này sang hệ thống khác sẽ rất tốn thời gian và gây ra trì trệ cho hệ thống gốc nếu ta đi theo hướng batch ingestion. Vì mỗi lần ta cần thực hiện nhiều request đến database để lấy dữ liệu mới về. Nếu 1-2 table thì không đáng kể nhưng thực tế ta cần nhiều hơn table để có thể làm phân tích báo cáo hay phục vụ cho các Machine laerning model.

Stream data ingestion có nhiều ưu điểm hơn như giảm tải cho hệ thống nguồn vì chỉ đọc dữ liệu logs từ steam, nhờ vào những thay đổi được track thông qua CDC mà ta hoàn toàn có thể dựa vào source-of-truth này để tái tạo lại toàn bộ dữ liệu ở nhiều hệ thống khác.

Quá trình migrate dữ liệu mới chỉ là bước đầu trong công cuộc đem dữ liệu đến cho mọi nhà (data consumers) của Data Engineer. Công việc còn dang dở đó là làm sao có thể tái tạo lại từ CDC để người dùng có thể thấy được trọn vẹn dữ liệu ở đầu cuối? Những bài viết sau, tôi sẽ kể về 2 thuật toán chính thường được sử dụng để đọc dữ liệu CDC này lên như thế nào đó là Copy on writeMerge on read.

References

Trả lời

Điền thông tin vào ô dưới đây hoặc nhấn vào một biểu tượng để đăng nhập:

WordPress.com Logo

Bạn đang bình luận bằng tài khoản WordPress.com Đăng xuất /  Thay đổi )

Twitter picture

Bạn đang bình luận bằng tài khoản Twitter Đăng xuất /  Thay đổi )

Facebook photo

Bạn đang bình luận bằng tài khoản Facebook Đăng xuất /  Thay đổi )

Connecting to %s