前言
此為 multiprocessing 的程式模板v2,(相比 v2 簡化為一支程式)
將任務封裝好後,直接丟入 list_tasks 即可自動發揮系統最大效能跑平行運算。
封裝的概念,請務必先自行理解。
我們通常在設計程式時,都會是預設以單一 process 去執行程式,
但有時候會要重複執行多種任務。
例:跑影片 x 100、分析結果 x 100、雜項任務 x 100…
會需要寫程式的任務,大部分都是需要被反覆執行的,我們正好利用這個特性
multiprocessing.Pool() 會依據系統現在的能力,自動分配任務,
也就是說會讓你的電腦在“接近極限卻不當掉的”情況下,
發揮最大多工的能力。
範例程式碼
- start_multiprocess.py
請在這邊將所有封裝好的任務存入 list_tasks,(用 list_tasks.append())
例:list_tasks.append(CLASS_NAME(your_args, …))
封裝的任務必須至少包含以下兩個函數:
- init(): 初始化變數
- start_progress(): 讓自動化流程分配任務時,開始執行的入口
注意:請勿將 start_progress() 放在 init() 中執行,這樣在宣告階段程式就會先開始跑起來了! 失去了自動分配資源的效果!
import os
import cv2
import glob
import multiprocessing as mp
from multiprocessing import RLock
from tqdm import tqdm
from termcolor import colored
list_tasks = []
# ------------- [YOU ONLY NEED TO SET HERE] ------------- #
# * append your task in to list_tasks
# -> list_tasks.append(CLASS_NAME(your_args, ...))
#
# * your CLASS must have at least two functions:
# -> __init__() and start_progress()
#---------------------------------------------------------#
# put your task here
#---------------------------------------------------------#
#---------------------------------------------------------#
#---------------------------------------------------------#
#---------------------------------------------------------#
#---------------------------------------------------------#
class howard_print(object):
@staticmethod
def info(str):
print(colored(f"[Info] ", 'green') + str)
@staticmethod
def warn(str):
print(colored(f"[Warning] ", 'yellow') + str)
@staticmethod
def error(str):
print(colored(f"[Error] ", 'red') + str)
@staticmethod
def finish(str):
print(colored(f"[Finished] ", 'cyan') + str)
@staticmethod
def undefined(str):
print(colored(f"[Undefined] ", 'magenta') + str)
class multiprocess_task(object):
def __init__(self, idx, total_job, each_task):
os.system('cls' if os.name == 'nt' else 'clear')
howard_print.info(f'Current working on job {idx}/{len(list_tasks)}.')
each_task.start_progress(total_job)
howard_print.info(f"job {idx}/{len(list_tasks)} finished !!!")
if __name__ == '__main__':
tqdm.set_lock(RLock()) # for managing output contention
cpu_count = mp.cpu_count()
print(f" --------------------- [start_multiprocess.py] --------------------- ")
howard_print.info(f"Your CPU count (worker count): {cpu_count}")
pool = mp.Pool()
res_list = []
howard_print.info(f"Total task count = {len(list_tasks)}")
howard_print.info(f"Task list = {list_tasks}")
for idx in range(len(list_tasks)):
res_list.append(pool.apply_async(multiprocess_task, (idx, len(list_tasks), list_tasks[idx])))
for idx in range(len(res_list)):
res_list[idx].get()