Spark Streaming là gì

ezTrade

ezTrade

Bạn đã bao giờ quan sát một phiên giao dịch chứng khoán vào mỗi buổi sáng (dịch vụ FPT Stock). Đã bao giờ tự hỏi ứng dụng Sentiment Analysis lấy dữ liệu từ Twitter hoạt động real-time như thế nào để giúp cho các marketer hay các chính trị gia đoán biết về xu hướng quan tâm của xã hội, từ đó họ có thể đưa ra được những chiến dịch phù hợp một cách kịp thời nhất. Hay mỗi lần vào siêu thị mua đồ ăn và quan sát thấy hàng dãy quầy thu ngân bấm mã vạch để lưu thông tin hoá đơn mua hàng của hàng trăm khách hàng mỗi giờ. Và còn nữa những giao dịch mà dữ liệu luôn phát sinh theo hàng giờ, phút, giây như tin nhắn gửi về server Facebook, giao dịch ngân hàng, tuyến bay trong hàng không, real-time bidding, thông tin sensor gửi từ các thiết bị wearable hay xe hơi như Uber,…

Vậy những thông tin đổ vào như thác nước này được lưu lại như thế nào? Các hệ quản trị cơ sở dữ liệu hiện tại dường như đã quá tải. Vì vậy, cộng đồng khoa học và các kĩ sư đã cùng nhau phát triển ra các hệ thống như NoSQL (Postegre, MongoDB, Casandra, Redis,…), Message queue (Kafka, Kinesis, Flume, …) để giải quyết thử thách trên. Trong bài viết này, tôi chủ yếu tổng hợp lại những thông tin quan trọng cần nắm bắt về Spark Streaming API, một thư viện hữu ích được dùng để tích hợp trong các hệ thống xử lý streaming.

Tổng quan

Spark Streaming

Spark Streaming

Spark Streaming được mở rộng từ Spark API cho phép nhận dữ liệu từ nhiều nguồn như Kafka, Flume, Twitter, ZeroMQ, Kinesis, hay TCP sockets để xử lý tại thời gian thực (real-time), đi kèm với các hàm ETL như map, reduce, join và window. Cuối cùng, dữ liệu đầu ra có thể được lưu tại filesystems, databases, hay xuất trực tiếp trên live dashboards.

Spark Streaming cung cấp kiểu dữ liệu trừu tượng rời rạc gọi tên là discretized stream hay DStream. Kiểu dữ liệu này biểu diễn luồng dữ liệu liên tục. Bản chất DStreams chính là chuỗi các RDDs (immutable). Bạn có thể xem kiểu dữ liệu này như Array, điểm khác biệt là kiểu dữ liệu này được phân tán trên nhiều cluster. Ta có thể tương tác với DStream thông qua các hàm cung cấp bởi RDDs. Ta có thể tạo ra DStreams khác từ DStreams ban đầu.

DStreams RDDs

DStreams RDDs

DStreams words

DStreams words

Spark Streaming cung cấp hai dạng streaming sources dựng sẵn.

  • Basic sources: các nguồn dữ liệu có sẵn trong StreamingContext API. Ví dụ: file systems (HDFS, S3, NFS, …), socket connections, và Akka actors.
  • Advanced sources: các nguồn dữ liệu như Kafka, Flume, Kinesis, Twitter, … đòi hỏi ta phải cài đặt thêm một vài thư viện dependencies tương ứng.

Ví dụ cơ bản

Trong bất kì hệ thống xử lý stream, nhìn chung ta sẽ có ba bước để xử lý dữ liệu.

  • Nhận dữ liệu: dữ liệu nhận từ sources bằng cách sử dụng Receivers.
  • Transforming data: received data được transform bằng cách sử dụng DStream và RDD transformations.
  • Pushing data đầu ra: dữ liệu được transform sẽ được push ra hệ thống bên ngoài như file systems, databases, dashboards, …
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Tạo một local StreamingContext với 2 thread và batch interval 1 giây.
// Setup master 2 cores để tránh tình trạng nghẽn cổ chai,
// do một thread được dùng để đón nhận dữ liệu, một thread để xử lý dữ liệu.
// Nếu có nhiều DStream thì phải setup sao cho số core phải lớn hơn số stream đầu vào
// để có core khác dùng xử lý dữ liệu.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

// Tạo một DStream kết nối vào hostname:port, ví dụ localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

// Tách mỗi dòng thành các từ
val words = lines.flatMap(_.split(" "))

// Đếm mỗi từ trong mỗi batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Xuất ra màn hình 10 từ đầu tiên trong mỗi RDD tạo ra từ DStream này.
wordCounts.print()

ssc.start()             // Bắt đầu tính toán
ssc.awaitTermination()  // Đợi đến khi chấm dứt việc tính toán

