Hướng dẫn deploy Spark

Việc deploy Spark là cần thiết đối với các tác vụ mang tính chu kỳ. Ví dụ, ta có thể tạo một CRON job để chương trình Spark có thể tự động tổng hợp dữ liệu cho chúng ta sau chu kỳ mỗi giờ, mỗi ngày hay mỗi tuần. spark-submit là một shell command được dùng để deploy ứng dụng Spark lên cluster. Nhờ vào cơ chế quản lý phân tán của Spark, ta không cần phải chỉnh sửa source code quá nhiều để có thể chuyển đổi từ standalone mode sang distributed mode.

Để hoàn tutorial này, bạn cần các phần mềm sau:

Tiếp tục đọc

Amazon Kinesis Streams là gì

AWS Kinesis

AWS Kinesis

Thông thường, khi thiết kế một hệ thống streaming, ta nghĩ ngay đến bộ công cụ (Kafka, Storm, Redis) để nhận, xử lý, và lưu trữ dữ liệu. Tuy nhiên, khi sử dụng Kafka (một message queue giúp đảm bảo lượng dữ liệu khổng lồ ập đến không bị tắc nghẽn) bạn phải nghiên cứu khá nhiều công nghệ cũng như hiệu chỉnh các thiết lập, thậm chí phải viết code để tối ưu hoá hệ thống của mình. Tương tự như Kafka, Kinesis mặc dù có throughput (tốc độ đọc/ghi) không nhanh bằng nhưng lại giảm đáng kể công việc cho admin system cũng như các developer trong khâu bắn (put) dữ liệu và đọc dữ liệu từ stream.

Trong bài viết này, tôi sẽ trình bày một số thách thức trong xử lý stream cũng như các hướng tiếp cận để giải quyết. Đồng thời cũng giới thiệu sơ qua Kinesis, các tính năng và các thông số cơ bản của hệ thống cùng với các lệnh tương tác với Kinesis stream.

Tiếp tục đọc

Spark – Bước đầu lập trình

Working with RDD

Working with RDD

Ở bài viết trước, ta đã tìm hiểu về Spark SQL, API giúp ta có thể làm việc với MapReduce một cách dễ dàng bằng cách sử dụng ngôn ngữ SQL truyền thống. Spark SQL được kế thừa và tối ưu lại từ kiểu dữ liệu RDD. Do vậy, trong một vài trường hợp ta khó có thể sử dụng đến Spark SQL, ta vẫn có thể sử dụng các hàm trong RDD để làm việc trực tiếp với MapReduce. Qua bài viết này, tôi xin giới thiệu một vài đặc điểm thú vị của RDD các bạn cần lưu ý để có thể làm việc hiệu quả trên kiểu dữ liệu này.

Parallelized Collections

Đây là cơ chế quan trọng trong Spark. Từ một mảng dữ liệu bất kỳ, ta có thể tách mảng dữ liệu này ra thành nhiều phần (partitions/slices), sau đó phân phối (distribute) ra các máy trạm (cluster/slave/node) để chia tải, cuối cùng dữ liệu sẽ được tổng hợp lại tại máy chủ (master/driver). Thông thường ta sẽ cần từ 2-4 partitions cho mỗi CPU ở các máy trạm. Ta dùng hàm parallelize để phân phối dữ liệu.


val data = Array(1, 2, 3, 4, 5, 6)
val distData = sc.parallelize(data) // auto split data
val distData2 = sc.parallelize(data, 2) // split data into 2 partitions

Ví dụ, ta dùng hàm textFile để đọc dữ liệu từ HDFS (block size mỗi file mặc định là 64MB). Khi đó, Spark mặc định sẽ tạo ra một partition tương ứng cho từng block file lấy từ HDFS. Lưu ý, partitions >= blocks.

Spark tự động theo dõi cache được sử dụng tại mỗi node và sẽ loại bỏ dữ liệu của partition nào ít dùng nhất (LRU – least recently used). Ta có thể loại bỏ bằng tay dữ liệu RDD bằng hàm unpersist().

Broadcast variables v được tạo ra bằng cách gọi hàm SparkContext.broadcast(v). Broadcast variable là một wrapper xung quanh biến v, giá trị của biến này có thể được truy xuất thông qua hàm value.


val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value

Lỗi hay mắc phải

Lẫn lộn chế độ standalone với cluster mode. Ta hãy xem ví dụ bên dưới: tính tổng các giá trị của mảng data.


var counter = 0
var rdd = sc.parallelize(data)

// Mắc lỗi tại đây (counter khác nhau giữa master và các cluster)
rdd.foreach(x => counter += x)

println("Sum value: " + counter)

Thông thường ta sẽ làm như trên nhưng bạn cần lưu ý khi chạy ở chế độ cluster, lúc này dữ liệu không còn nằm trên master nữa mà được phân phối trên các cluster. Ta cần sử dụng Accumulators như ví dụ bên dưới.


val accum = sc.accumulator(0, "My Accumulator")

sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

accum.value
res2: Int = 10

RDD Operations

RDDs hỗ trợ 2 phép toán chủ yếu: transformations, tạo ra dataset mới từ dataset ban đầu, và actions, trả dữ liệu về driver (master) program sau khi chạy một loạt các phép biến đổi trên dataset.

Tất cả transformations trong Spark đều hoạt động theo cơ chế lazy, nghĩa là các phép biến đổi này sẽ không được thực hiện cho đến khi phép toán action cần thực thi để trả về cho driver program. Nhờ vậy mà Spark chạy hiệu quả hơn, mỗi lần gọi hàm map(), Spark sẽ distribute dữ liệu ra các cluster và chỉ thực hiện actions khi cần reduce() kết quả về cho driver program. Ngoài ra, ta có thể cache dữ liệu  (trên đĩa, hay trên bộ nhớ phân tán trên nhiều nodes/cluster) cho việc truy xuất nhanh các tính toán tiếp theo.

