Hướng dẫn deploy Spark

Spark and Scala version
Spark Shell

Việc deploy Spark là cần thiết đối với các tác vụ mang tính thường kỳ. Ví dụ, ta có thể tạo một CRON job để chương trình Spark có thể tự động tổng hợp dữ liệu cho chúng ta sau chu kỳ mỗi giờ, mỗi ngày hay mỗi tuần. spark-submit là một shell command được dùng để deploy ứng dụng Spark lên cluster. Nhờ vào cơ chế quản lý phân tán của Spark, ta không cần phải chỉnh sửa source code quá nhiều để có thể chuyển đổi từ standalone mode sang distributed mode.

Trong bài viết này, tôi sẽ hướng dẫn các bạn một số cách để deploy Spark từ dòng lệnh cơ bản cho đến việc sử dụng IDE như thế nào. Để hoàn thành được bài hướng dẫn bên dưới bạn cần đọc trước các bài viết sau:

Bạn có thể download trực tiếp source code tại Github: https://github.com/ongxuanhong/programming-with-spark.

Deploy bằng dòng lệnh cơ bản

Đầu tiên, ta viết một chương trình MapReduce đơn giản để đếm tần suất xuất hiện của các từ trong văn bản (word count). Giả sử, ta có file văn bản sau:

input.txt

Giữ Chánh Niệm
Thức dậy
Thức dậy mỉm miệng cười
Hăm bốn giờ tinh khôi
Xin nguyện sống trọn vẹn
Mắt thương nhìn cuộc đời

Quơ dép
Đặt chân trên mặt đất
Là thể hiện thần thông
Từng bước chân tỉnh thức
Làm hiển lộ pháp thân

Xuống giường
Sáng, trưa, chiều và tối
Mọi loài hãy giữ gìn
Nếu dưới chân lỡ đạp
Xin nguyện chóng siêu sinh
...

Spark word count

Ta tạo một file SparkWordCount.scala chứa nội dung như bên dưới.


import org.apache.spark._
import org.apache.spark.SparkContext

object SparkWordCount {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Word count Application").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val inputFile = sc.textFile("input.txt")
    val counts = inputFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_)

    /* saveAsTextFile method is an action that effects on the RDD */
    counts.saveAsTextFile("outfile")
    System.out.println("OK")
  }
}

Để deploy một ứng dụng Spark thông qua dòng lệnh, ta thực hiện các bước sau:

Bước 1: Download Spark Jar

Spark core jar là file cần thiết để có thể biên dịch Spark source code. Bạn có thể download spark-core_2.10-1.6.1.jar tại link sau Spark core jar. Sau đó, bạn move file jar này vào thư mục spark-application của bạn.

Bước 2: Biên dịch chương trình

Ta tiến hành biên dịch chương trình thông qua dòng lệnh bên dưới. Dòng lệnh này được thực thi ngay tại thư mục spark-application. Ở đây, $SPARK_HOME/lib/spark-assembly-1.6.1-hadoop2.6.0.jar là file lấy từ thư mục của Spark library.


scalac -classpath "lib/spark-core_2.10-1.6.1.jar:$SPARK_HOME/lib/spark-assembly-1.6.1-hadoop2.6.0.jar" \
SparkWordCount.scala \

Bước 3: Tạo file jar

Ta có thể sử dụng dòng lệnh bên dưới để tạo ra file jar. Ở đây, wordcount là tên của file jar bạn muốn biên dịch ra.


jar -cvf wordcount.jar SparkWordCount*.class \
lib/spark-core_2.10-1.6.1.jar \
$SPARK_HOME/lib/spark-assembly-1.6.1-hadoop2.6.0.jar \

Bước 4: Submit spark application

Ta có thể dùng dòng lệnh bên dưới để Submit spark application. local[2] ở đây có nghĩa là ta đang chạy Spark trên localhost với 2 thread.


spark-submit --class SparkWordCount \
--master local[2] wordcount.jar

Ngoài ra, ta có thể tổng hợp toàn bộ tiến trình trên thành một file shell (.sh) với tên là run_spark_wordcount.sh như sau:

