
Với chế độ cài đặt Standalone, Spark có thể xử lý được một tập dữ liệu khổng lồ dù cho bạn đang sử dụng một chiếc laptop có RAM chỉ khoảng 4-8GB. Trong bài viết này, tôi sẽ hướng dẫn mọi người lập trình Spark với Scala.
Cho trước tập dữ liệu là các bài post trên stackoverflow.com, nhiệm vụ của chúng ta là xây dựng một mô hình phân lớp để dự đoán xem câu hỏi của user khi post lên thuộc nhóm nào, nhờ vậy mà trang web có thể đưa ra những tags gợi ý thông minh hơn cho user.
Đặt vấn đề
Cho tập dữ liệu post là các câu hỏi được post lên stackoverflow. Tập dữ liệu này có dung lượng khoảng 34.6GB, gồm có các thông tin sau:
- Title – tiêu đề câu hỏi
- Body – nội dung câu hỏi
- Tags – danh sách các tag hay post
- Các thuộc tính khác mà ta sẽ không đề cập
Toàn bộ tập dữ liệu có thể được tải về tại đây thông qua giao thức torrent https://archive.org/details/stackexchange. Do tập dữ liệu quá lớn, nên ta sẽ sử dụng một phiên bản nhỏ hơn (link) khoảng 130MB để chạy thực nghiệm tác vụ này.
Để đơn giản hóa bài toán, thay vì gợi ý nhiều tag cho user, ta chỉ cân nhắc xem có nên gợi ý tag “java” hay không. Nghĩa là từ bài toán multiclass-classification, ta chuyển về bài toán binary-classification.
Tiến hành cài đặt
Để cài đặt Spark, ta có thể tham khảo bài viết này. Sau khi cài đặt thành công, ta vào thư mục Spark thực hiện lệnh sau để mở Spark Shell.
bin/spark-shell
Chạy từng dòng lệnh trong Spark Shell
Đầu tiên, ta nạp các thư viện cần thiết
// General purpose library import scala.xml._ // Spark data manipulation libraries import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql._ import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ // Spark machine learning libraries import org.apache.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics import org.apache.spark.ml.Pipeline
Tiếp theo, ta sẽ parse file xml thành các phần Body, Text và Tags. Sau đó, ta gán kết quả vào một data-frame để lưu các thông tin này lại (giả sử file Posts.small.xml nằm chung thư mục cài đặt Spark).
// remove the xml header and footer val fileName = "Posts.small.xml" val textFile = sc.textFile(fileName) val postsXml = textFile.map(_.trim). filter(!_.startsWith("<?xml version=")). filter(_ != "<posts>"). filter(_ != "</posts>") // concatenate title and body and remove all unnecessary tags and new line // characters from the body and all space duplications val postsRDD = postsXml.map { s => val xml = XML.loadString(s) val id = (xml \ "@Id").text val tags = (xml \ "@Tags").text val title = (xml \ "@Title").text val body = (xml \ "@Body").text val bodyPlain = ("<\\S+>".r).replaceAllIn(body, " ") val text = (title + " " + bodyPlain).replaceAll("\n", " ").replaceAll("( )+", " "); Row(id, tags, text) } // create a data-frame val schemaString = "Id Tags Text" val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) val postsDf = sqlContext.createDataFrame(postsRDD, schema) // take a look at your data frame postsDf.show()
Để xây dựng được mô hình phân lớp, bước tiếp theo, ta sẽ chuẩn bị tập dữ liệu train (90%) và test (10%) từ tập dữ liệu đã cho.
// All rows with the “java” label should be marked as // a "1" and rows with no "java" as a "0" val targetTag = "java" val myudf: (String => Double) = (str: String) => {if (str.contains(targetTag)) 1.0 else 0.0} val sqlfunc = udf(myudf) val postsLabeled = postsDf.withColumn("Label", sqlfunc(col("Tags")) ) // split into negative and positive subsets by using the new label val positive = postsLabeled.filter('Label > 0.0') val negative = postsLabeled.filter('Label < 1.0') // create a training dataset by sampling the positive // and negative datasets separately val positiveTrain = positive.sample(false, 0.9) val negativeTrain = negative.sample(false, 0.9) val training = positiveTrain.unionAll(negativeTrain) // testing dataset should include all rows // which are not included in the training datasets val negativeTrainTmp = negativeTrain.withColumnRenamed("Label", "Flag").select('Id, 'Flag) val negativeTest = negative.join( negativeTrainTmp, negative("Id") === negativeTrainTmp("Id"), "LeftOuter"). filter("Flag is null").select(negative("Id"), 'Tags, 'Text, 'Label) val positiveTrainTmp = positiveTrain.withColumnRenamed("Label", "Flag").select('Id, 'Flag) val positiveTest = positive.join( positiveTrainTmp, positive("Id") === positiveTrainTmp("Id"), "LeftOuter"). filter("Flag is null").select(positive("Id"), 'Tags, 'Text, 'Label) val testing = negativeTest.unionAll(positiveTest)
Tiếp theo, ta tiến hành xây dựng mô hình dự đoán với các thông số sau:
- Số lượng các features
- Các thông số cho regression
- Gía trị epoch cho gradient decent
// creates a model based on columns from the data-frame and the training parameters val numFeatures = 64000 val numEpochs = 30 val regParam = 0.02 val tokenizer = new Tokenizer().setInputCol("Text").setOutputCol("Words") val hashingTF = new org.apache.spark.ml.feature.HashingTF().setNumFeatures(numFeatures). setInputCol(tokenizer.getOutputCol).setOutputCol("Features") val lr = new LogisticRegression().setMaxIter(numEpochs).setRegParam(regParam). setFeaturesCol("Features").setLabelCol("Label"). setRawPredictionCol("Score").setPredictionCol("Prediction") val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr)) val model = pipeline.fit(training)
Khi xây dựng mô hình, ta có thể quan sát thấy Spark sử dụng và phân phối RAM rất hợp lý cho từng tiến trình bằng cách chia thành nhiều stage nhỏ hơn, nhờ vậy mà với một chiếc laptop bạn đã có thể cài đặt được một thực nghiệm nhỏ với Big data. Nếu có nhiều máy tính hơn, ta có thể thiết lập chế độ master/slave để khai thác sức mạnh xử lý song song trên nhiều máy trạm giúp giảm thời gian xây dựng mô hình một cách đáng kể.
Sau cùng, ta tiến hành đánh giá mô hình vừa xây dựng và chạy một đoạn demo nhỏ.
// evaluate the quality of the model val testingResult = model.transform(testing) val testingResultScores = testingResult.select("Prediction", "Label").rdd. map(r => (r(0).asInstanceOf[Double], r(1).asInstanceOf[Double])) val bc = new BinaryClassificationMetrics(testingResultScores) val roc = bc.areaUnderROC print("Area under the ROC:" + roc) // results roc: Double = 0.6014178406314431 Area under the ROC:0.6014178406314431 // testing the model val testTitle = "Easiest way to merge a release into one JAR file" val testBoby = """Is there a tool or script which easily merges a bunch of href="http://en.wikipedia.org/wiki/JAR_%28file_format%29" JAR files into one JAR file? A bonus would be to easily set the main-file manifest and make it executable. I would like to run it with something like: As far as I can tell, it has no dependencies which indicates that it shouldn't be an easy single-file tool, but the downloaded ZIP file contains a lot of libraries.""" val testText = testTitle + testBody val testDF = sqlContext.createDataFrame(Seq( (99.0, testText))).toDF("Label", "Text") val result = model.transform(testDF) val prediction = result.collect()(0)(6).asInstanceOf[Double] print("Prediction: "+ prediction) // result prediction: Double = 0.0

