Amazon Kinesis Streams là gì

AWS Kinesis

AWS Kinesis

Thông thường, khi thiết kế một hệ thống streaming, ta nghĩ ngay đến bộ công cụ (Kafka, Storm, Redis) để nhận, xử lý, và lưu trữ dữ liệu. Tuy nhiên, khi sử dụng Kafka (một message queue giúp đảm bảo lượng dữ liệu khổng lồ ập đến không bị tắc nghẽn) bạn phải nghiên cứu khá nhiều công nghệ cũng như hiệu chỉnh các thiết lập, thậm chí phải viết code để tối ưu hoá hệ thống của mình. Tương tự như Kafka, Kinesis mặc dù có throughput (tốc độ đọc/ghi) không nhanh bằng nhưng lại giảm đáng kể công việc cho admin system cũng như các developer trong khâu bắn (put) dữ liệu và đọc dữ liệu từ stream.

Trong bài viết này, tôi sẽ trình bày một số thách thức trong xử lý stream cũng như các hướng tiếp cận để giải quyết. Đồng thời cũng giới thiệu sơ qua Kinesis, các tính năng và các thông số cơ bản của hệ thống cùng với các lệnh tương tác với Kinesis stream.

Những thách thức khi xử lý stream

Throughput (tốc độ đọc/ghi)

Giả sử bạn có một gói tin clickstream như bên dưới. Gói tin này có độ lớn là 342 bytes, ta giả sử hầu hết các gói tin có độ lớn trung bình là 370 bytes.


{
    "sessionId": "seXACNTS3FoQuqTVxAM",
    "fmt": "1",
    "num": "1",
    "cv": "7",
    "frm": "2",
    "url": "http%3A//www.mywebsite.com/activityi%3Bsrc%3D1782317%3Btype%3D2015s004%3Bcat%3Dgs6pr0%3Bord%3D3497210042551.1597%3F",
    "ref": "http%3A//www.mywebsite.com/us/explore/myproduct-features-and-specs/%3Fcid%3Dppc-",
    "random": "3833153354"
}

Hãy tưởng tượng website của bạn vừa mới được triển khai, traffic lúc này không cao lắm, chỉ khoảng 50 click event mỗi giây. Với hướng tiếp cận đơn giản, bạn sẽ tuần tự đẩy gói tin này vào hệ thống của mình và tuần tự lưu dữ liệu này xuống. Sau khi benchmark, hệ thống chịu tải khoảng 90 gói tin mỗi giây (records per second – RPS), so với 50 thì như vậy đã quá đủ.

Tuy nhiên, trong tương lai, website của bạn sẽ bắt đầu được nhiều người biết đến. Lúc này, hệ thống nhận liên tục 5000 click event mỗi giây thay vì 50 như trước. Trong khi đó, hệ thống của bạn chỉ chịu tải được 90 RPS, vậy bạn sẽ xử lý như thế nào để tránh tắc nghẽn hệ thống?

Multithreading
Đây là hướng tiếp cận truy xuất đồng thời dữ liệu đầu vào bằng cách tạo ra nhiều threads. Nếu ta cần đáp ứng 5000 RPS với mỗi thread chịu tải được 90 RPS thì ta cần ít nhất 56 threads để thực hiện tác vụ trên.

Điểm bất lợi ở hướng tiếp cận này là hệ thống sẽ sử dụng CPU quá nhiều, dẫn đến quá tải khi switching qua lại giữa các threads. Hơn nữa, nó còn làm nghẽn băng thông khi thực hiện quá nhiều HTTP request. Và đây cũng chính là thách thức tiếp theo.

Bandwidth Overhead

HTTP request

HTTP request

Mặc dù gói tin của bạn ban đầu chỉ khoảng 370 bytes, nhưng khi truyền tải thông qua HTTP request, dữ liệu sẽ lên đến 1200 bytes. Do mỗi gói tin sẽ chứa thông tin HTTP headers, thông tin này chiếm dụng dung lượng khá lớn, gần bằng kích thức của một record. Nếu ta gửi một record thông qua một HTTP request, thì kích thước này sẽ tăng lên, chưa kể một số gói tin sử dụng base64-encoding lại càng chiếm nhiều băng thông. Lúc này, bạn sẽ cần gộp chung (aggregate) các records lại với nhau thành một gói tin duy nhất để truyền đi một HTTP request, như vậy sẽ tiết kiệm được băng thông.

Retry khi dữ liệu bị lỗi hoặc bị mất

Các trường hợp lỗi thường gặp:

  • Dữ liệu bị nhiễu trong quá trình đóng gói.
  • Một trong các server bị down.
  • Ứng dụng đọc dữ liệu từ stream bị down.

Trong trường hợp này, bạn có 2 hướng giải quyết. Một là lập trình một ứng dụng để phát hiện những gói tin bị lỗi, từ đó gọi một request để bắn lại gói tin. Hai là sử dụng checkpoint khi một trong các server hay ứng dụng bị down, từ đó ta biết được vị trí để có thể tiếp tục đọc tiếp sau này.

