Spark – Bước đầu lập trình

Working with RDD

Working with RDD

Ở bài viết trước, ta đã tìm hiểu về Spark SQL, API giúp ta có thể làm việc với MapReduce một cách dễ dàng bằng cách sử dụng ngôn ngữ SQL truyền thống. Spark SQL được kế thừa và tối ưu lại từ kiểu dữ liệu RDD. Do vậy, trong một vài trường hợp ta khó có thể sử dụng đến Spark SQL, ta vẫn có thể sử dụng các hàm trong RDD để làm việc trực tiếp với MapReduce. Qua bài viết này, tôi xin giới thiệu một vài đặc điểm thú vị của RDD các bạn cần lưu ý để có thể làm việc hiệu quả trên kiểu dữ liệu này.

Parallelized Collections

Đây là cơ chế quan trọng trong Spark. Từ một mảng dữ liệu bất kỳ, ta có thể tách mảng dữ liệu này ra thành nhiều phần (partitions/slices), sau đó phân phối (distribute) ra các máy trạm (cluster/slave/node) để chia tải, cuối cùng dữ liệu sẽ được tổng hợp lại tại máy chủ (master/driver). Thông thường ta sẽ cần từ 2-4 partitions cho mỗi CPU ở các máy trạm. Ta dùng hàm parallelize để phân phối dữ liệu.


val data = Array(1, 2, 3, 4, 5, 6)
val distData = sc.parallelize(data) // auto split data
val distData2 = sc.parallelize(data, 2) // split data into 2 partitions

Ví dụ, ta dùng hàm textFile để đọc dữ liệu từ HDFS (block size mỗi file mặc định là 64MB). Khi đó, Spark mặc định sẽ tạo ra một partition tương ứng cho từng block file lấy từ HDFS. Lưu ý, partitions >= blocks.

Spark tự động theo dõi cache được sử dụng tại mỗi node và sẽ loại bỏ dữ liệu của partition nào ít dùng nhất (LRU – least recently used). Ta có thể loại bỏ bằng tay dữ liệu RDD bằng hàm unpersist().

Broadcast variables v được tạo ra bằng cách gọi hàm SparkContext.broadcast(v). Broadcast variable là một wrapper xung quanh biến v, giá trị của biến này có thể được truy xuất thông qua hàm value.


val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value

Lỗi hay mắc phải

Lẫn lộn chế độ standalone với cluster mode. Ta hãy xem ví dụ bên dưới: tính tổng các giá trị của mảng data.


var counter = 0
var rdd = sc.parallelize(data)

// Mắc lỗi tại đây (counter khác nhau giữa master và các cluster)
rdd.foreach(x => counter += x)

println("Sum value: " + counter)

Thông thường ta sẽ làm như trên nhưng bạn cần lưu ý khi chạy ở chế độ cluster, lúc này dữ liệu không còn nằm trên master nữa mà được phân phối trên các cluster. Ta cần sử dụng Accumulators như ví dụ bên dưới.


val accum = sc.accumulator(0, "My Accumulator")

sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

accum.value
res2: Int = 10

RDD Operations

RDDs hỗ trợ 2 phép toán chủ yếu: transformations, tạo ra dataset mới từ dataset ban đầu, và actions, trả dữ liệu về driver (master) program sau khi chạy một loạt các phép biến đổi trên dataset.

Tất cả transformations trong Spark đều hoạt động theo cơ chế lazy, nghĩa là các phép biến đổi này sẽ không được thực hiện cho đến khi phép toán action cần thực thi để trả về cho driver program. Nhờ vậy mà Spark chạy hiệu quả hơn, mỗi lần gọi hàm map(), Spark sẽ distribute dữ liệu ra các cluster và chỉ thực hiện actions khi cần reduce() kết quả về cho driver program. Ngoài ra, ta có thể cache dữ liệu  (trên đĩa, hay trên bộ nhớ phân tán trên nhiều nodes/cluster) cho việc truy xuất nhanh các tính toán tiếp theo.

Vậy nên chọn Storage Level nào?

