Hướng dẫn phân tích dữ liệu IoT với Apache Spark trên OCI Data Flow và Autonomous Database


Summary

Bài viết này khám phá cách phân tích dữ liệu IoT một cách hiệu quả thông qua việc sử dụng Apache Spark trên nền tảng OCI Data Flow và Autonomous Database. Nó mang đến cho độc giả những giá trị thiết thực về khả năng tối ưu hóa chi phí cũng như triển khai các giải pháp hiện đại trong lĩnh vực này. Key Points:

  • Tối ưu hóa chi phí với OCI Data Flow nhờ vào mô hình không máy chủ, giúp tiết kiệm đáng kể cho các tác vụ phân tích dữ liệu IoT quy mô lớn.
  • Tích hợp liền mạch giữa OCI Data Flow, Autonomous Database và Object Storage là chìa khóa để tối ưu hóa hiệu suất truyền tải dữ liệu trong phân tích IoT.
  • Ứng dụng kỹ thuật xử lý dữ liệu thời gian thực bằng Structured Streaming của Spark sẽ giúp quản lý trạng thái và xử lý lỗi hiệu quả hơn trong môi trường IoT.
Chìa khóa thành công trong việc phân tích dữ liệu IoT nằm ở khả năng khai thác triệt để sức mạnh của OCI Data Flow kết hợp với các dịch vụ khác của Oracle.

Thiết lập môi trường phát triển với OCI Data Science

# Apache Spark với OCI Data Flow và Oracle Autonomous Database

## Bạn muốn biết hóa đơn điện của mình sẽ như thế nào vào tháng tới? Đây là Phần 2.

> Dự kiến sẽ có 5 phần để bao quát toàn bộ kịch bản:
> [Phần 1: Node-RED với OCI Streaming và Oracle Autonomous Database]
> [Phần 2: Apache Spark với OCI Data Flow và Oracle Autonomous Database]
> [Phần 3: Nhận dữ liệu thời gian thực với Spark, OCI Data Flow và OCI Data Lake]
> [Phần 4: Dự đoán tiêu thụ năng lượng với OCI Data Flow và Spark MLlib]
> [Phần 5: Phân tích dữ liệu với AutonomousDB và OCI Analytics Cloud]

Trong Phần 1, chúng tôi đã thu thập dữ liệu IoT bằng cách sử dụng Node-RED và chuyển giao nó đến Oracle Autonomous Database cùng với OCI Streaming.

Tạo metastore để quản lý metadata bảng

Trong **Phần 2**, chúng ta sẽ tìm hiểu cách mà OCI Data Science Notebook có thể được sử dụng để thiết lập một cụm Apache Spark với OCI Data Flow và thu thập dữ liệu từ Cơ sở Dữ liệu Tự động và Bộ nhớ Đối tượng OCI. ### **Bước 1: Cấu hình môi trường phát triển**OCI Data Flow là một nền tảng dữ liệu hoàn toàn không máy chủ, dễ bảo trì cho việc lưu trữ các tác vụ Apache Spark. Nền tảng này hỗ trợ ứng dụng bằng Python, Scala hoặc Java trong các chế độ theo lô, streaming hoặc tương tác. Nó giúp bạn thiết lập các Hồ dữ liệu hiệu quả và mở rộng chỉ trong vài phút. Để đơn giản hóa quá trình thiết lập môi trường phát triển, chúng ta sẽ tận dụng Jupyter Notebook của **OCI Data Science**.
Extended Perspectives Comparison:
BướcMô tả
1Đăng nhập vào máy ảo với Docker qua SSH.
2Tạo thư mục và thêm phụ thuộc cần thiết vào tệp `packages.txt`.
3Tạo tệp `requirement.txt` và tải xuống ví tiền ADB.
4Tạo và kết nối với cụm Spark OCI Data Flow.
5Kết nối với AutonomousDB qua OCI Data Flow.
6Phân tích dữ liệu bằng Spark OCI Data Flow.
7Lưu trữ dữ liệu thô vào các tệp Parquet trong OCI Object Storage.