Kinesis

Thay vì lập trình toàn bộ ứng dụng để quản lý các vấn đề trên, ta có thể dùng Kinesis Producer Library (KPL) được cung cấp bởi Amazon Web Service. Với thư viện này, ta đạt mốc 76,000 RPS so với 90 RPS, nhiều hơn gấp 800 lần, một con số đáng kể. Kinesis làm được điều này là nhờ áp dụng một số phương pháp tối ưu khá thông minh:

  • Nén các records lại (khoảng 1000 records) và truyền đi một HTTP request duy nhất.
  • Sử dụng checkpoint để quản lý tiến trình đọc dữ liệu từ stream, nhờ vậy ta có thể lấy lại dữ liệu khi phát hiện dữ liệu bị mất hay bị lỗi.
  • Class Retrier hỗ trợ phát hiện gói tin bị lỗi và phân biệt được lỗi này có thể retry hay không để tiết kiệm chi phí request.
  • Sử dụng CloudWatch để gửi các thông tin cần thiết cho ứng dụng nhằm mục đích phát hiện các thời điểm bất thường từ đó người quản trị có thể khắc phục kịp thời.

Khi làm việc với Kinesis stream

Một stream thường có từ một đến nhiều shards, mỗi shard hỗ trợ cao nhất là 5 transactions mỗi giây (read) tương đương với 2MB mỗi giây và 1,000 records mỗi giây tương đương với 1MB mỗi giây (write – đã bao gồm partition key).

Spark Kinesis 1

Spark Kinesis 1

Spark Kinesis 2

Spark Kinesis 2

Với cơ chế quản lý theo shard, ta dễ dàng thêm bớt khi data rate tăng hoặc giảm (dynamic resource allocation).

Để làm việc với Kinesis, bạn có thể sử dụng các Stream API khác, tuy nhiên KCL (Kinesis Consumer Library) sẽ cung cấp cho bạn nhiều giao tiếp dễ dàng đối với các tác vụ phức tạp (tính toán phân tán, load-balancing trên nhiều instances, giải quyết vấn đề failures của các instances, xử lý checkpoint records, resharding, …). KCL giúp bạn tập trung hơn vào xử lý ghi dữ liệu.

Bạn có thể tham khảo mã nguồn ở đây: https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis

Ngoài ra, nếu bạn nằm trong vùng hỗ trợ của dịch vụ Firehose, bạn hầu như không cần phải viết nhiều code để quản lý stream, bạn có thể dễ dàng quản lý resources của mình một cách đơn giản chỉ thông qua một vài cú click.

Thao tác cơ bản

Tạo một stream


aws kinesis create-stream –stream-name <your_stream_name> –shard-count <number of shard>

aws kinesis create-stream –stream-name lam_viec_voi_kinesis –shard-count 1

Quan sát trạng thái một stream


aws kinesis describe-stream –stream-name <your_stream_name>

