Làm việc với Spark DataFrames – Truy vấn cơ bản

Spark DataFrames

Spark DataFrames

DataFrame là một kiểu dữ liệu collection phân tán, được tổ chức thành các cột được đặt tên. Về mặt khái niệm, nó tương đương với các bảng quan hệ (relational tables) đi kèm với các kỹ thuật tối ưu tính toán.

DataFrame có thể được xây dựng từ nhiều nguồn dữ liệu khác nhau như Hive table, các file dữ liệu có cấu trúc hay bán cấu trúc (csv, json), các hệ cơ sở dữ liệu phổ biến (MySQL, MongoDB, Cassandra), hoặc RDDs hiện hành. API này được thiết kế cho các ứng dụng Big Data và Data Science hiện đại. Kiểu dữ liệu này được lấy cảm hứng từ DataFrame trong Lập trình R và Pandas trong Python hứa hẹn mang lại hiệu suất tính toán cao hơn.

Spark DataFrames Performance

Spark DataFrames Performance

Trong bài viết này, tôi sẽ tiến hành thực nghiệm một vài truy vấn cơ bản để làm quen với kiểu dữ liệu DataFrames. Ở các bài viết sau, ta sẽ đi sâu hơn vào các truy vấn nâng cao và phức tạp hơn.

Chuẩn bị

Cài đặt môi trường lập trình

Bạn có thể sử dụng một trong cách cách sau để deploy một ứng dụng Spark

  • Command line
  • SBT
  • IntelliJ IDEA

Cơ sở dữ liệu

Để đơn giản, tôi sẽ lấy dữ liệu từ www.w3schools.com và tổng hợp thành các file *.csv để dễ dàng nạp dữ liệu đầu vào cho chương trình minh hoạ. Bạn có thể download trực tiếp các file này ở đây.

Thiết lập cho file build.sbt

Ở đây, tôi sẽ quản lý các dependencies thông qua sbt. Bên dưới là các dependencies các bạn cần để tiến hành các thực nghiệm truy vấn dữ liệu.

name := "spark-dataframes"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.1",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.1",
  "com.databricks" % "spark-csv_2.10" % "1.4.0"
)

Chạy một truy vấn đơn giản

Đây là một đoạn code scala đơn giản. Công việc chính là đọc vào file customer.csv sau đó lưu vào DataFrames df để hiển thị. Hàm registerTempTable(“Customers”) dùng để định danh cho sqlContext biết tên của bảng dữ liệu bạn đang làm việc. Toàn bộ code, bạn có thể sao chép từ Github.

import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object MainExecutor {
  def main(args: Array[String]) {

    val conf = new SparkConf()
      .setAppName("Spark DataFrames Application")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Loading customers data
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true") // Use first line of all files as header
      .option("inferSchema", "true") // Automatically infer data types
      .load("db/csv/customers.csv")

    // Let sqlContext know which table you gonna work with
    df.registerTempTable("Customers")
    df.show(5)
  }
}

Nếu thành công, bạn sẽ nhận được output như bên dưới.

+----------+--------------------+------------------+--------------------+-----------+----------+-------+
|CustomerID|        CustomerName|       ContactName|             Address|       City|PostalCode|Country|
+----------+--------------------+------------------+--------------------+-----------+----------+-------+
|         1| Alfreds Futterkiste|      Maria Anders|       Obere Str. 57|     Berlin|     12209|Germany|
|         2|Ana Trujillo Empa...|      Ana Trujillo|Avda. de la Const...|México D.F.|      5021| Mexico|
|         3|Antonio Moreno Ta...|    Antonio Moreno|      Mataderos 2312|México D.F.|      5023| Mexico|
|         4|     Around the Horn|      Thomas Hardy|     120 Hanover Sq.|     London|   WA1 1DP|     UK|
|         5|  Berglunds snabbköp|Christina Berglund|      Berguvsvägen 8|      Luleå|  S-958 22| Sweden|
+----------+--------------------+------------------+--------------------+-----------+----------+-------+
only showing top 5 rows

Ở các phần còn lại, ta sẽ tiến hành các truy vấn cơ bản trên bảng dữ liệu Customers.

Câu lệnh SELECT

Bạn có 2 lựa chọn để có thể truy vấn dữ liệu

  1. Sử dụng sqlContext với hàm sql(): bạn sẽ cảm thấy thân thuộc ngay vì đây hoàn toàn là các câu lệnh SQL bạn từng làm việc.
  2. Sử dụng DataFrames: kiểu dữ liệu DataFrames cung cấp cho các bạn đầy đủ các hàm tương tự như SQL (SQL-like)  để bạn dễ dàng truy vấn.

