Lập trình Spark với Scala

stackoverflow

stackoverflow

Với chế độ cài đặt Standalone, Spark có thể xử lý được một tập dữ liệu khổng lồ dù cho bạn đang sử dụng một chiếc laptop có RAM chỉ khoảng 4-8GB. Trong bài viết này, tôi sẽ hướng dẫn mọi người lập trình Spark với Scala.

Cho trước tập dữ liệu là các bài post trên stackoverflow.com, nhiệm vụ của chúng ta là xây dựng một mô hình phân lớp để dự đoán xem câu hỏi của user khi post lên thuộc nhóm nào, nhờ vậy mà trang web có thể đưa ra những tags gợi ý thông minh hơn cho user.

Đặt vấn đề

Cho tập dữ liệu post là các câu hỏi được post lên stackoverflow. Tập dữ liệu này có dung lượng khoảng 34.6GB, gồm có các thông tin sau:

  • Title – tiêu đề câu hỏi
  • Body – nội dung câu hỏi
  • Tags – danh sách các tag hay post
  • Các thuộc tính khác mà ta sẽ không đề cập

Toàn bộ tập dữ liệu có thể được tải về tại đây thông qua giao thức torrent https://archive.org/details/stackexchange. Do tập dữ liệu quá lớn, nên ta sẽ sử dụng một phiên bản nhỏ hơn (link) khoảng 130MB để chạy thực nghiệm tác vụ này.

Để đơn giản hóa bài toán, thay vì gợi ý nhiều tag cho user, ta chỉ cân nhắc xem có nên gợi ý tag “java” hay không. Nghĩa là từ bài toán multiclass-classification, ta chuyển về bài toán binary-classification.

Tiến hành cài đặt

Để cài đặt Spark, ta có thể tham khảo bài viết này. Sau khi cài đặt thành công, ta vào thư mục Spark thực hiện lệnh sau để mở Spark Shell.

bin/spark-shell

Chạy từng dòng lệnh trong Spark Shell

Đầu tiên, ta nạp các thư viện cần thiết

// General purpose library
import scala.xml._

// Spark data manipulation libraries
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Spark machine learning libraries
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.Pipeline

Tiếp theo, ta sẽ parse file xml thành các phần Body, Text và Tags. Sau đó, ta gán kết quả vào một data-frame để lưu các thông tin này lại (giả sử file Posts.small.xml nằm chung thư mục cài đặt Spark).

// remove the xml header and footer
val fileName = "Posts.small.xml"
val textFile = sc.textFile(fileName)
val postsXml = textFile.map(_.trim).
                    filter(!_.startsWith("<?xml version=")).
                    filter(_ != "<posts>").
                    filter(_ != "</posts>")


// concatenate title and body and remove all unnecessary tags and new line
// characters from the body and all space duplications
val postsRDD = postsXml.map { s =>
            val xml = XML.loadString(s)

            val id = (xml \ "@Id").text
            val tags = (xml \ "@Tags").text

            val title = (xml \ "@Title").text
            val body = (xml \ "@Body").text
            val bodyPlain = ("<\\S+>".r).replaceAllIn(body, " ")
            val text = (title + " " + bodyPlain).replaceAll("\n", " ").replaceAll("( )+", " ");

            Row(id, tags, text)
        }