Spark’s storage levels cho ta lựa chọn đánh đổi (trade-offs) giữa việc sử dụng bộ nhớ và CPU. Ta nên thử qua các option bên dưới để chọn ra được option phù hợp nhất cho hệ thống của mình.

  • Nếu dữ liệu RDDs của bạn có thể nằm gọn trong default storage level (MEMORY_ONLY), ta không cần chỉnh sử gì thêm. Option này sẽ giúp CPU hoạt động hiệu quả hơn khi có thể truy xuất dữ liệu RDD nhanh nhất có thể.
  • Tiếp đến MEMORY_ONLY_SER, SER ở đây là  serialization. Ta cần kết hợp sử dụng cùng với các library fast serialization để các object sinh ra được lưu vào bộ nhớ một cách tiết kiệm nhất nhưng vẫn cho phép truy xuất các đối tượng này có thể chấp nhận được.
  • Không nên lựa chọn cấu hình sử dụng tới ổ cứng, vì việc tính toán partition để lưu tạm dữ liệu cũng như truy xuất ổ cứng rất chậm và tốn kém, trừ khi dữ liệu của bạn quá lớn và bạn không còn lựa chọn nào khác. Bạn có thể nâng cấp RAM để hạn chế sử dụng đến ổ cứng.
  • Ta có thể sử dụng replicated storage levels nếu bạn muốn thông tin luôn sẵn sàng trường hợp một trong các cluster bị die. Tất cả storage levels đều cho phép tính toán lại dữ liệu bị mất, nhưng replicated cho phép bạn tiếp tục các tác vụ đang chạy mà không cần phải đợi để tính toán lại các partition bị mất mát dữ liệu.
  • Trong các môi trường mà việc sử dụng bộ nhớ nhiều hay nhiều ứng dụng hoạt động trên đó, thì chế độ OFF_HEAP mang lại cho bạn một số lợi ích sau:
    • Cho phép nhiều executors share cùng pool of memory trong Tachyon.
    • Giảm đáng kể chi phí garbage collection.
    • Cached data sẽ không bị mất nếu một trong các executors bị crash.

Nguồn tham khảo:

http://spark.apache.org/docs/latest/programming-guide.html

Advertisements