Chạy Netcat

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world

Chương trình sẽ xuất ra như ví dụ bên dưới

...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...

Ghi chú

  • Khi một streaming context được start không thể thêm sửa vào stream này.
  • Khi một streaming context bị stop sẽ không thể restart.
  • Chỉ một StreamingContext được active bên trong JVM tại một thời điểm.
  • Gọi lệnh stop() StreamingContext sẽ đồng thời stop SparkContext. Nếu bạn chỉ muốn stop StreamingContext thì bạn cần set parameter của stop() stopSparkContext thành false.
  • Một SparkContext có thể được sử dụng nhiều lần để tạo ra nhiều StreamingContexts khác, với điều kiện StreamingContext trước đó phải được stop (lưu ý không stop SparkContext) trước khi StreamingContext kế tiếp được tạo.
  • Các hàm thường dùng:
    • UpdateStateByKey Operation: cập nhật giá trị hiện tại thông qua trạng thái (giá trị) trước đó.
    • Transform Operation: để lọc và làm sạch dữ liệu.
    • Window Operations: sử dụng sliding window (window length, sliding interval) để tính toán và tổng hợp thành DStream mới.
    • Join Operations: dùng để nối hai stream lại với nhau.
    • getOrCreate: singleton pattern dùng để duy trì một thực thể duy nhất trong suốt quá trình thực thi.
  • Ta có thể convert RDD sang DataFrame, định nghĩa tên table và thực hiện các phép truy vấn SQL trên kiểu dữ liệu này.

Lưu lại các records


dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    // use rdd.foreachPartition - create a single connection object
    // and send all the records in a RDD partition using that connection.
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

// Further optimized
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

Streaming dstream window

Streaming dstream window

Checkpoints

  • Metadata checkpointing:
    • Configuration – lưu lại cấu hình được dùng để tạo ra streaming application.
    • DStream operations – tập hợp các phép toán trên DStream operations tạo thành streaming application.
    • Incomplete batches – các batch còn các job chưa được thực thi và còn nằm trong hàng đợi.
  • Data checkpointing: checkpoint lại dữ liệu RDD đang gửi vào ổ đĩa.
  • Khi nào sử dụng checkpoint:
    • Làm việc với stateful transformations như updateStateByKey hay reduceByKeyAndWindow, ta cần lưu lại RDD checkpoint trước đó.
    • Phục hồi lại dữ liệu bị mất trong quá trình chạy chương trình.
  • Thiết lập:
    • Lưu vào ổ đĩa HDFS, S3, … bằng hàm streamingContext.checkpoint(checkpointDirectory).
  • Cơ chế:
    • Khi chương trình được start lần đầu, nó sẽ tạo ra một StreamingContext mới, thiết lập các thông số sau đó gọi hàm start().
    • Khi chương trình restart sau khi bị failed, nó sẽ tạo ra lại một StreamingContext từ checkingpoint data lưu trong checkpoint directory.

Deploy SparkStreaming

  • Cluster với cluster managerxem thêm.
  • Đóng gói ứng dụng thành file JAR – Nếu bạn sử dụng spark-submit để start chương trình, thì không cần đóng gói Spark và Spark Streaming vào file JAR. Tuy nhiên, nếu bạn sử dụng các source nâng cao (Kafka, Flume, Twitter), bạn phải đóng gói các thư viện dependencies này lại.
  • Thiết lập đủ bộ nhớ cho các executors – vì khi đón dữ liệu Spark sẽ lưu dữ liệu trên bộ nhớ do đó ta cần thiết lập sao cho executors có đủ bộ nhớ để lưu tạm các dữ liệu này. Lưu ý, nếu bạn làm việc trên window với 10 phút interval, bạn cần setup ít nhất 10 phút dữ liệu trước đó trên bộ nhớ, như vậy bạn cần tổng cộng ít nhất 20 phút interval bộ nhớ.
  • Thiết lập checkpointing – có thể lưu trên Hadoop API (HDFS, S3, …), hay DynamoDB nếu sử dụng Kinesis.
  • Thiết lập tự động restart nếu chương trình bị failed – sử dụng cluster managers (YARN, Mesos) để theo dõi và restart SparkStreaming application.
  • Thiết lập write ahead logs – spark.streaming.receiver.writeAheadLog.enable
    • Semantics của streaming systems:
      • At most once: mỗi record sẽ được xử lý một lần hoặc không xử lý lần nào.
      • At least once: mỗi record sẽ được xử lý một hoặc nhiều lần. Cơ chế này mạnh hơn cơ chế trước để đảm bảo dữ liệu không bị mất, nhưng có thể gây ra trùng lập dữ liệu đầu ra.
      • Exactly once: mỗi record sẽ được xử lý duy nhất một lần – không có data lost và không có data nào được xử lý nhiều lần. Đây là cơ chế mạnh nhất trong nhóm.
    • Ta có thể tạo ra nhiều receivers song song để tăng throughput – xem thêm.
  • Thiết lập max receiving rate – nếu cluster resources không đủ lớn để chứa dữ liệu đổ từ streaming, ta có thể giới hạn maximum rate cho receivers theo tỉ lệ records / sec (spark.streaming.receiver.maxRate cho receivers, và spark.streaming.kafka.maxRatePerPartition cho Direct Kafka). Từ Spark 1.5, ta có thể sử dụng backpressure mà không cần thiết lập rate limit, khi đó Spark Streaming tự động điều chỉnh thông số rate limits tương ứng với điều kiện thay đổi của hệ thống (spark.streaming.backpressure.enabled = true).