# Compile program
scalac -classpath "lib/spark-core_2.10-1.6.1.jar:$SPARK_HOME/lib/spark-assembly-1.6.1-hadoop2.6.0.jar" \
SparkWordCount.scala \

# Create a JAR
jar -cvf wordcount.jar SparkWordCount*.class \
lib/spark-core_2.10-1.6.1.jar \
$SPARK_HOME/lib/spark-assembly-1.6.1-hadoop2.6.0.jar \

# Submit spark application
spark-submit --class SparkWordCount \
--master local[2] wordcount.jar

Ta cần gán quyền thực thi trước khi chạy được file shell này

sudo chmod 777 run_spark_wordcount.sh
./run_spark_wordcount.sh

Nếu thực hiện thành công, ta sẽ có thư mục outfile chứa thông tin wordcount như bên dưới. Ta có thể mở part-00000 hay part-00001 để quan sát kết quả.

./outfile
./outfile/._SUCCESS.crc
./outfile/.part-00000.crc
./outfile/.part-00001.crc
./outfile/_SUCCESS
./outfile/part-00000
./outfile/part-00001

Full configurations

spark-submit --class com.my.class.MyClass
--files ./test.conf
--conf 'spark.executor.extraJavaOptions=-Dconfig.fuction.conf' 
--conf 'spark.driver.extraJavaOptions=-Dconfig.file=./test.conf' 
--conf spark.executor.memory=1g
--conf spark.executor.cores=1
--conf spark.cores.max=1
--conf spark.drivers.cores=1
--master local[2]
./my_spark_assembly.jar

Deploy bằng sbt

sbt là một chương trình quản lý dự án như maven. sbt cho phép các developer có thể quản lý version, các dependencies ( các thư viện sử dụng trong dự án ), quản lý build, tự động download javadoc & source. Trên thực tế, việc import thư viện và các dependency giữa local, staging và production là rất vất vả. Khi sử dụng sbt, chương trình này sẽ hỗ trợ cho chúng ta tất cả các khâu rườm rà này. Bạn có thể download và cài đặt sbt ở đây: http://www.scala-sbt.org

Spark SQL

Trong phần ví dụ này, ta sẽ làm việc với Spark SQL. Một API của Spark giúp làm việc với dữ liệu và thực thi tiến trình MapReduce đơn giản như làm việc với SQL. Giả sử, ta có file people.json bên dưới:

people.json

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

Ta viết một chương trình gồm các nội dung đơn giản như load dữ liệu vào Spark DataFrame, hiển thị dữ liệu, truy vấn thông tin theo cột, theo các điều kiện ràng buộc,…


import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object SparkSQLDemo {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Spark SQL Application").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("people.json")

    // Displays the content of the DataFrame to stdout
    df.show()
    // Print the schema in a tree format
    df.printSchema()
    // Select only the "name" column
    df.select("name").show()
    // Select everybody, but increment the age by 1
    df.select(df("name"), df("age") + 1).show()
    // Select people older than 21
    df.filter(df("age") > 21).show()
    // Count people by age
    df.groupBy("age").count().show()
  }
}

Tiếp theo ta tạo một file build.sbt chứa thông tin các file dependencies cần thiết để biên dịch chương trình như sau


name := "programming with spark"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.1",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.1"
)

Sau đó ta tiến hành build chương trình đơn giản bằng cách nhập lệnh sbt (để download các dependencies) và run (để chạy chương trình). Nếu thành công, ta sẽ thấy những dòng thông báo như bên dưới.

sbt
Getting org.scala-sbt sbt 0.13.11 ...
downloading https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt/0.13.11/jars/sbt.jar ...
	[SUCCESSFUL ] org.scala-sbt#sbt;0.13.11!sbt.jar (7247ms)
downloading https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/main/0.13.11/jars/main.jar ...

...

> run
[info] Updating {file:/Users/hongong/Desktop/test/}test...
[info] Resolving com.sun.jersey.jersey-test-framework#jersey-test-framework-griz[info] Resolving com.fasterxml.jackson.module#jackson-module-scala_2.10;2.4.4 ..[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] downloading https://repo1.maven.org/maven2/asm/asm/3.1/asm-3.1.jar ...
[info] 	[SUCCESSFUL ] asm#asm;3.1!asm.jar (2914ms)