Ta nhận thấy kết quả đánh giá mô hình dự đoán theo thông số ROC (Receiver operating characteristic) chỉ đạt khoảng 60%. Với kết quả ban đầu này, ta cần cải tiến mô hình dự đoán bằng các kĩ thuật khác như TF-IDF hay normalization.
Ngoài ra, ta có thể chạy trực tiếp file script của Scala (link) bằng một trong các dòng lệnh bên dưới (lưu ý: các tập tin đều được chép vào thư mục cài đặt Spark).
// method 1: after loading into spark-shell :load PATH_TO_FILE // method 2: directly run spark-shell with file parameter bin/spark-shell -i beginner_spark_ml.scala
Tham khảo thêm:
Chào bạn. Cảm ơn những chia sẻ của bạn.
Mình đã cài xong Spark theo hướng dẫn và chạy Spark-Shell theo Script của bài này.
Tuy nhiên ở dòng lệnh val postsDf = sqlContext.createDataFrame(postsRDD, schema)
thì xuất hiện lỗi error: not found: value sqlcontext
Bạn có thể giúp fix nó dùm mình không
Cảm ơn bạn!
ThíchThích
Có thể do sqlcontext chưa được khởi tạo từ SparkContext.
val conf = new SparkConf().setAppName(“Spark SQL Application”).setMaster(“local[2]”)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json(“people.json”)
ThíchThích
Cam on hong.
Minh setup lai tren Ubuntu 16. Lam cac test theo huong dan cai dat OK.
Den dong
val postsDf = sqlContext.createDataFrame(postsRDD, schema)
lan nay bao loi
aused by: java.sql.SQLException: Failed to create database ‘metastore_db’, see the next exception for details.
Minh da chmod 777 cho usr/local/spark roi
Hong xem giup minh nhe.
Cam on ban
ThíchThích
val postsDf = sqlContext.createDataFrame(postsRDD, schema)
Anh sửa lại thành
val postsDf = spark.sqlContext.createDataFrame(postsRDD, schema)
thì mới chạy được nha. Do Anh cài bản spark mới nên có chút thay đổi
ThíchThích
Cam on nhe :))
ThíchThích
Chào Anh. Bài viết rất hay và chi tiết. Anh cho em hỏi là bài này là mình thao tác ở màn hình console. Còn nếu muốn viết nó thành 1 page website để người dùng có thể nhập câu hỏi vào và hệ thống recommend tag phù hợp thì làm thế nào vậy Anh và website đó nó chạy trên apache spark luôn hay là chạy trên server bình thường rồi gọi api của cái console này để truyền và lấy thông tin. Do Em mới tìm hiểu về apache spark nên chỗ này Em còn mơ hồ quá.
ThíchThích
Hi em, sau khi training và lưu model lại, em có thể viết một API scala để thực hiện phân lớp tag từ nội dung user request đến server. Sau đó, trả kết quả về và hiển thị trên web.
Bài viết này làm trên console. Khi lên production em phải build ra file .jar để nhận request liên tục.
ThíchThích
Dạ. Nếu vậy thì website Em có thể viết bằng php vẫn kết nối API để chạy được đúng ko Anh. Anh có link bài viết hoặc từ khóa nào hướng dẫn viết API scala không Anh. Em đang nghiên cứu để xây dựng 1 một hệ thống truy vấn 1 khối lượng lớn file xml sử dụng apache spark mà đang bị vướn mắc chỗ này chưa giải quyết được.
ThíchThích
Là API thì ngôn ngữ nào request đến cũng được. Viết scala API cũng như Java API, em tham khảo link này https://www.quora.com/Which-Scala-framework-is-the-best-for-REST-API-development
ThíchThích
Chào anh. Cảm ơn những chia sẻ của anh.
Em gặp 1 vấn đề là:
Em đã tải tệp dữ liệu và copy vào file cài đặt của spark nhưng mà vẫn không add data vô được ạ.
Anh có thể hướng dẫn chỗ này dùm em được không ạ.
Cảm ơn anh.
ThíchThích