Lập trình Spark với Scala

stackoverflow
stackoverflow

Với chế độ cài đặt Standalone, Spark có thể xử lý được một tập dữ liệu khổng lồ dù cho bạn đang sử dụng một chiếc laptop có RAM chỉ khoảng 4-8GB. Trong bài viết này, tôi sẽ hướng dẫn mọi người lập trình Spark với Scala.

Cho trước tập dữ liệu là các bài post trên stackoverflow.com, nhiệm vụ của chúng ta là xây dựng một mô hình phân lớp để dự đoán xem câu hỏi của user khi post lên thuộc nhóm nào, nhờ vậy mà trang web có thể đưa ra những tags gợi ý thông minh hơn cho user.

Đặt vấn đề

Cho tập dữ liệu post là các câu hỏi được post lên stackoverflow. Tập dữ liệu này có dung lượng khoảng 34.6GB, gồm có các thông tin sau:

  • Title – tiêu đề câu hỏi
  • Body – nội dung câu hỏi
  • Tags – danh sách các tag hay post
  • Các thuộc tính khác mà ta sẽ không đề cập

Toàn bộ tập dữ liệu có thể được tải về tại đây thông qua giao thức torrent https://archive.org/details/stackexchange. Do tập dữ liệu quá lớn, nên ta sẽ sử dụng một phiên bản nhỏ hơn (link) khoảng 130MB để chạy thực nghiệm tác vụ này.

Để đơn giản hóa bài toán, thay vì gợi ý nhiều tag cho user, ta chỉ cân nhắc xem có nên gợi ý tag “java” hay không. Nghĩa là từ bài toán multiclass-classification, ta chuyển về bài toán binary-classification.

Tiến hành cài đặt

Để cài đặt Spark, ta có thể tham khảo bài viết này. Sau khi cài đặt thành công, ta vào thư mục Spark thực hiện lệnh sau để mở Spark Shell.

bin/spark-shell

Chạy từng dòng lệnh trong Spark Shell

Đầu tiên, ta nạp các thư viện cần thiết

// General purpose library
import scala.xml._

// Spark data manipulation libraries
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Spark machine learning libraries
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.Pipeline

Tiếp theo, ta sẽ parse file xml thành các phần Body, Text và Tags. Sau đó, ta gán kết quả vào một data-frame để lưu các thông tin này lại (giả sử file Posts.small.xml nằm chung thư mục cài đặt Spark).

// remove the xml header and footer
val fileName = "Posts.small.xml"
val textFile = sc.textFile(fileName)
val postsXml = textFile.map(_.trim).
                    filter(!_.startsWith("<?xml version=")).
                    filter(_ != "<posts>").
                    filter(_ != "</posts>")


// concatenate title and body and remove all unnecessary tags and new line
// characters from the body and all space duplications
val postsRDD = postsXml.map { s =>
            val xml = XML.loadString(s)

            val id = (xml \ "@Id").text
            val tags = (xml \ "@Tags").text

            val title = (xml \ "@Title").text
            val body = (xml \ "@Body").text
            val bodyPlain = ("<\\S+>".r).replaceAllIn(body, " ")
            val text = (title + " " + bodyPlain).replaceAll("\n", " ").replaceAll("( )+", " ");

            Row(id, tags, text)
        }