Tuỳ theo thói quen và sở thích, bạn có thể sử dụng một trong 2 cách tiếp cận này. Điều quan trọng là kết quả xuất ra đúng như mong đợi.

// SELECT by sqlContext
sqlContext.sql("SELECT CustomerName,City FROM Customers").show(5)
// SELECT by DataFrames
df.select("CustomerName", "City").show(5)

// Output
+--------------------+-----------+
|        CustomerName|       City|
+--------------------+-----------+
| Alfreds Futterkiste|     Berlin|
|Ana Trujillo Empa...|México D.F.|
|Antonio Moreno Ta...|México D.F.|
|     Around the Horn|     London|
|  Berglunds snabbköp|      Luleå|
+--------------------+-----------+
only showing top 5 rows

SQL SELECT DISTINCT Statement

// SELECT DISTINCT by sqlContext
sqlContext.sql("SELECT DISTINCT City FROM Customers").show(5)
// SELECT DISTINCT by DataFrames
df.select("City").distinct().show(5)

// Output
+--------+
|    City|
+--------+
|  Aachen|
|    Bern|
|  Nantes|
|Toulouse|
|   Walla|
+--------+
only showing top 5 rows

SQL WHERE Clause

Scala còn cung cấp cho bạn lựa chọn viết code SQL trên nhiều dòng bằng cách sử dụng dấu nháy 3 “””_”””. Thêm vào đó, bạn còn có thể lựa chọn phong cách viết code theo trường phái Scala hay Java như ví dụ minh hoạ bên dưới. Tôi sẽ phân biệt hai trường phái này thông qua hai hàm where (scala) và filter (java).

// WHERE clause
import org.apache.spark.sql.functions._
sqlContext.sql(
  """
    SELECT * FROM Customers
    WHERE Country='Mexico'
  """).show(5)
// Scala style
df.where("Country='Mexico'").show(5)
// Java style
df.filter(col("Country") === "Mexico").show(5)

// Output
+----------+--------------------+--------------------+--------------------+-----------+----------+-------+
|CustomerID|        CustomerName|         ContactName|             Address|       City|PostalCode|Country|
+----------+--------------------+--------------------+--------------------+-----------+----------+-------+
|         2|Ana Trujillo Empa...|        Ana Trujillo|Avda. de la Const...|México D.F.|      5021| Mexico|
|         3|Antonio Moreno Ta...|      Antonio Moreno|      Mataderos 2312|México D.F.|      5023| Mexico|
|        13|Centro comercial ...|     Francisco Chang|Sierras de Granad...|México D.F.|      5022| Mexico|
|        58|Pericles Comidas ...| Guillermo Fernández|Calle Dr. Jorge C...|México D.F.|      5033| Mexico|
|        80| Tortuga Restaurante|Miguel Angel Paolino|    Avda. Azteca 123|México D.F.|      5033| Mexico|
+----------+--------------------+--------------------+--------------------+-----------+----------+-------+

Operators in The WHERE Clause

Tiếp theo, ta sẽ điểm qua các toán tử cơ bản thường đi cùng với câu lệnh WHERE. Đầu tiên, với điều kiện nhỏ hơn, ta sẽ tìm tất cả các customer nào có ID < 5.

// Operators in The WHERE Clause
sqlContext.sql(
  """
    SELECT * FROM Customers
    WHERE CustomerID < 5
  """).show()
df.where("CustomerID < 5").show()
df.filter(col("CustomerID").lt(5)).show()
df.filter(col("CustomerID") < 5).show()

// Output
+----------+--------------------+--------------+--------------------+-----------+----------+-------+
|CustomerID|        CustomerName|   ContactName|             Address|       City|PostalCode|Country|
+----------+--------------------+--------------+--------------------+-----------+----------+-------+
|         1| Alfreds Futterkiste|  Maria Anders|       Obere Str. 57|     Berlin|     12209|Germany|
|         2|Ana Trujillo Empa...|  Ana Trujillo|Avda. de la Const...|México D.F.|      5021| Mexico|
|         3|Antonio Moreno Ta...|Antonio Moreno|      Mataderos 2312|México D.F.|      5023| Mexico|
|         4|     Around the Horn|  Thomas Hardy|     120 Hanover Sq.|     London|   WA1 1DP|     UK|
+----------+--------------------+--------------+--------------------+-----------+----------+-------+

