数理コンサルタントの備忘録

あなたの悩みを数理で解決する

pythonでの並列処理

pythonの処理速度が遅い場合、並列処理が有効である。ここでは、実務でも役立つ並列処理のサンプルコードを紹介する。 pythonの標準ライブラリであるmultiprocessingを利用する。

from multiprocessing import Pool # ジョブを送り込めるワーカープロセスのプールを制御する
from multiprocessing import cpu_count # システムの CPU 数を返す

まずは数値実験の対象となる10000x10000の2次元乱数配列を作成する。

import numpy as np
num = 10000
np.random.seed(10)
x = np.random.uniform(0, 100, (num, num))
x = x.astype(int)

各配列の平均と分散を返す関数を定義する

def get_statistics(x):
    v_mean = np.mean(x[0:1])
    v_std = np.std(x[0:1])
    time.sleep(0.001) # 意図的に処理を遅くするためにsleepコマンドを実施
    return [v_mean,v_std]

まず、単純に処理する場合

start_time = time.time()
dist = []
'''並列処理対象。こちらがベースラインとなる'''
for i in range(x.shape[0]):
    dist.append(get_statistics(x[i:i+1]))
'''##########'''
end_time = time.time()
cal_time = end_time - start_time
print('シングルスレッド計算時間:{0} sec.'.format(round(cal_time,3)))
# シングルスレッド計算時間:14.466 sec.

次に並列で処理する場合

print('cpu_count:{0}'.format(cpu_count())) # cpu_countにより、システムの CPU数を取得できる. 本実験ではcpu_count=4
for n_jobs in [1,2,4,8,16,32]:
    start_time = time.time()
    dist = []
    '''並列処理の実行'''
    dist = Pool(n_jobs).map(get_statistics,
        (x[i:i+1] for i in range(x.shape[0])))
    '''##########'''
    end_time = time.time()
    cal_time = end_time - start_time
    print('{0}スレッド計算時間:{1} sec.'.format(n_jobs,round(cal_time,3)))
# 1スレッド計算時間:15.857 sec.
# 2スレッド計算時間:8.005 sec.
# 4スレッド計算時間:4.275 sec.
# 8スレッド計算時間:2.769 sec.
# 16スレッド計算時間:2.829 sec.
# 32スレッド計算時間:3.078 sec.

シングルスレッドだと14秒程度かかっていた処理が、8スレッドにすることで3秒(約5分の1)程度で実行できた。

最後に、ソースコード全体を記す。

from multiprocessing import Pool
from multiprocessing import cpu_count
import time
import numpy as np

# 数値実験の対象となる配列
num = 10000
np.random.seed(10)
x = np.random.uniform(0, 100, (num, num))
x = x.astype(int)

# 配列の平均と分散を返す関数
def get_statistics(x):
    v_mean = np.mean(x[0:1])
    v_std = np.std(x[0:1])
    time.sleep(0.001) # 意図的に処理を遅くするためにsleepコマンドを実施
    return [v_mean,v_std]

# 単純に処理する場合
start_time = time.time()
dist = []
'''並列処理対象。こちらがベースラインとなる'''
for i in range(x.shape[0]):
    dist.append(get_statistics(x[i:i+1]))
'''##########'''
end_time = time.time()
cal_time = end_time - start_time
print('シングルスレッド計算時間:{0} sec.'.format(round(cal_time,3)))
# シングルスレッド計算時間:14.466 sec.

# 並列で処理する場合
print('cpu_count:{0}'.format(cpu_count())) # cpu_countにより、システムの CPU数を取得できる. 本実験ではcpu_count=4
for n_jobs in [1,2,4,8,16,32]:
    start_time = time.time()
    dist = []
    '''並列処理の実行'''
    dist = Pool(n_jobs).map(get_statistics,
        (x[i:i+1] for i in range(x.shape[0])))
    '''##########'''
    end_time = time.time()
    cal_time = end_time - start_time
    print('{0}スレッド計算時間:{1} sec.'.format(n_jobs,round(cal_time,3)))
# 1スレッド計算時間:15.857 sec.
# 2スレッド計算時間:8.005 sec.
# 4スレッド計算時間:4.275 sec.
# 8スレッド計算時間:2.769 sec.
# 16スレッド計算時間:2.829 sec.
# 32スレッド計算時間:3.078 sec.