一、CPU密集型任務開發指導
CPU密集型任務是指需要占用系統資源處理大量計算能力的任務,需要長時間運行,這段時間會阻塞線程其它事件的處理,不適宜放在主線程進行。例如圖像處理、視頻編碼、數據分析等。
基于多線程并發機制處理CPU密集型任務可以提高CPU利用率,提升應用程序響應速度。
當進行一系列同步任務時,推薦使用Worker;而進行大量或調度點較為分散的獨立任務時,不方便使用8個Worker去做負載管理,推薦采用TaskPool。接下來將以圖像直方圖處理以及后臺長時間的模型預測任務分別進行舉例。
使用TaskPool進行圖像直方圖處理
? 1. 實現圖像處理的業務邏輯。
? 2. 數據分段,將各段數據通過不同任務的執行完成圖像處理。
創建Task,通過execute()執行任務,在當前任務結束后,會將直方圖處理結果同時返回。
? 3. 結果數組匯總處理。
import taskpool from '@ohos.taskpool'; @Concurrent function imageProcessing(dataSlice: ArrayBuffer) { // 步驟1: 具體的圖像處理操作及其他耗時操作 return dataSlice; } function histogramStatistic(pixelBuffer: ArrayBuffer) { // 步驟2: 分成三段并發調度 let number = pixelBuffer.byteLength / 3; let buffer1 = pixelBuffer.slice(0, number); let buffer2 = pixelBuffer.slice(number, number * 2); let buffer3 = pixelBuffer.slice(number * 2); let task1 = new taskpool.Task(imageProcessing, buffer1); let task2 = new taskpool.Task(imageProcessing, buffer2); let task3 = new taskpool.Task(imageProcessing, buffer3); taskpool.execute(task1).then((ret: ArrayBuffer[]) => { // 步驟3: 結果處理 }); taskpool.execute(task2).then((ret: ArrayBuffer[]) => { // 步驟3: 結果處理 }); taskpool.execute(task3).then((ret: ArrayBuffer[]) => { // 步驟3: 結果處理 }); } @Entry @Component struct Index { @State message: string = 'Hello World' build() { Row() { Column() { Text(this.message) .fontSize(50) .fontWeight(FontWeight.Bold) .onClick(() => { let data: ArrayBuffer; histogramStatistic(data); }) } .width('100%') } .height('100%') } }
使用Worker進行長時間數據分析
本文通過某地區提供的房價數據訓練一個簡易的房價預測模型,該模型支持通過輸入房屋面積和房間數量去預測該區域的房價,模型需要長時間運行,房價預測需要使用前面的模型運行結果,因此需要使用Worker。
? 1. DevEco Studio提供了Worker創建的模板,新建一個Worker線程,例如命名為“MyWorker”。
? 2. 在主線程中通過調用ThreadWorker的constructor()方法創建Worker對象,當前線程為宿主線程。
import worker from '@ohos.worker'; const workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
? 3. 在宿主線程中通過調用onmessage()方法接收Worker線程發送過來的消息,并通過調用postMessage()方法向Worker線程發送消息。
例如向Worker線程發送訓練和預測的消息,同時接收Worker線程發送回來的消息。
// 接收Worker子線程的結果 workerInstance.onmessage = function(e) { // data:主線程發送的信息 let data = e.data; console.info('MyWorker.ts onmessage'); // 在Worker線程中進行耗時操作 } workerInstance.onerror = function (d) { // 接收Worker子線程的錯誤信息 } // 向Worker子線程發送訓練消息 workerInstance.postMessage({ 'type': 0 }); // 向Worker子線程發送預測消息 workerInstance.postMessage({ 'type': 1, 'value': [90, 5] });
? 4. 在MyWorker.ts文件中綁定Worker對象,當前線程為Worker線程。
import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker'; let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
? 5. 在Worker線程中通過調用onmessage()方法接收宿主線程發送的消息內容,并通過調用postMessage()方法向宿主線程發送消息。
如在Worker線程中定義預測模型及其訓練過程,同時與主線程進行信息交互。
import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker'; let workerPort: ThreadWorkerGlobalScope = worker.workerPort; // 定義訓練模型及結果 let result; // 定義預測函數 function predict(x) { return result[x]; } // 定義優化器訓練過程 function optimize() { result = {}; } // Worker線程的onmessage邏輯 workerPort.onmessage = function (e: MessageEvents) { let data = e.data // 根據傳輸的數據的type選擇進行操作 switch (data.type) { case 0: // 進行訓練 optimize(); // 訓練之后發送主線程訓練成功的消息 workerPort.postMessage({ type: 'message', value: 'train success.' }); break; case 1: // 執行預測 const output = predict(data.value); // 發送主線程預測的結果 workerPort.postMessage({ type: 'predict', value: output }); break; default: workerPort.postMessage({ type: 'message', value: 'send message is invalid' }); break; } }
? 6. 在Worker線程中完成任務之后,執行Worker線程銷毀操作。銷毀線程的方式主要有兩種:根據需要可以在宿主線程中對Worker線程進行銷毀;也可以在Worker線程中主動銷毀Worker線程。
在宿主線程中通過調用onexit()方法定義Worker線程銷毀后的處理邏輯。
// Worker線程銷毀后,執行onexit回調方法 workerInstance.onexit = function() { console.info("main thread terminate"); }
方式一:在宿主線程中通過調用terminate()方法銷毀Worker線程,并終止Worker接收消息。
// 銷毀Worker線程 workerInstance.terminate();
方式二:在Worker線程中通過調用close()方法主動銷毀Worker線程,并終止Worker接收消息。
// 銷毀線程 workerPort.close();
二、 I/O密集型任務開發指導
使用異步并發可以解決單次I/O任務阻塞的問題,但是如果遇到I/O密集型任務,同樣會阻塞線程中其它任務的執行,這時需要使用多線程并發能力來進行解決。
I/O密集型任務的性能重點通常不在于CPU的處理能力,而在于I/O操作的速度和效率。這種任務通常需要頻繁地進行磁盤讀寫、網絡通信等操作。此處以頻繁讀寫系統文件來模擬I/O密集型并發任務的處理。
? 1. 定義并發函數,內部密集調用I/O能力。
import fs from '@ohos.file.fs'; // 定義并發函數,內部密集調用I/O能力 @Concurrent async function concurrentTest(fileList: string[]) { // 寫入文件的實現 async function write(data, filePath) { let file = await fs.open(filePath, fs.OpenMode.READ_WRITE); await fs.write(file.fd, data); fs.close(file); } // 循環寫文件操作 for (let i = 0; i < fileList.length; i++) { write('Hello World!', fileList[i]).then(() =?> { console.info(`Succeeded in writing the file. FileList: ${fileList[i]}`); }).catch((err) => { console.error(`Failed to write the file. Code is ${err.code}, message is ${err.message}`) return false; }) } return true; }
? 2. 使用TaskPool執行包含密集I/O的并發函數:通過調用execute()方法執行任務,并在回調中進行調度結果處理。示例中的filePath1和filePath2的獲取方式請參見獲取應用文件路徑。
import taskpool from '@ohos.taskpool'; let filePath1 = ...; // 應用文件路徑 let filePath2 = ...; // 使用TaskPool執行包含密集I/O的并發函數 // 數組較大時,I/O密集型任務任務分發也會搶占主線程,需要使用多線程能力 taskpool.execute(concurrentTest, [filePath1, filePath2]).then((ret) => { // 調度結果處理 console.info(`The result: ${ret}`); })
審核編輯 黃宇
-
cpu
+關注
關注
68文章
10904瀏覽量
213023 -
圖像處理
+關注
關注
27文章
1300瀏覽量
56894 -
線程
+關注
關注
0文章
505瀏覽量
19758 -
HarmonyOS
+關注
關注
79文章
1982瀏覽量
30579
發布評論請先 登錄
相關推薦
評論