Upgrading Application Code

Ta có hai cách khi muốn upgrade Spark Streaming application:

  • Cho chương trình Spark Streaming đã upgrade start và chạy song song với chương trình hiện hành. Khi chương trình mới (nhận được dữ liệu như chương trình hiện hành) được “đề-ba”, ta có thể kích hoạt chương trình mới và tăt chương trình cũ. Lưu ý, điều này chỉ có thể thực hiện cho data sources có hỗ trợ truyền dữ liệu đến hai nơi lưu trữ.
  • Tắt chương trình hiện tại bằng hàm StreamingContext.stop(…) hay JavaStreamingContext.stop(…) và đảm bảo dữ liệu nhận được đã qua xử lý trước khi shutdown. Sau đó, chạy ứng dụng đã upgraded tại thời điểm chương trình cũ được tắt. Lưu ý, cách này chỉ có thể thực hiện đối với input sources có hỗ trợ source-side buffering (như Kafka, và Flume) vì dữ liệu cần được buffered lại trong khi chương trình cũ được tắt đi trong khi chương trình mới chưa được bật lên. Sau đó, ta restart từ thông tin checkpoint trước đó. Thông tin checkpoint về cơ bản chứa các đối tượng serialized từ Scala/Java/Python và cố găng deserialize các objects này với các class mới hay các class đã modified lại có thể dẫn đến lỗi. Trong trường hợp này, hoặc là start upgraded app với checkpoint directory khác, hoặc xoá checkpoint directory trước đó.

Performance Tuning

Về tổng quát, bạn cần quan tâm hai điều bên dưới:

  • Giảm thời gian xử lý cho mỗi batch data bằng cách sử dụng hiệu quả tài nguyên của cluster – xem thêm.
    • Tạo nhiều receivers/streams chạy song song.
  • Thiết lập hợp lý batch size such sao cho batches data có thể được xử lý nhanh nhất có thể khi bắt đầu nhận dữ liệu.

Nguồn tham khảo

http://spark.apache.org/docs/latest/streaming-programming-guide.html
http://www.datanami.com/2015/11/30/spark-streaming-what-is-it-and-whos-using-it/
https://www.mapr.com/ebooks/spark/06-apache-spark-streaming-framework.html

Diving into Apache Spark Streaming’s Execution Model


https://www.oreilly.com/learning/apache-spark-2-0–introduction-to-structured-streaming

Advertisements

2 thoughts on “Spark Streaming là gì

  1. Chào thầy Hồng,
    cho em hỏi vềtính toán xong xong dùng spark.
    Em có method A() so sánh 2 cái cấu trúc hình học protein ( 2 cái 3d object) dùng clique algorithm. A() chạy nhanh, ok. Nhưng em phaỉ lặp lại cái A() cho 10000 cặp protein. Em phải implement như thế nào trên spark Java.
    Em đã dùng multithreading cỗ điển: cho 10 cái thread chạy , mỗi cái xử lý 1000 cặp trên mấy PC ở trường core i7, Ra 16G. E ko bít code này multithreading cỗ điển có chạy được trên hệ thống siêu máy tinh hiện đại, nhiều server, 1000 cores hay không.
    Cám ơn thầy nhiều

    Số lượt thích

Trả lời

Mời bạn điền thông tin vào ô dưới đây hoặc kích vào một biểu tượng để đăng nhập:

WordPress.com Logo

Bạn đang bình luận bằng tài khoản WordPress.com Đăng xuất / Thay đổi )

Twitter picture

Bạn đang bình luận bằng tài khoản Twitter Đăng xuất / Thay đổi )

Facebook photo

Bạn đang bình luận bằng tài khoản Facebook Đăng xuất / Thay đổi )

Google+ photo

Bạn đang bình luận bằng tài khoản Google+ Đăng xuất / Thay đổi )

Connecting to %s