Tạo metastore để quản lý metadata bảng

Chuẩn bị cho việc tạo cụm Spark trên OCI Data Flow

1. Đăng nhập vào Console OCI. 2. Truy cập vào **Phân tích & AI > Dịch vụ Dữ liệu OCI > Dự án** và tạo một phiên làm việc notebook (1 OPCU là đủ). Trong phiên làm việc notebook của bạn, hãy sử dụng menu Tệp > Mới > **Trình khám phá Môi trường** để cài đặt conda **PySpark 3.5 và Data Flow**. Bạn cũng có thể sử dụng Trình khám phá Notebook để làm quen với các tính năng của Dịch vụ Dữ liệu OCI: Có một notebook hữu ích, hướng dẫn cách sử dụng OCI Data Flow với OCI Data Science. Tôi rất khuyên bạn nên xem qua nó:

Tạo tệp phụ thuộc với thư viện bổ sung

Để quản lý metadata cho các bảng, bạn cần thiết lập một metastore. Việc này có thể được thực hiện dễ dàng thông qua OCI Data Catalog. Dưới đây là các bước để tạo metastore:

1. Tạo hai bucket trong OCI Object Storage - một cái dành cho các bảng được quản lý và cái còn lại cho các bảng bên ngoài.
2. Truy cập vào mục Analytics & AI > Data Catalog > Metastore và tiến hành tạo một metastore mới, nhớ cung cấp thông tin về các bucket đã tạo ở bước 1.

3. Sau khi hoàn tất việc tạo metastore, hãy sao chép **metastore OCID** vì nó sẽ được sử dụng trong bước tiếp theo.

Tạo tệp phụ thuộc với thư viện bổ sung

Tạo và kết nối với cụm Spark trên OCI Data Flow

### **Bước 3: Chuẩn bị để tạo cụm Spark trên OCI Data Flow**
Cuối cùng, hãy viết một số mã! Chúng ta cần một cụm Spark để chạy khối lượng công việc. Đầu tiên, thiết lập một số biến:
import ads
import json
import os
import pandas as pd

def prepare_command(command: dict) -> str:
"""Chuyển đổi từ điển lệnh thành chuỗi các lệnh định dạng."""
return f"'{json.dumps(command)}'"

ads.set_auth("resource_principal")
compartment_id = os.environ.get("NB_SESSION_COMPARTMENT_OCID")
namespace = 'tên miền không gian lưu trữ đối tượng'
logs_bucket_uri = 'oci://dataflow_app@' + namespace + '/dataflow_logs'
metastore_id = "ocid1.metastore"
%reload_ext dataflow.magics

**Lưu ý:** Để sử dụng `ads.set_auth("resource_principal")`, bạn cần đảm bảo đã tạo một **Nhóm Động** cho phiên làm việc của notebook OCI Data Science và OCI Data Flow, cùng với các chính sách cần thiết (_[tài liệu]_) . Bây giờ, chúng ta hãy chuẩn bị cụm Spark:
command = prepare_command({
"compartmentId": compartment_id,
"displayName": "StreamingApp",
"language": "PYTHON",
"sparkVersion": "3.5.0",
"numExecutors": 1,
"driverShape": "VM.Standard.E5.Flex",
"executorShape": "VM.Standard.E5.Flex",
"driverShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
"executorShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
"metastoreId": metastore_id,
"type": "SESSION",
"logsBucketUri": logs_bucket_uri,
"archiveUri":"oci://dataflow_app@namespace/archive.zip",
 "configuration":{
 "fs.oci.client.hostname":"https://objectstorage.eu-frankfurt-1.oraclecloud.com",
 "spark.oracle.datasource.enabled":"true",
 "spark.oracle.deltalake.version":"3.1.0",
 "dataflow.auth":"resource_principal"
 }
})