// create a data-frame
val schemaString = "Id Tags Text"
val schema = StructType(
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

val postsDf = sqlContext.createDataFrame(postsRDD, schema)

// take a look at your data frame
postsDf.show()

Để xây dựng được mô hình phân lớp, bước tiếp theo, ta sẽ chuẩn bị tập dữ liệu train (90%) và test (10%) từ tập dữ liệu đã cho.

// All rows with the “java” label should be marked as
// a "1" and rows with no "java" as a "0"
val targetTag = "java"
val myudf: (String => Double) = (str: String) => {if (str.contains(targetTag)) 1.0 else 0.0}
val sqlfunc = udf(myudf)
val postsLabeled = postsDf.withColumn("Label", sqlfunc(col("Tags")) )

// split into negative and positive subsets by using the new label
val positive = postsLabeled.filter('Label > 0.0')
val negative = postsLabeled.filter('Label < 1.0')

// create a training dataset by sampling the positive
// and negative datasets separately
val positiveTrain = positive.sample(false, 0.9)
val negativeTrain = negative.sample(false, 0.9)
val training = positiveTrain.unionAll(negativeTrain)

// testing dataset should include all rows
// which are not included in the training datasets
val negativeTrainTmp = negativeTrain.withColumnRenamed("Label", "Flag").select('Id, 'Flag)
val negativeTest = negative.join( negativeTrainTmp, negative("Id") === negativeTrainTmp("Id"), "LeftOuter").
                            filter("Flag is null").select(negative("Id"), 'Tags, 'Text, 'Label)
val positiveTrainTmp = positiveTrain.withColumnRenamed("Label", "Flag").select('Id, 'Flag)
val positiveTest = positive.join( positiveTrainTmp, positive("Id") === positiveTrainTmp("Id"), "LeftOuter").
                            filter("Flag is null").select(positive("Id"), 'Tags, 'Text, 'Label)
val testing = negativeTest.unionAll(positiveTest)

Tiếp theo, ta tiến hành xây dựng mô hình dự đoán với các thông số sau:

  • Số lượng các features
  • Các thông số cho regression
  • Gía trị epoch cho gradient decent
// creates a model based on columns from the data-frame and the training parameters
val numFeatures = 64000
val numEpochs = 30
val regParam = 0.02

val tokenizer = new Tokenizer().setInputCol("Text").setOutputCol("Words")
val hashingTF = new  org.apache.spark.ml.feature.HashingTF().setNumFeatures(numFeatures).
          setInputCol(tokenizer.getOutputCol).setOutputCol("Features")
val lr = new LogisticRegression().setMaxIter(numEpochs).setRegParam(regParam).
                                    setFeaturesCol("Features").setLabelCol("Label").
                                    setRawPredictionCol("Score").setPredictionCol("Prediction")
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

val model = pipeline.fit(training)

Khi xây dựng mô hình, ta có thể quan sát thấy Spark sử dụng và phân phối RAM rất hợp lý cho từng tiến trình bằng cách chia thành nhiều stage nhỏ hơn, nhờ vậy mà với một chiếc laptop bạn đã có thể cài đặt được một thực nghiệm nhỏ với Big data. Nếu có nhiều máy tính hơn, ta có thể thiết lập chế độ master/slave để khai thác sức mạnh xử lý song song trên nhiều máy trạm giúp giảm thời gian xây dựng mô hình một cách đáng kể.

Sau cùng, ta tiến hành đánh giá mô hình vừa xây dựng và chạy một đoạn demo nhỏ.

// evaluate the quality of the model
val testingResult = model.transform(testing)
val testingResultScores = testingResult.select("Prediction", "Label").rdd.
                                    map(r => (r(0).asInstanceOf[Double], r(1).asInstanceOf[Double]))
val bc = new BinaryClassificationMetrics(testingResultScores)
val roc = bc.areaUnderROC
print("Area under the ROC:" + roc)


// results
roc: Double = 0.6014178406314431
Area under the ROC:0.6014178406314431

// testing the model
val testTitle = "Easiest way to merge a release into one JAR file"
val testBoby = """Is there a tool or script which easily merges a bunch of 
                   href="http://en.wikipedia.org/wiki/JAR_%28file_format%29"
                   JAR files into one JAR file? A bonus would be to easily set the main-file manifest 
                   and make it executable. I would like to run it with something like:
                  As far as I can tell, it has no dependencies which indicates that it shouldn't be an easy 
                  single-file tool, but the downloaded ZIP file contains a lot of libraries."""
val testText = testTitle + testBody
val testDF = sqlContext.createDataFrame(Seq( (99.0, testText))).toDF("Label", "Text")
val result = model.transform(testDF)
val prediction = result.collect()(0)(6).asInstanceOf[Double]
print("Prediction: "+ prediction)

// result
prediction: Double = 0.0
Evaluation
Evaluation

Ta nhận thấy kết quả đánh giá mô hình dự đoán theo thông số ROC (Receiver operating characteristic) chỉ đạt khoảng 60%. Với kết quả ban đầu này, ta cần cải tiến mô hình dự đoán bằng các kĩ thuật khác như TF-IDF hay normalization.

Ngoài ra, ta có thể chạy trực tiếp file script của Scala (link) bằng một trong các dòng lệnh bên dưới (lưu ý: các tập tin đều được chép vào thư mục cài đặt Spark).

// method 1: after loading into spark-shell
:load PATH_TO_FILE

// method 2: directly run spark-shell with file parameter
bin/spark-shell -i beginner_spark_ml.scala

Nguồn: http://fullstackml.com/2015/10/29/beginners-guide-apache-spark-machine-learning-scenario-with-a-large-input-dataset/

Tham khảo thêm:

Advertisement

Một suy nghĩ 10 thoughts on “Lập trình Spark với Scala

  1. Chào bạn. Cảm ơn những chia sẻ của bạn.
    Mình đã cài xong Spark theo hướng dẫn và chạy Spark-Shell theo Script của bài này.
    Tuy nhiên ở dòng lệnh val postsDf = sqlContext.createDataFrame(postsRDD, schema)
    thì xuất hiện lỗi error: not found: value sqlcontext
    Bạn có thể giúp fix nó dùm mình không
    Cảm ơn bạn!

    Thích

  2. Cam on hong.
    Minh setup lai tren Ubuntu 16. Lam cac test theo huong dan cai dat OK.
    Den dong
    val postsDf = sqlContext.createDataFrame(postsRDD, schema)
    lan nay bao loi
    aused by: java.sql.SQLException: Failed to create database ‘metastore_db’, see the next exception for details.
    Minh da chmod 777 cho usr/local/spark roi
    Hong xem giup minh nhe.
    Cam on ban

    Thích

    1. val postsDf = sqlContext.createDataFrame(postsRDD, schema)
      Anh sửa lại thành
      val postsDf = spark.sqlContext.createDataFrame(postsRDD, schema)
      thì mới chạy được nha. Do Anh cài bản spark mới nên có chút thay đổi

      Thích

  3. Chào Anh. Bài viết rất hay và chi tiết. Anh cho em hỏi là bài này là mình thao tác ở màn hình console. Còn nếu muốn viết nó thành 1 page website để người dùng có thể nhập câu hỏi vào và hệ thống recommend tag phù hợp thì làm thế nào vậy Anh và website đó nó chạy trên apache spark luôn hay là chạy trên server bình thường rồi gọi api của cái console này để truyền và lấy thông tin. Do Em mới tìm hiểu về apache spark nên chỗ này Em còn mơ hồ quá.

    Thích

    1. Hi em, sau khi training và lưu model lại, em có thể viết một API scala để thực hiện phân lớp tag từ nội dung user request đến server. Sau đó, trả kết quả về và hiển thị trên web.
      Bài viết này làm trên console. Khi lên production em phải build ra file .jar để nhận request liên tục.

      Thích

      1. Dạ. Nếu vậy thì website Em có thể viết bằng php vẫn kết nối API để chạy được đúng ko Anh. Anh có link bài viết hoặc từ khóa nào hướng dẫn viết API scala không Anh. Em đang nghiên cứu để xây dựng 1 một hệ thống truy vấn 1 khối lượng lớn file xml sử dụng apache spark mà đang bị vướn mắc chỗ này chưa giải quyết được.

        Thích

  4. Chào anh. Cảm ơn những chia sẻ của anh.
    Em gặp 1 vấn đề là:
    Em đã tải tệp dữ liệu và copy vào file cài đặt của spark nhưng mà vẫn không add data vô được ạ.
    Anh có thể hướng dẫn chỗ này dùm em được không ạ.
    Cảm ơn anh.

    Thích

Trả lời

Điền thông tin vào ô dưới đây hoặc nhấn vào một biểu tượng để đăng nhập:

WordPress.com Logo

Bạn đang bình luận bằng tài khoản WordPress.com Đăng xuất /  Thay đổi )

Facebook photo

Bạn đang bình luận bằng tài khoản Facebook Đăng xuất /  Thay đổi )

Connecting to %s