{

“ShardId”: “shardId-000000000000”,

“HashKeyRange”: {

“EndingHashKey”: “340282366920938463463374607431768211455”,

“StartingHashKey”: “0”

},

“SequenceNumberRange”: {

“StartingSequenceNumber”: “49563459445425426712014046888996216399487771995315109890”

}

Liệt kê danh sách các stream


aws kinesis list-streams

Put một record


aws kinesis put-record –stream-name <your_stream_name> –partition-key <random number> –data <your_data_>

“SequenceNumber”: “49563459445425426712014046889339551332258349083481079810”

Get một record


aws kinesis get-shard-iterator –shard-id shardId-000000000000 –shard-iterator-type <[TRIM_HORIZON, LATEST]> –stream-name <your_stream_name> [–query ‘ShardIterator’]

{

“ShardIterator”: “AAAAAAAAAAGPc4d+V6UgVwDc/AbWDxCqbh1JZAnvfdzIOE+7znqidP2o/o+yT5/pWxqZvqGpJrrsH9wKmhtvgzouWNoFzaEf/R/ztPOHAdLMjBvgJHSaRGQUZsYBl2HkZ5SLyejWUpTIdb8pAxptW9QnZ7p1AyR2OcddEeQrC2z2tdE6xdRxh3glcALvZ8sCt6zKNCS8XMmaYVRQnHbocx7GwWN0+s3+vIXwIZTfVRjlViWXJMY6Dw==”

}

Shard iterators tồn tại trong 300 giây (5 phút), để phục hồi ta dùng lệnh get-shard-iterator. Ta có thể phải mất nhiều request đến server để lấy được shard có chứa records.


aws kinesis get-records –shard-iterator AAAAAAAAAAGPc4d+V6UgVwDc/AbWDxCqbh1JZAnvfdzIOE+7znqidP2o/o+yT5/pWxqZvqGpJrrsH9wKmhtvgzouWNoFzaEf/R/ztPOHAdLMjBvgJHSaRGQUZsYBl2HkZ5SLyejWUpTIdb8pAxptW9QnZ7p1AyR2OcddEeQrC2z2tdE6xdRxh3glcALvZ8sCt6zKNCS8XMmaYVRQnHbocx7GwWN0+s3+vIXwIZTfVRjlViWXJMY6Dw==

Tập lệnh quan sát dữ liệu tại mỗi shard iterator


SHARD_ITERATOR=$(aws kinesis get-shard-iterator –shard-id shardId-000000000000 –shard-iterator-type TRIM_HORIZON –stream-name <your_stream_name> –query ‘ShardIterator’); aws kinesis get-records –shard-iterator $SHARD_ITERATOR [| grep Data | wc -l; done]

Trong mỗi tập records sẽ có thông tin NextShardIterator (để nhảy đến tập records tiếp theo) và MillisBehindLatest (cho biết độ trễ giữa việc đọc dữ liệu so với dữ liệu hiện có trong stream – 0 có nghĩa là đang bắt kịp). Các records này sẽ tồn tại 24 giờ cho phép bạn lấy thông tin và lưu trữ lại trước khi bị biến mất (retention period – có thể setup đến 168 giờ ~ 7 ngày).

Xoá stream


aws kinesis delete-stream –stream-name <your_stream_name>

aws kinesis describe-stream –stream-name <your_stream_name>

Nếu đã được delete thì ta sẽ nhận được thông báo lỗi sau

A client error (ResourceNotFoundException) occurred when calling the DescribeStream operation:

Stream Foo under account 112233445566 not found.

Spark kinesis

Để đọc dữ liệu từ Kinesis stream, ta có thể kết hợp sử dụng với Spark Kinesis Streaming library. Khi tạo hoặc gọi hàm KinesisUtils.createStream có ba thông số cần quan tâm:

  • Initial position – vị trí chương trình của bạn bắt đầu đọc từ stream: đọc record cũ nhất tới giờ (InitialPositionInStream.TRIM_HORIZON) hoặc đọc record mới nhất tới giờ (InitialPostitionInStream.LATEST).
  • Application name – tên của DynamoDB table nơi thông tin về checkpoints được lưu lại.
  • Checkpoint interval – Khoảng thời gian checkpoint lưu xuống DynamoDB, tương đương với chuỗi dữ liệu sau cùng đọc từ tất cả shards được ghi vào DynamoDB table. Ta có thể view DynamoDB table chứa records cho từng shard và thông tin tương ứng với nó.

Những điểm cần lưu ý lúc runtime:

  • Kinesis data processing được sắp xếp theo từng partition và xuất hiện ít nhất một lần trong mỗi message.
  • Nhiều ứng dụng khác nhau có thể đọc cùng lúc từ cùng Kinesis stream. Kinesis sẽ duy trì ứng dụng cụ thể dựa trên shard và checkpoint info trong DynamoDB.
  • Một Kinesis stream shard được xử lý bởi một input DStream tại một thời điểm.
  • Một Kinesis input DStream có thể được đọc từ nhiều shards của một Kinesis stream bằng cách tạo ra nhiều KinesisRecordProcessor threads.
  • Nhiều input DStreams chạy trên các instance và xử lý độc lập có thể đọc từ một Kinesis stream
  • Bạn không cần Kinesis input DStreams nhiều hơn số lượng Kinesis stream shards do mỗi input DStream sẽ tạo ra ít nhất một KinesisRecordProcessor thread để quản lý từng shard
  • Horizontal scaling bằng cách thêm/bớt Kinesis input DStreams (bên trong một process hoặc toàn bộ nhiều processes/instances) – tương ứng với tổng số Kinesis stream shards so với thời điểm trước
  • Kinesis input DStream sẽ cân bằng tải giữa tất cả DStreams ngay cả nhiều processes/instances cùng lúc
  • Kinesis input DStream sẽ cân bằng tải trong suốt quá trình re-shard (merging, splitting) để điều chỉnh tải. Lưu ý, không nên re-sharding nhiều lần.
  • Mỗi Kinesis input DStream quản lý checkpoint info của riêng nó.
  • Không có sự liên hệ giữa số lượng Kinesis stream shards và số lượng RDD partitions/shards tạo ra trên các cluster của Spark trong suốt quá trình xử lý DStream.

Nguồn tham khảo

Projects

Advertisements

Trả lời

Mời bạn điền thông tin vào ô dưới đây hoặc kích vào một biểu tượng để đăng nhập:

WordPress.com Logo

Bạn đang bình luận bằng tài khoản WordPress.com Đăng xuất / Thay đổi )

Twitter picture

Bạn đang bình luận bằng tài khoản Twitter Đăng xuất / Thay đổi )

Facebook photo

Bạn đang bình luận bằng tài khoản Facebook Đăng xuất / Thay đổi )

Google+ photo

Bạn đang bình luận bằng tài khoản Google+ Đăng xuất / Thay đổi )

Connecting to %s