Trong đó, có các đoạn in ra màn hình của chương trình tương tự như sau:

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

+----+-----+
| age|count|
+----+-----+
|null|    1|
|  19|    1|
|  30|    1|
+----+-----+

Để đóng gói chương trình thành file JAR, ta cần dùng thêm plug-in của sbt đó là assembly. Bạn thêm những thông tin sau để có thể đóng gói thành công chương trình của mình:


// đảm bảo download được các dependencies cần thiết, nếu có dependencies nào không tìm thấy,
// sbt sẽ tìm theo thứ tự các link resolver như bên dưới
resolvers ++= Seq(
  "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository",
  "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
  "Spray Repository" at "http://repo.spray.cc/",
  "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Twitter4J Repository" at "http://twitter4j.org/maven2/",
  "Apache HBase" at "https://repository.apache.org/content/repositories/releases",
  "Twitter Maven Repo" at "http://maven.twttr.com/",
  "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
  "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/",
  "Mesosphere Public Repository" at "http://downloads.mesosphere.io/maven",
  "Job Server Bintray" at "https://dl.bintray.com/spark-jobserver/maven",
  "Sonatype OSS Releases" at "https://oss.sonatype.org/content/repositories/releases/",
  Resolver.sonatypeRepo("public")
)

// xác định cụ thể tên main class cần đóng gói
mainClass in assembly := Some("SparkMongoDemo")

// đặt tên cho file output sau khi đóng gói
assemblyJarName in assembly := "spark_mongo_demo.jar"

// merge các dependencies lại với nhau nếu có các dependencies cùng tên class
assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.startsWith("META-INF") => MergeStrategy.discard
  case PathList("javax", "servlet", xs@_*) => MergeStrategy.first
  case PathList("org", "apache", xs@_*) => MergeStrategy.first
  case PathList("org", "jboss", xs@_*) => MergeStrategy.first
  case "about.html" => MergeStrategy.rename
  case "reference.conf" => MergeStrategy.concat
  case "logback.xml" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

Trường hợp sử dụng các dependencies bên ngoài

Sẽ có lúc bạn gặp phải trường hợp: mặc dù đã add đầy đủ dependencies nhưng khi compile và run vẫn bị lỗi. Điều này là do repository của sbt không tồn tại các dependencies này nên không thể dowload và compile. Ta có thể download trực tiếp các file jar và bỏ vào thư mục lib. Sau đó, ta sẽ thêm các thiết lập để sbt nhận diện được các file này.

Spark MongoDB

MongoDB thường được sử dụng cho các hệ thống NoSQL nhờ vào khả năng đọc ghi dữ liệu với tốc độ cao so với các hệ cơ sở dữ liệu khác. Bạn có thể tham khảo thêm benchmark của mongodb ở đây. Tuy nhiên, mongodb cần sự hỗ trợ của Spark trong việc thực thi MapReduce để khắc phục hạn chế tính toán của mình trong phân tích Big Data.

Đầu tiên, ta import data vào mongodb

mongoimport --db my_db_test --collection my_collection_test people.json

Tiếp đến, ta viết một đoạn chương trình kết nối vào mongodb và thực hiện một vài thao tác đơn giản trên hệ cơ sở dữ liệu này.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkContext, SparkConf}
import org.bson.BSONObject
import com.mongodb.BasicDBObject
import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat}
import com.mongodb.hadoop.io.MongoUpdateWritable

object SparkMongoDemo extends App {

  override def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Spark MongoDB Application").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Set up the configuration for reading from MongoDB.
    val mongoConfig = new Configuration()
    // MongoInputFormat allows us to read from a live MongoDB instance.
    // We could also use BSONFileInputFormat to read BSON snapshots.
    // MongoDB connection string naming a collection to read.
    // If using BSON, use "mapred.input.dir" to configure the directory
    // where the BSON files are located instead.
    mongoConfig.set("mongo.input.uri",
      "mongodb://localhost:27017/my_db_test.my_collection_test")