### **Bước 4: Tạo tệp phụ thuộc với các thư viện bổ sung**
Để xử lý dữ liệu từ Kafka và thực hiện dự đoán bằng XGBoost, chúng ta cần chuẩn bị một bộ các thư viện và trình điều khiển cho Spark. Điều này được gọi là _[tệp phụ thuộc (liên kết tài liệu)]_. Bạn có thể tạo tệp phụ thuộc này bằng cách sử dụng Cloud Shell của OCI hoặc một phiên bản máy tính OCI tùy chỉnh có hệ điều hành Linux và Docker được cài đặt.

Trong quá trình này, Apache Spark sẽ hoạt động như một nền tảng phân tích mạnh mẽ trong môi trường điện toán đám mây nhờ vào khả năng mở rộng linh hoạt của nó. Với OCI Data Flow, bạn có thể tối ưu hóa quy trình xử lý dữ liệu thông qua việc cấu hình kích thước cụm cũng như loại tài nguyên (CPU/GPU) mà bạn muốn sử dụng cho ứng dụng của mình.

Các tham số như `numExecutors`, `driverShape`, và `executorShape` rất quan trọng để đảm bảo rằng ứng dụng của bạn hoạt động hiệu quả nhất có thể trong môi trường đám mây này. Ví dụ thực tế về những cấu hình này sẽ giúp người đọc dễ hiểu hơn về cách thức kết nối và triển khai ứng dụng trên nền tảng Oracle Cloud Infrastructure.

Kết nối đến Autonomous Database từ OCI Data Flow

1. Đăng nhập vào máy ảo của bạn với Docker đã cài đặt thông qua SSH. 2. Chuẩn bị một thư mục để chứa các tệp:
mkdir df chmod 777 df cd df
3. Thêm các phụ thuộc cần thiết vào tệp `packages.txt`: `org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2` (liên kết đến tài liệu) **Mẹo hay:** Bạn cũng có thể sử dụng tùy chọn cấu hình `spark.jars.packages` cho các phụ thuộc. _[Dưới đây là một ví dụ: Cách sử dụng OCI Data Flow với AWS S3] hoặc [Các kịch bản khác nhau của Data Flow] bởi [Phantompete](Không có)._ 4. Tạo tệp **requirement.txt** với các gói Python: ![Bảng điều khiển VM] Kiểm tra _[tài liệu],_ chọn lệnh phù hợp với nền tảng của bạn: hình dạng ARM A1 hoặc AMD64 E1/E4/E5) và chạy (dưới đây hoạt động cho AMD64):
docker pull phx.ocir.io/axmemlgtri2a/dataflow/dependency-packager-linux_x86_64:latest docker run --platform linux/amd64 --rm -v $(pwd):/opt/dataflow --pull always -it phx.ocir.io/axmemlgtri2a/dataflow/dependency-packager-linux_x86_64:latest -p 3.11

Kết nối đến Autonomous Database từ OCI Data Flow

Phân tích dữ liệu sử dụng Spark trong OCI Data Flow

Bạn cần tạo một tệp **archive.zip**. Tải nó về và sau đó tải lên bucket OCI Object Storage. Cung cấp **archiveUri** chính xác trong lệnh cấu hình cụm của bạn: "archiveUri": "oci://dataflow_app@namespace/**archive.zip**".

### **Bước 5:** Tạo và kết nối với cụm Spark OCI Data Flow
Với kho lưu trữ phụ thuộc đã được chuẩn bị, giờ đây chúng ta có thể tạo một cụm Spark bằng lệnh _%create_session_:
import time
start_time = time.time()
%create_session -l python -c $command
print(f"Thời gian bắt đầu cụm: {time.time() - start_time}")

Lưu trữ dữ liệu thô vào Parquet trên Object Storage

Tạo một cụm máy chủ có thể mất từ 3 đến 5 phút. Sau đó, bạn có thể kết nối với cụm đang chạy bằng lệnh SparkMagic _%use_session_ (sử dụng OCID ứng dụng DataFlow):
%use_session -f -s ocid1.dataflowapplication.oc1.eu-frankfurt-1.antheljreicj2tia32za6hwbtzwzcqof6spdthdww6zndxrxmkevzpdy3lzq
![Sử dụng phiên OCI Data Flow hiện có] Bạn cũng có thể kiểm tra số lượng luồng Spark mà bạn đang sử dụng:
%%spark print(spark.sparkContext.defaultParallelism) 16
Bạn có thể tắt cụm bằng lệnh _%stop_session()_.