Điều kiện nằm trong khoảng

sqlContext.sql(
"""
SELECT * FROM Customers
WHERE CustomerID BETWEEN 1 AND 4
""").show()
df.where("CustomerID BETWEEN 1 AND 4").show()
df.filter(col("CustomerID").between(1, 4)).show()

// Output
+———-+——————–+————–+——————–+———–+———-+——-+
|CustomerID| CustomerName| ContactName| Address| City|PostalCode|Country|
+———-+——————–+————–+——————–+———–+———-+——-+
| 1| Alfreds Futterkiste| Maria Anders| Obere Str. 57| Berlin| 12209|Germany|
| 2|Ana Trujillo Empa…| Ana Trujillo|Avda. de la Const…|México D.F.| 5021| Mexico|
| 3|Antonio Moreno Ta…|Antonio Moreno| Mataderos 2312|México D.F.| 5023| Mexico|
| 4| Around the Horn| Thomas Hardy| 120 Hanover Sq.| London| WA1 1DP| UK|
+———-+——————–+————–+——————–+———–+———-+——-+

Điều kiện giống với regular expression pattern

sqlContext.sql(
"""
SELECT * FROM Customers
WHERE CustomerName LIKE '%ana%'
""").show(5)
df.where("CustomerName LIKE '%ana%'").show()
df.filter(col("CustomerName").like("%ana%")).show()

// Output
+———-+————-+————+—————+————–+———-+——-+
|CustomerID| CustomerName| ContactName| Address| City|PostalCode|Country|
+———-+————-+————+—————+————–+———-+——-+
| 34|Hanari Carnes|Mario Pontes|Rua do Paço, 67|Rio de Janeiro| 05454-876| Brazil|
+———-+————-+————+—————+————–+———-+——-+

Điều kiện nằm trong tập hợp cho trước

sqlContext.sql(
"""
SELECT * FROM Customers
WHERE CustomerID IN (1,3,5)
""").show()
df.where("CustomerID IN (1,3,5)").show()
df.filter(col("CustomerID").isin(List(1, 3, 5):_*)).show()

// Output
+———-+——————–+——————+————–+———–+———-+——-+
|CustomerID| CustomerName| ContactName| Address| City|PostalCode|Country|
+———-+——————–+——————+————–+———–+———-+——-+
| 1| Alfreds Futterkiste| Maria Anders| Obere Str. 57| Berlin| 12209|Germany|
| 3|Antonio Moreno Ta…| Antonio Moreno|Mataderos 2312|México D.F.| 5023| Mexico|
| 5| Berglunds snabbköp|Christina Berglund|Berguvsvägen 8| Luleå| S-958 22| Sweden|
+———-+——————–+——————+————–+———–+———-+——-+

Điều kiện không bằng