    // Create an RDD backed by the MongoDB collection.
    val documents = sc.newAPIHadoopRDD(
      mongoConfig, // Configuration
      classOf[MongoInputFormat], // InputFormat
      classOf[Object], // Key type
      classOf[BSONObject]) // Value type

    // Create a separate Configuration for saving data back to MongoDB.
    val outputConfig = new Configuration()
    outputConfig.set("mongo.output.uri", "mongodb://localhost:27017/my_db_test.my_out_collection_test")

    // We can choose to update documents in an existing collection by using the
    // MongoUpdateWritable class instead of BSONObject. First, we have to create
    // the update operations we want to perform by mapping them across our current
    // RDD.
    val updates = documents.mapValues(
      value => new MongoUpdateWritable(
        new BasicDBObject("_id", value.get("_id")), // Query
        new BasicDBObject("$set", new BasicDBObject("Michael", "Tommy")), // Update operation
        true, // Upsert
        false // Update multiple documents
      )
    )

    // Now we call saveAsNewAPIHadoopFile, using MongoUpdateWritable as the
    // value class.
    updates.saveAsNewAPIHadoopFile(
      "file:///this-is-completely-unused",
      classOf[Object],
      classOf[MongoUpdateWritable],
      classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
      outputConfig)

  }
}

Để chạy được chương trình trên, ta cần một số thư viện hỗ trợ gồm:

Các thư viện này được sbt nhận diện thông qua thiết lập unmanagedJars như bên dưới.


name := "programming with spark"

version := "1.0"

scalaVersion := "2.10.5"

unmanagedJars in Compile ++= Seq(
  file("lib/mongo-hadoop-core-1.5.2.jar"),
  file("lib/mongo-hadoop-spark-1.5.2.jar"),
  file("lib/mongo-java-driver-3.2.2.jar")
)

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.1",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.1"
)

Ta tiến hành build và run bình thường thông qua sbt.

Sử dụng Intellij IDEA để deploy

Để cho cuộc đời thêm đơn giản, thay vì sử dụng hàng loạt các command-line như cách trên. Bạn có thể sử dụng một IDE để hoàn tất mọi tiến trình trên chỉ qua một cú click chuột. Ở đây, tôi chọn Intellij IDEA cho môi trường lập trình của mình.

Đầu tiên, bạn cần tiến hành cài đặt scala plugin

IntelliJ IDEA Plugins
IntelliJ IDEA Plugins
Scala plugin
Scala plugin

Sau khi hoàn tất việc cài đặt, bạn tạo New project và chọn scala với sbt như hình bên dưới. Sau đó, click Next

Choose scala sbt
Choose scala sbt

Tại màn hình New Project, ta đặt tên cho project và check vào Use auto-import để sbt tự động download và import các dependencies cần thiết cho project. Sau đó, click Finish.

Project settings
Project settings

Sau khi hoàn thành các thiết lập, ta đợi một lúc cho sbt download xong plugin của scala. Lúc này, ta sẽ có được cấu trúc project như hình bên dưới.

Project first look
Project first look

Để đồng bộ hoá phiên bản Spark và Scala đã cài đặt trên local so với phiên bản quản lý bởi sbt, ta có thể chạy lệnh spark-shell để kiểm tra. Như hình dưới, ta có Spark version 1.6.1, Scala version 2.10.5.

Spark and Scala version
Spark and Scala version

Tiếp đến, ta sẽ add các dependencies như ví dụ trên và đợi sbt fetching data như hình dưới

Sbt fetching libraries
Sbt fetching libraries

Để tạo New scala script ta phải chuột vào thư mục main/scala-2.10  và đặt tên là SparkMongoDemo.

New scala script
New scala script

Ta copy source code SparkMongoDemo.scala như ví dụ trước vào scala script này.

Write scala application
Write scala application

Trước khi tiến hành chạy chương trình, ta cần thiết lập các thông số bằng cách vào Run > Edit configuration.

