pythonでの並列処理
pythonの処理速度が遅い場合、並列処理が有効である。ここでは、実務でも役立つ並列処理のサンプルコードを紹介する。 pythonの標準ライブラリであるmultiprocessingを利用する。
from multiprocessing import Pool # ジョブを送り込めるワーカープロセスのプールを制御する from multiprocessing import cpu_count # システムの CPU 数を返す
まずは数値実験の対象となる10000x10000の2次元乱数配列を作成する。
import numpy as np num = 10000 np.random.seed(10) x = np.random.uniform(0, 100, (num, num)) x = x.astype(int)
各配列の平均と分散を返す関数を定義する
def get_statistics(x): v_mean = np.mean(x[0:1]) v_std = np.std(x[0:1]) time.sleep(0.001) # 意図的に処理を遅くするためにsleepコマンドを実施 return [v_mean,v_std]
まず、単純に処理する場合
start_time = time.time() dist = [] '''並列処理対象。こちらがベースラインとなる''' for i in range(x.shape[0]): dist.append(get_statistics(x[i:i+1])) '''##########''' end_time = time.time() cal_time = end_time - start_time print('シングルスレッド計算時間:{0} sec.'.format(round(cal_time,3))) # シングルスレッド計算時間:14.466 sec.
次に並列で処理する場合
print('cpu_count:{0}'.format(cpu_count())) # cpu_countにより、システムの CPU数を取得できる. 本実験ではcpu_count=4 for n_jobs in [1,2,4,8,16,32]: start_time = time.time() dist = [] '''並列処理の実行''' dist = Pool(n_jobs).map(get_statistics, (x[i:i+1] for i in range(x.shape[0]))) '''##########''' end_time = time.time() cal_time = end_time - start_time print('{0}スレッド計算時間:{1} sec.'.format(n_jobs,round(cal_time,3))) # 1スレッド計算時間:15.857 sec. # 2スレッド計算時間:8.005 sec. # 4スレッド計算時間:4.275 sec. # 8スレッド計算時間:2.769 sec. # 16スレッド計算時間:2.829 sec. # 32スレッド計算時間:3.078 sec.
シングルスレッドだと14秒程度かかっていた処理が、8スレッドにすることで3秒(約5分の1)程度で実行できた。
最後に、ソースコード全体を記す。
from multiprocessing import Pool from multiprocessing import cpu_count import time import numpy as np # 数値実験の対象となる配列 num = 10000 np.random.seed(10) x = np.random.uniform(0, 100, (num, num)) x = x.astype(int) # 配列の平均と分散を返す関数 def get_statistics(x): v_mean = np.mean(x[0:1]) v_std = np.std(x[0:1]) time.sleep(0.001) # 意図的に処理を遅くするためにsleepコマンドを実施 return [v_mean,v_std] # 単純に処理する場合 start_time = time.time() dist = [] '''並列処理対象。こちらがベースラインとなる''' for i in range(x.shape[0]): dist.append(get_statistics(x[i:i+1])) '''##########''' end_time = time.time() cal_time = end_time - start_time print('シングルスレッド計算時間:{0} sec.'.format(round(cal_time,3))) # シングルスレッド計算時間:14.466 sec. # 並列で処理する場合 print('cpu_count:{0}'.format(cpu_count())) # cpu_countにより、システムの CPU数を取得できる. 本実験ではcpu_count=4 for n_jobs in [1,2,4,8,16,32]: start_time = time.time() dist = [] '''並列処理の実行''' dist = Pool(n_jobs).map(get_statistics, (x[i:i+1] for i in range(x.shape[0]))) '''##########''' end_time = time.time() cal_time = end_time - start_time print('{0}スレッド計算時間:{1} sec.'.format(n_jobs,round(cal_time,3))) # 1スレッド計算時間:15.857 sec. # 2スレッド計算時間:8.005 sec. # 4スレッド計算時間:4.275 sec. # 8スレッド計算時間:2.769 sec. # 16スレッド計算時間:2.829 sec. # 32スレッド計算時間:3.078 sec.