sqlContext.sql(
"""
SELECT * FROM Customers
WHERE CustomerID 3
“””).show(5)
df.where(“CustomerID 3”).show(5)
df.filter(col(“CustomerID”) !== 3).show(5)
df.filter(col(“CustomerID”).notEqual(3)).show(5)

// Output
+———-+——————–+——————+——————–+———–+———-+——-+
|CustomerID| CustomerName| ContactName| Address| City|PostalCode|Country|
+———-+——————–+——————+——————–+———–+———-+——-+
| 1| Alfreds Futterkiste| Maria Anders| Obere Str. 57| Berlin| 12209|Germany|
| 2|Ana Trujillo Empa…| Ana Trujillo|Avda. de la Const…|México D.F.| 5021| Mexico|
| 4| Around the Horn| Thomas Hardy| 120 Hanover Sq.| London| WA1 1DP| UK|
| 5| Berglunds snabbköp|Christina Berglund| Berguvsvägen 8| Luleå| S-958 22| Sweden|
| 6|Blauer See Delika…| Hanna Moos| Forsterstr. 57| Mannheim| 68306|Germany|
+———-+——————–+——————+——————–+———–+———-+——-+
only showing top 5 rows

Điều kiện and và or

sqlContext.sql(
  """
    SELECT * FROM Customers
    WHERE Country='Germany'
    AND City='Berlin'
  """).show()
df.where("Country='Germany' AND City='Berlin'").show()
df.filter(col("Country") === "Germany" && col("City") === "Berlin").show()

// Output
+----------+-------------------+------------+-------------+------+----------+-------+
|CustomerID|       CustomerName| ContactName|      Address|  City|PostalCode|Country|
+----------+-------------------+------------+-------------+------+----------+-------+
|         1|Alfreds Futterkiste|Maria Anders|Obere Str. 57|Berlin|     12209|Germany|
+----------+-------------------+------------+-------------+------+----------+-------+

sqlContext.sql(
  """
      SELECT * FROM Customers
      WHERE City='Berlin'
      OR City='München'
  """).show()
df.where("City='Berlin' OR City='München'").show()
df.filter(col("City") === "Berlin" || col("City") === "München").show()

// Output
+----------+-------------------+-------------+-----------------+-------+----------+-------+
|CustomerID|       CustomerName|  ContactName|          Address|   City|PostalCode|Country|
+----------+-------------------+-------------+-----------------+-------+----------+-------+
|         1|Alfreds Futterkiste| Maria Anders|    Obere Str. 57| Berlin|     12209|Germany|
|        25|     Frankenversand|Peter Franken|Berliner Platz 43|München|     80805|Germany|
+----------+-------------------+-------------+-----------------+-------+----------+-------+

ORDER BY Keyword

Cuối cùng là hàm sắp xếp ORDER BY. Ta sẽ sắp tăng dần trên cột dữ liệu Country và giảm dần trên cột dữ liệu CustomerName.

sqlContext.sql(
  """
    SELECT * FROM Customers
    ORDER BY Country ASC, CustomerName DESC
  """).show(5)
df.orderBy(col("Country").asc, col("CustomerName").desc).show(5)

// Output
+----------+--------------------+----------------+--------------------+------------+----------+---------+
|CustomerID|        CustomerName|     ContactName|             Address|        City|PostalCode|  Country|
+----------+--------------------+----------------+--------------------+------------+----------+---------+
|        64|       Rancho grande|Sergio Gutiérrez|Av. del Libertado...|Buenos Aires|      1010|Argentina|
|        54|Océano Atlántico ...|  Yvonne Moncada|Ing. Gustavo Monc...|Buenos Aires|      1010|Argentina|
|        12|Cactus Comidas pa...|Patricio Simpson|         Cerrito 333|Buenos Aires|      1010|Argentina|
|        59|    Piccolo und mehr|     Georg Pipps|         Geislweg 14|    Salzburg|      5020|  Austria|
|        20|        Ernst Handel|   Roland Mendel|        Kirchgasse 6|        Graz|      8010|  Austria|
+----------+--------------------+----------------+--------------------+------------+----------+---------+
only showing top 5 rows

Nguồn tham khảo:

Advertisements

2 thoughts on “Làm việc với Spark DataFrames – Truy vấn cơ bản

  1. Hi anh,
    Anh đánh giá thế nào với các dịch vụ Machine Learning trên Cloud hiện nay như Azure, Google TensorFlow, AWS …. Khía cạnh thứ nhất là so về giá đối với việc setup server riêng, thứ 2 là sự thuận tiện và dễ control hơn so với làm local.
    Thanks a.

    Số lượt thích

    • Hi em,

      Giá cho một server Spark do Databricks cung cấp thấp nhất là 100$/tháng, hệ thống sẵn sàng trong vòng vài phút, hệ thống distribute sẵn, có thể lựa chọn thay đổi cấu hình tuỳ ý. Setup một server riêng giá khởi điểm cũng khoảng 1000-4000$, muốn distribute phải cài máy ảo hoặc mua thêm 2-3 server, cài đặt hệ thống khoảng 1-2 ngày, nâng cấp bị hạn chế về CPU và RAM cũng như nhiều chi phí phát sinh sau này.

      Control về phần mềm ứng dụng thì như nhau. Control về dữ liệu trên local bảo mật hơn trên Cloud. Cloud thuận tiện cho cài đặt và bảo trì. Local thuận tiện tự chủ hệ thống, không phụ thuộc nhiều vào đối tác thứ ba.

      Số lượt thích

Trả lời

Mời bạn điền thông tin vào ô dưới đây hoặc kích vào một biểu tượng để đăng nhập:

WordPress.com Logo

Bạn đang bình luận bằng tài khoản WordPress.com Đăng xuất / Thay đổi )

Twitter picture

Bạn đang bình luận bằng tài khoản Twitter Đăng xuất / Thay đổi )

Facebook photo

Bạn đang bình luận bằng tài khoản Facebook Đăng xuất / Thay đổi )

Google+ photo

Bạn đang bình luận bằng tài khoản Google+ Đăng xuất / Thay đổi )

Connecting to %s