Edit configurations
Edit configurations

Bên góc trái trên màn hình, ta click vào dấu + để Add new configuration. Ở đây, ta chọn configuration cho Application. Ta thực hiện đặt tên cho configuration, chọn tên class chứa hàm Main và module cần chạy. Sau đó, click Ok để hoàn tất.

Add new configuration
Add new configuration
VM options: -Dspark.master=local[2] -Dspark.app.name=hello_spark -Dconfig.file=my_config.conf

Cuối cùng, ta chỉ việc click vào nút Run để tiến hành chạy chương trình. Nếu mọi việc diễn ra suôn sẻ, ta sẽ chạy thành công chương trình.

Run application
Run application

Để deploy file .jar ta thực hiện các bước sau:

  1. File > Save All.
  2. Run driver hay class có chứa hàm main.
  3. File > Project Structure.
  4. Select Tab “Artifacts”.
  5. Click nút “+” gần top cửa sổ.
  6. Select JAR từ Add drop down menu. Select “From modules with dependencies”
  7. Select main class.
  8. Radio button được select “extract to the target JAR.” Nhấn OK.
  9. Check box “Build on make”
  10. Nhấn apply sau đó OK.
  11. Từ main menu, select build dropdown.
  12. Select option build artifacts.

Kết luận

Việc sử dụng IDE sẽ hỗ trợ cho các bạn rất nhiều trong quá trình phát triển phần mềm. Mọi khâu cài đặt, download thư viện và chạy chương trình đều được thực hiện chỉ thông qua một vài cú click chuột. Tuy nhiên, bạn cần hiểu được cơ chế hoạt động của Spark cũng như cách mà một Spark job được tạo ra tuần tự như thế nào. Điều này rất hữu ích khi bạn làm việc trên môi trường không có hỗ trợ của IDE. Ví dụ như trên môi trường server, nơi mà bạn chỉ được quyền kết nối thông qua SSH và làm việc chỉ thông qua command-line. Hiểu được và sử dụng thành thạo command-line sẽ giúp quá trình compile và deploy source code lên server được thuận lợi và nhanh hơn rất nhiều.

Tham khảo thêm:

Advertisements

3 thoughts on “Hướng dẫn deploy Spark

  1. Anh oi, khi em fetching data bi canh bao nhu vay la bi gi vay anh, nho anh huong dan giup em voi.

    23:29:55 SBT project import
    [warn] Binary version (2.10) for dependency org.scala-lang#scalap;2.10.0
    [warn] in default#my-project$sources_2.11;1.0 differs from Scala binary version in project (2.11).
    [warn] Binary version (2.10) for dependency org.scala-lang#scala-compiler;2.10.0
    [warn] in default#my-project$sources_2.11;1.0 differs from Scala binary version in project (2.11).
    [warn] Binary version (2.10) for dependency org.scala-lang#scala-reflect;2.10.5
    [warn] in default#my-project$sources_2.11;1.0 differs from Scala binary version in project (2.11).
    [warn] Multiple dependencies with the same organization/name but different versions. To avoid conflict, pick one version:
    [warn] * org.scala-lang:scala-compiler:(2.10.0, 2.11.8)
    [warn] * org.scala-lang:scala-reflect:(2.10.5, 2.11.8)
    [warn] * org.apache.commons:commons-lang3:(3.3.2, 3.0)
    [warn] * jline:jline:(0.9.94, 2.12.1)
    [warn] * org.slf4j:slf4j-api:(1.7.10, 1.7.2)

    Like

Gửi phản hồi

Mời bạn điền thông tin vào ô dưới đây hoặc kích vào một biểu tượng để đăng nhập:

WordPress.com Logo

Bạn đang bình luận bằng tài khoản WordPress.com Log Out / Thay đổi )

Twitter picture

Bạn đang bình luận bằng tài khoản Twitter Log Out / Thay đổi )

Facebook photo

Bạn đang bình luận bằng tài khoản Facebook Log Out / Thay đổi )

Google+ photo

Bạn đang bình luận bằng tài khoản Google+ Log Out / Thay đổi )

Connecting to %s