16 thoughts on “Spark – Bước đầu lập trình

  1. Xin chào.!
    Mình thấy những bài viết của bạn rất hay.!Mình đang tìm hiểu đến Spark Apache để áp dụng cho việc quản lý và cũng như phân tích data của mình.!Data của mình là hiện đang chứa thông tin của user của facebook và đang được lưu trữ bằng MongoDB.!Mình đang băng khoăn vài vấn đế như sau :
    1 – Lựa chọn RDD hay Data Frame?
    2 – Nếu là Data Frame thì việc kết nối với Mongo DB như thế nào?
    Xin cảm ơn.

    Số lượt thích

    • Hi,
      DataFrame (DF) là một biến thể của RDD. Về cơ bản, cả hai đều hỗ trợ MapReduce, nhưng DF cung cấp ngôn ngữ SQL like, giúp cho dev xử lý dữ liệu được dễ dàng hơn cùng với nhiều thuật toán aggregation tối ưu hơn RDD ban đầu. Spark 2.0 đã hỗ trợ kiểu dữ liệu DataSet có hiệu suất cao hơn cả DF, bạn có thể lựa chọn một trong ba kiểu dữ liệu này vì một số bài toán chỉ phù hợp khi sử dụng RDD, bài toán khác có thể sử dụng DF.
      Bạn có thể dùng mongo connector để kết nối Spark với MongoDB, nhưng tốc độ read khá chậm. Tốt nhất bạn nên export ra file .csv, .json hoặc .parquet vì Spark đọc file nhanh hơn từ database.

      Số lượt thích

      • Trước hết xin cám ơn bạn đã giải thích cụ thể cho mình hiểu.!
        Mình xin phép hỏi bạn thêm 1 vấn đề cụ thể như sau :
        1 – Data mình hiện tại có vài triệu documents nhưng lại có nhiều tiêu chí thống kê và mình muốn chia lượng công việc thống kê đó cho các spark.
        Vậy mình sẽ phải áp dụng mô hình nào? Bạn có thể nói rõ cho mình biết được không?
        2 – Mình có đọc qua bài viết này của bạn : https://ongxuanhong.wordpress.com/2016/07/06/spark-streaming-la-gi/ . Vậy nếu mình áp dụng mô hình spark streaming vào data của mình thì có ổn không?

        Xin chân thành cảm ơn bạn,

        Số lượt thích

        • Hi,
          1. Bạn có thể viết script để tổng hợp dữ liệu vào database bất kỳ, từ đó dùng một framework như website để view các biểu đồ lên cho user theo dõi.
          2. Spark streaming là một loại micro-batch processing. Bạn hoàn toàn có thể áp dụng mô hình này (ở công ty mình xử lý 1 tỉ request/ngày, tương đương khối dữ liệu 24MB/s thì Spark streaming sẽ là mô hình tốt nhất). Nhưng để đơn giản nếu dữ liệu hàng giờ không quá nhiều chừng vài trăm MB, bạn chỉ cần thực hiện CRON job để thống kê dữ liệu theo từng giờ là ok.

          Số lượt thích

  2. Chào bạn mình đang nghiên cứu về cách trainning mạng CNNS trên apache Spark bằng Framework TensorFlow . Bây giờ mình mới bắt đầu tìm hiểu nên có thử chạy ví dụ phân loại chữ viết tay MNIST trên máy ảo virtual box dạng localhost theo hướng dẫn như sau https://github.com/liangfengsid/tensoronspark.
    Mình đã cài đặt đầy đủ theo hướng dẫn nhưng mình cứ gặp lỗi không thể kết nối với SPARK. Mong bạn có thể gợi ý cho mình hướng giải quyết . Ngoài ra còn có cách nào khác để huấn luyện Deep learning trên Spark không bạn?
    Cám ơn bạn đã lắng nghe!

    Số lượt thích

  3. Chào bạn, bạn ơi cho mình hỏi là tại sao Spark lại báo là excutor hearbeats timeout 120000ms sau khi mình đưa một tập dữ liệu khoảng 3000 ảnh lên spark xử lý. Lỗi này có phải do tràn bộ nhớ không bạn. Mình đã cài đặt spark dạng cluster gồm 2 máy 1 máy là master và máy kia là worker , mỗi máy có Ram 8G và bộ nhớ là 80GB.
    Đây là cấu hình file spark-env.sh của mình như sau:
    export SPARK_WORKER_INSTANCES=1
    export SPARK_WORKER_CORES=2
    export SPARK_WORKER_MEMORY=6g
    export SPARK_EXECUTOR_MEMORY=6g
    Thân chào và chúc bạn có nhiều may mắn!

    Số lượt thích

  4. “Từ một mảng dữ liệu bất kỳ, ta có thể tách mảng dữ liệu này ra thành nhiều phần (partitions/slices), sau đó phân phối (distribute) ra các máy trạm (cluster/slave/node) để chia tải, cuối cùng dữ liệu sẽ được tổng hợp lại tại máy chủ (master/driver).”
    Giả sử trường hợp dữ liệu đầu vào là cực lớn, nếu tất cả dữ liệu sau khi xử lí xong đều dồn về Master thì liệu Master có chịu nổi hay không ? Nếu được thì bộ nhớ và dung lượng lưu trữ trên Master phải rất lớn so với các cluster ? Bạn có thể giải đáp câu hỏi này giúp mình không ạ ?

    Số lượt thích

    • Dữ liệu trả về sẽ được cho vào hàng đợi rồi MapReduce lần lượt cho đến khi hàng đợi rỗng. Hàm collect() trong Spark sẽ thực hiện quá trình này. Dĩ nhiên master phải đủ dung lượng để lưu trữ kết quả cuối cùng, nếu không sẽ phát sinh lỗi OutOfMemory.

      Số lượt thích

  5. Mình đang sử dụng Cassandra lưu dữ liêu. Mình muốn thực hiện phân tích dữ liệu này dùng spark thì bạn có thể nêu ra các bước thực hiện ko. Liệu có thể chạy Spark trực tiếp trên Cluster Cassandra đang chạy ko hay phải clone ra Cluster khác để phân tích.

    Số lượt thích

Trả lời

Mời bạn điền thông tin vào ô dưới đây hoặc kích vào một biểu tượng để đăng nhập:

WordPress.com Logo

Bạn đang bình luận bằng tài khoản WordPress.com Đăng xuất / Thay đổi )

Twitter picture

Bạn đang bình luận bằng tài khoản Twitter Đăng xuất / Thay đổi )

Facebook photo

Bạn đang bình luận bằng tài khoản Facebook Đăng xuất / Thay đổi )

Google+ photo

Bạn đang bình luận bằng tài khoản Google+ Đăng xuất / Thay đổi )

Connecting to %s