### **Bước 6: Kết nối với AutonomousDB qua OCI Data Flow**
OCI Data Flow giúp đơn giản hóa việc kết nối với các **trình điều khiển đã được xây dựng sẵn** cho Delta và Oracle DB. Trong cấu hình của chúng tôi, chúng tôi đã thêm những dòng sau:
"spark.oracle.datasource.enabled":"true","spark.oracle.deltalake.version" : "3.1.0",
Bạn có thể xem khả năng của _[Oracle Spark Data Source]_, các tùy chọn JDBC của Spark cũng áp dụng trong trường hợp này.

Đầu tiên, hãy tải xuống ví tiền ADB và tải nó lên bucket Object Storage của OCI. Ngoài ra, bạn cũng có thể sử dụng OCID ADB thay cho ví tiền nếu ADB nằm trong cùng một tenancy; điều này sẽ tiện lợi hơn, trong khi ví tiền thường cần thiết đối với các thiết lập xuyên tenancy khác.:
%%spark adw_wallet = "oci://dataflow_app@<namespace>/Wallet_sylwek.zip" adw_password = "Oracle_password" adw_user = "Oracle_username" ds = spark.read.format("oracle") \ .option("walletUri",adw_wallet) \ .option("dbtable", "schema.supla") \ .option("user", adw_user) \ .option("connectionId", "adb_high") \ .option("password", adw_password) \ .option("partitionColumn", "time_event") \ .option("lowerBound", "2022–11–01 01:00:00.000") \ .option("upperBound", "2022–12–31 23:59:59.000") \ .option("oracle.jdbc.mapDateToTimestamp", "false") \ .option("sessionInitStatement", "ALTER SESSION SET NLS_TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF'") \ .option("numPartitions", 16) \ .load().cache() 

Lưu trữ dữ liệu thô vào Parquet trên Object Storage

Truy xuất và trực quan hóa dữ liệu tại chỗ