Vậy nên chọn Storage Level nào?

Spark’s storage levels cho ta lựa chọn đánh đổi (trade-offs) giữa việc sử dụng bộ nhớ và CPU. Ta nên thử qua các option bên dưới để chọn ra được option phù hợp nhất cho hệ thống của mình.

  • Nếu dữ liệu RDDs của bạn có thể nằm gọn trong default storage level (MEMORY_ONLY), ta không cần chỉnh sử gì thêm. Option này sẽ giúp CPU hoạt động hiệu quả hơn khi có thể truy xuất dữ liệu RDD nhanh nhất có thể.
  • Tiếp đến MEMORY_ONLY_SER, SER ở đây là  serialization. Ta cần kết hợp sử dụng cùng với các library fast serialization để các object sinh ra được lưu vào bộ nhớ một cách tiết kiệm nhất nhưng vẫn cho phép truy xuất các đối tượng này có thể chấp nhận được.
  • Không nên lựa chọn cấu hình sử dụng tới ổ cứng, vì việc tính toán partition để lưu tạm dữ liệu cũng như truy xuất ổ cứng rất chậm và tốn kém, trừ khi dữ liệu của bạn quá lớn và bạn không còn lựa chọn nào khác. Bạn có thể nâng cấp RAM để hạn chế sử dụng đến ổ cứng.
  • Ta có thể sử dụng replicated storage levels nếu bạn muốn thông tin luôn sẵn sàng trường hợp một trong các cluster bị die. Tất cả storage levels đều cho phép tính toán lại dữ liệu bị mất, nhưng replicated cho phép bạn tiếp tục các tác vụ đang chạy mà không cần phải đợi để tính toán lại các partition bị mất mát dữ liệu.
  • Trong các môi trường mà việc sử dụng bộ nhớ nhiều hay nhiều ứng dụng hoạt động trên đó, thì chế độ OFF_HEAP mang lại cho bạn một số lợi ích sau:
    • Cho phép nhiều executors share cùng pool of memory trong Tachyon.
    • Giảm đáng kể chi phí garbage collection.
    • Cached data sẽ không bị mất nếu một trong các executors bị crash.

Nguồn tham khảo:

http://spark.apache.org/docs/latest/programming-guide.html

Spark Streaming là gì

ezTrade

ezTrade

Bạn đã bao giờ quan sát một phiên giao dịch chứng khoán vào mỗi buổi sáng (dịch vụ FPT Stock). Đã bao giờ tự hỏi ứng dụng Sentiment Analysis lấy dữ liệu từ Twitter hoạt động real-time như thế nào để giúp cho các marketer hay các chính trị gia đoán biết về xu hướng quan tâm của xã hội, từ đó họ có thể đưa ra được những chiến dịch phù hợp một cách kịp thời nhất. Hay mỗi lần vào siêu thị mua đồ ăn và quan sát thấy hàng dãy quầy thu ngân bấm mã vạch để lưu thông tin hoá đơn mua hàng của hàng trăm khách hàng mỗi giờ. Và còn nữa những giao dịch mà dữ liệu luôn phát sinh theo hàng giờ, phút, giây như tin nhắn gửi về server Facebook, giao dịch ngân hàng, tuyến bay trong hàng không, real-time bidding, thông tin sensor gửi từ các thiết bị wearable hay xe hơi như Uber,…

Vậy những thông tin đổ vào như thác nước này được lưu lại như thế nào? Các hệ quản trị cơ sở dữ liệu hiện tại dường như đã quá tải. Vì vậy, cộng đồng khoa học và các kĩ sư đã cùng nhau phát triển ra các hệ thống như NoSQL (Postegre, MongoDB, Casandra, Redis,…), Message queue (Kafka, Kinesis, Flume, …) để giải quyết thử thách trên. Trong bài viết này, tôi chủ yếu tổng hợp lại những thông tin quan trọng cần nắm bắt về Spark Streaming API, một thư viện hữu ích được dùng để tích hợp trong các hệ thống xử lý streaming.

Tiếp tục đọc

Làm việc với Spark DataFrames – Truy vấn cơ bản

Spark DataFrames

Spark DataFrames

DataFrame là một kiểu dữ liệu collection phân tán, được tổ chức thành các cột được đặt tên. Về mặt khái niệm, nó tương đương với các bảng quan hệ (relational tables) đi kèm với các kỹ thuật tối ưu tính toán.

DataFrame có thể được xây dựng từ nhiều nguồn dữ liệu khác nhau như Hive table, các file dữ liệu có cấu trúc hay bán cấu trúc (csv, json), các hệ cơ sở dữ liệu phổ biến (MySQL, MongoDB, Cassandra), hoặc RDDs hiện hành. API này được thiết kế cho các ứng dụng Big Data và Data Science hiện đại. Kiểu dữ liệu này được lấy cảm hứng từ DataFrame trong Lập trình R và Pandas trong Python hứa hẹn mang lại hiệu suất tính toán cao hơn.

Spark DataFrames Performance

Spark DataFrames Performance

Trong bài viết này, tôi sẽ tiến hành thực nghiệm một vài truy vấn cơ bản để làm quen với kiểu dữ liệu DataFrames. Ở các bài viết sau, ta sẽ đi sâu hơn vào các truy vấn nâng cao và phức tạp hơn.

Tiếp tục đọc