// create a data-frame
val schemaString = "Id Tags Text"
val schema = StructType(
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

val postsDf = sqlContext.createDataFrame(postsRDD, schema)

// take a look at your data frame
postsDf.show()

Để xây dựng được mô hình phân lớp, bước tiếp theo, ta sẽ chuẩn bị tập dữ liệu train (90%) và test (10%) từ tập dữ liệu đã cho.

// All rows with the “java” label should be marked as
// a "1" and rows with no "java" as a "0"
val targetTag = "java"
val myudf: (String => Double) = (str: String) => {if (str.contains(targetTag)) 1.0 else 0.0}
val sqlfunc = udf(myudf)
val postsLabeled = postsDf.withColumn("Label", sqlfunc(col("Tags")) )

// split into negative and positive subsets by using the new label
val positive = postsLabeled.filter('Label > 0.0')
val negative = postsLabeled.filter('Label < 1.0')

// create a training dataset by sampling the positive
// and negative datasets separately
val positiveTrain = positive.sample(false, 0.9)
val negativeTrain = negative.sample(false, 0.9)
val training = positiveTrain.unionAll(negativeTrain)

// testing dataset should include all rows
// which are not included in the training datasets
val negativeTrainTmp = negativeTrain.withColumnRenamed("Label", "Flag").select('Id, 'Flag)
val negativeTest = negative.join( negativeTrainTmp, negative("Id") === negativeTrainTmp("Id"), "LeftOuter").
                            filter("Flag is null").select(negative("Id"), 'Tags, 'Text, 'Label)
val positiveTrainTmp = positiveTrain.withColumnRenamed("Label", "Flag").select('Id, 'Flag)
val positiveTest = positive.join( positiveTrainTmp, positive("Id") === positiveTrainTmp("Id"), "LeftOuter").
                            filter("Flag is null").select(positive("Id"), 'Tags, 'Text, 'Label)
val testing = negativeTest.unionAll(positiveTest)

Tiếp theo, ta tiến hành xây dựng mô hình dự đoán với các thông số sau:

  • Số lượng các features
  • Các thông số cho regression
  • Gía trị epoch cho gradient decent
// creates a model based on columns from the data-frame and the training parameters
val numFeatures = 64000
val numEpochs = 30
val regParam = 0.02

val tokenizer = new Tokenizer().setInputCol("Text").setOutputCol("Words")
val hashingTF = new  org.apache.spark.ml.feature.HashingTF().setNumFeatures(numFeatures).
          setInputCol(tokenizer.getOutputCol).setOutputCol("Features")
val lr = new LogisticRegression().setMaxIter(numEpochs).setRegParam(regParam).
                                    setFeaturesCol("Features").setLabelCol("Label").
                                    setRawPredictionCol("Score").setPredictionCol("Prediction")
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

val model = pipeline.fit(training)

Khi xây dựng mô hình, ta có thể quan sát thấy Spark sử dụng và phân phối RAM rất hợp lý cho từng tiến trình bằng cách chia thành nhiều stage nhỏ hơn, nhờ vậy mà với một chiếc laptop bạn đã có thể cài đặt được một thực nghiệm nhỏ với Big data. Nếu có nhiều máy tính hơn, ta có thể thiết lập chế độ master/slave để khai thác sức mạnh xử lý song song trên nhiều máy trạm giúp giảm thời gian xây dựng mô hình một cách đáng kể.

Sau cùng, ta tiến hành đánh giá mô hình vừa xây dựng và chạy một đoạn demo nhỏ.

// evaluate the quality of the model
val testingResult = model.transform(testing)
val testingResultScores = testingResult.select("Prediction", "Label").rdd.
                                    map(r => (r(0).asInstanceOf[Double], r(1).asInstanceOf[Double]))
val bc = new BinaryClassificationMetrics(testingResultScores)
val roc = bc.areaUnderROC
print("Area under the ROC:" + roc)


// results
roc: Double = 0.6014178406314431
Area under the ROC:0.6014178406314431

// testing the model
val testTitle = "Easiest way to merge a release into one JAR file"
val testBoby = """Is there a tool or script which easily merges a bunch of 
                   href="http://en.wikipedia.org/wiki/JAR_%28file_format%29"
                   JAR files into one JAR file? A bonus would be to easily set the main-file manifest 
                   and make it executable. I would like to run it with something like:
                  As far as I can tell, it has no dependencies which indicates that it shouldn't be an easy 
                  single-file tool, but the downloaded ZIP file contains a lot of libraries."""
val testText = testTitle + testBody
val testDF = sqlContext.createDataFrame(Seq( (99.0, testText))).toDF("Label", "Text")
val result = model.transform(testDF)
val prediction = result.collect()(0)(6).asInstanceOf[Double]
print("Prediction: "+ prediction)

// result
prediction: Double = 0.0
Evaluation

Evaluation

Ta nhận thấy kết quả đánh giá mô hình dự đoán theo thông số ROC (Receiver operating characteristic) chỉ đạt khoảng 60%. Với kết quả ban đầu này, ta cần cải tiến mô hình dự đoán bằng các kĩ thuật khác như TF-IDF hay normalization.

Ngoài ra, ta có thể chạy trực tiếp file script của Scala (link) bằng một trong các dòng lệnh bên dưới (lưu ý: các tập tin đều được chép vào thư mục cài đặt Spark).

// method 1: after loading into spark-shell
:load PATH_TO_FILE

// method 2: directly run spark-shell with file parameter
bin/spark-shell -i beginner_spark_ml.scala

Nguồn: http://fullstackml.com/2015/10/29/beginners-guide-apache-spark-machine-learning-scenario-with-a-large-input-dataset/

Tham khảo thêm:

Advertisements

8 thoughts on “Lập trình Spark với Scala

  1. Chào bạn. Cảm ơn những chia sẻ của bạn.
    Mình đã cài xong Spark theo hướng dẫn và chạy Spark-Shell theo Script của bài này.
    Tuy nhiên ở dòng lệnh val postsDf = sqlContext.createDataFrame(postsRDD, schema)
    thì xuất hiện lỗi error: not found: value sqlcontext
    Bạn có thể giúp fix nó dùm mình không
    Cảm ơn bạn!

    Số lượt thích

  2. Cam on hong.
    Minh setup lai tren Ubuntu 16. Lam cac test theo huong dan cai dat OK.
    Den dong
    val postsDf = sqlContext.createDataFrame(postsRDD, schema)
    lan nay bao loi
    aused by: java.sql.SQLException: Failed to create database ‘metastore_db’, see the next exception for details.
    Minh da chmod 777 cho usr/local/spark roi
    Hong xem giup minh nhe.
    Cam on ban

    Số lượt thích

  3. Chào Anh. Bài viết rất hay và chi tiết. Anh cho em hỏi là bài này là mình thao tác ở màn hình console. Còn nếu muốn viết nó thành 1 page website để người dùng có thể nhập câu hỏi vào và hệ thống recommend tag phù hợp thì làm thế nào vậy Anh và website đó nó chạy trên apache spark luôn hay là chạy trên server bình thường rồi gọi api của cái console này để truyền và lấy thông tin. Do Em mới tìm hiểu về apache spark nên chỗ này Em còn mơ hồ quá.

    Số lượt thích

Trả lời

Mời bạn điền thông tin vào ô dưới đây hoặc kích vào một biểu tượng để đăng nhập:

WordPress.com Logo

Bạn đang bình luận bằng tài khoản WordPress.com Đăng xuất / Thay đổi )

Twitter picture

Bạn đang bình luận bằng tài khoản Twitter Đăng xuất / Thay đổi )

Facebook photo

Bạn đang bình luận bằng tài khoản Facebook Đăng xuất / Thay đổi )

Google+ photo

Bạn đang bình luận bằng tài khoản Google+ Đăng xuất / Thay đổi )

Connecting to %s