Với tùy chọn **sessionInitStatement**, bạn có thể thực hiện một lệnh SQL hoặc thủ tục lưu trữ **trước** truy vấn chính của mình, chẳng hạn như để thiết lập định dạng timestamp (cho mệnh đề Where/Lọc hoặc PartitionColumn trên cột timestamp):
.option("sessionInitStatement", "ALTER SESSION SET NLS_TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF'")\ .load()\ .filter("time_event <= '2022–12–01'")\
Spark sẽ áp dụng W[here] trong cơ sở dữ liệu - có một tùy chọn mặc định là _**pushDownPredicate = True**_. Bạn có thể đọc thêm _tại đây_. Kế hoạch thực thi của Spark trông như sau:
(3) Scan JDBCRelation(solar.supla) [numPartitions=16]  [codegen id : 1]Output [4]: [TIME_EVENT#9298, SENSOR#9299, VALUE#9300, PHASE#9301]PushedFilters: [*IsNotNull(TIME_EVENT), *LessThanOrEqual(TIME_EVENT,2022-12-01 00:00:00.0)]ReadSchema: struct<TIME_EVENT:timestamp,SENSOR:string,VALUE:decimal(12,8),PHASE:decimal(38,10)>
Bạn có thể kiểm tra truy vấn trong Autonomous Performance Hub (OCI Console): cả các điều kiện Lọc và Phân vùng đều đã được áp dụng. ### **Bước 7: Phân tích dữ liệu bằng Spark OCI Data Flow** Dữ liệu thô từ đồng hồ năng lượng thường không hữu ích cho đến khi được xử lý. Ví dụ, để tính kWh dựa trên các khoảng thời gian và tiêu thụ năng lượng, hãy sử dụng hàm phân tích của Spark _lag_, như sau:
%%sparkfrom pyspark.sql import Windowfrom pyspark.sql import functions as Fw = Window.partitionBy("Phase").orderBy("TIME_EVENT") detail = ds\.withColumn('year',F.year(F.col("TIME_EVENT"))) \.withColumn('month',F.date_format(F.col("TIME_EVENT"), 'yyyy-MM')) \.withColumn('day',F.date_format(F.col("TIME_EVENT"), 'yyyy-MM-dd')) \.withColumn('hour',F.date_format(F.col("TIME_EVENT"), 'yyyy-MM-dd HH')) \.withColumn("PrevTime", F.lag("TIME_EVENT", 1).over(w))\.withColumn("TimeDiffSec",F.expr('EXTRACT(SECOND FROM TIME_EVENT-PrevTime)').cast('double'))\.withColumn("kWh", F.col("TimeDiffSec")/3600*(F.col("VALUE")/1000))\.orderBy("TIME_EVENT")aggkwh = detail.groupBy('year','month','day','hour')\            .agg(\                F.sum("kWh").alias("aggkwh")\                ).orderBy("hour").cache()aggkwh.printSchema()aggkwh.show()

Điều gì tiếp theo sau khi thiết lập là gì?

Sau khi tạo tập dữ liệu tổng hợp trong phiên Spark trên cụm, chúng ta có thể chuyển nó sang phiên Python cục bộ. Để thực hiện điều này, trước tiên hãy tạo một view:

%%spark
ds.createOrReplaceTempView("aggkwh")


Trong ô tiếp theo, một đối tượng `aggkwh` sẽ được tạo tại địa phương thông qua truy vấn SQL:

%%spark -c sql -o aggkwh
SELECT * FROM aggday LIMIT 20000;


Giờ đây, chúng ta có thể trực quan hóa dữ liệu ở chế độ cục bộ (lưu ý rằng không cần **%%spark**):

from autovizwidget.widget.utils import display_dataframe
display_dataframe(aggkwh)


### **Bước 8:** Tạo **Data Lake trên OCI Object Storage**

Nhiệm vụ cuối cùng là lưu trữ dữ liệu thô ("tập dữ liệu chi tiết", đã được tạo ở bước trước) vào các tệp Parquet (dĩ nhiên, bạn cũng có thể sử dụng _[Delta]_ hoặc _[Iceberg]_), với phân vùng trên OCI Object Storage. Chúng ta cần xác định một đường dẫn cho dữ liệu: **bucket@namespace/folder** (Hãy chắc chắn rằng bạn đã tạo sẵn một bucket OCI Object Storage cho dữ liệu của mình):

%%spark
output_path = 'oci://dataflow_app@' + namespace + '/supla_details'
detail.write\
.partitionBy("year","month")\
.format("parquet")\
.mode("overwrite")\
.option("overwriteSchema", "true")\
.save(output_path)


Và để đọc nó từ đó:

%%spark
output_path = 'oci://dataflow_app@' + namespace + '/supla_details'
detail = spark.read.parquet(output_path).cache()
detail.show(10, truncate=False)


### Kế hoạch tiếp theo?

Trong Phần 2, chúng ta đã khám phá cách thiết lập Spark với OCI Data Flow, trích xuất dữ liệu từ cơ sở dữ liệu và lưu nó vào Object Storage. Trong Phần 3, chúng ta sẽ đi sâu vào Spark Streaming để tạo ra một **Data Lake Thời Gian Thực**, sử dụng định dạng Delta. Hãy chờ đón nhé!

Reference Articles

923 kB

... và 97480077 ·that 92166405 ·was 90362617 ·của 89666243 ·for 88075000 ... với 51947495 ·một 51796898 ·người 45768285 ·he 45322094 ·you 45064032 ...

Source: Hugging Face

- Kiến Thức Cho Người lao Động Việt Nam

Do what ? Eat what? Go where? Learn what ? Listen to what? Play what ? Shop where? ... Do what ? Eat what? Go where? Learn what ? Listen to what? Play what ? Shop ...


喜連川 優 (Masaru Kitsuregawa)

Expert

Related Discussions

❖ Related Articles