Python, Joblibでシンプルな並列処理(joblib.Parallel)
PythonのライブラリJoblibを使うと、シンプルな並列処理を簡単に書ける。
- Joblib: running Python functions as pipeline jobs — joblib 1.1.0.dev0 documentation
- joblib/joblib: Computing with Python functions.
ここでは以下の内容について説明する。
- Joblibのインストール
joblib.Parallel
によるシンプルな並列処理- 基本的な使い方
- ログの出力レベル(冗長性): 引数
verbose
- 最大同時実行ジョブ数: 引数
n_jobs
- 複数の値を返す関数の場合
- 複数の引数を受け取る関数の場合
- 値を返さない(
None
を返す)関数の場合 - 並列処理による速度改善
- 具体例1. 画像ファイルのリサイズを並列処理
- 具体例2. テキストファイルの一行目抽出を並列処理
本記事では、それぞれが独立したタスクの並列処理を対象とする。一つのタスクを分割するような処理や、相互に依存する複雑なタスクの処理は扱わない。
Joblibのインストール
Joblibはpip
(環境によってはpip3
)でインストールできる。
$ pip install joblib
本記事ではJoblib1.0.0を使っている。バージョンが異なると仕様が異なる場合があるので注意。
import joblib
print(joblib.__version__)
# 1.0.0
joblib.Parallelによるシンプルな並列処理
並列処理にはJoblibのParallel
クラスとdelayed()
関数を使う。
基本的な使い方
以下は、引数に指定された値をそのまま返すだけの関数func
に、0 ~ 4
(= range(5)
)の値を渡す処理。
関数の返り値がリストに格納されて返される。
def func(i):
return i
result = joblib.Parallel(n_jobs=-1)(joblib.delayed(func)(i) for i in range(5))
print(result)
# [0, 1, 2, 3, 4]
Parallel
クラスとdelayed()
関数は以下のように使う。
joblib.Parallel(<Parallelへの引数>)(
joblib.delayed(<実行する関数>)(<関数への引数>) for 変数名 in イテラブル
)
joblib.delayed()() for 変数名 in イテラブル
の部分はジェネレーター式(リスト内包表記のジェネレーター版)。
- 関連記事: Pythonリスト内包表記の使い方
複雑な例や具体的な例は後述する。
以下、Parallel()
の引数について簡単に紹介する。バックエンドを選択するための引数prefer
などについては公式ドキュメントを参照。
- joblib.Parallel — joblib 1.1.0.dev0 documentation
- Embarrassingly parallel for loops — joblib 1.1.0.dev0 documentation
ログの出力レベル(冗長性): 引数verbose
Parallel()
の引数verbose
に、ログの出力レベルを整数値で指定する。
上の例のように、デフォルトでは何も出力されない。値を大きくすると出力レベルが上がる(冗長性が増す)。10
より大きいとすべてのログが出力され、50
以上だとstdout
(標準出力)に出力される。
result = joblib.Parallel(n_jobs=-1, verbose=1)(joblib.delayed(func)(i) for i in range(5))
# [Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers.
# [Parallel(n_jobs=-1)]: Done 5 out of 5 | elapsed: 0.0s finished
result = joblib.Parallel(n_jobs=-1, verbose=11)(joblib.delayed(func)(i) for i in range(5))
# [Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers.
# [Parallel(n_jobs=-1)]: Done 1 tasks | elapsed: 0.0s
# [Parallel(n_jobs=-1)]: Batch computation too fast (0.0055s.) Setting batch_size=2.
# [Parallel(n_jobs=-1)]: Done 2 out of 5 | elapsed: 0.0s remaining: 0.0s
# [Parallel(n_jobs=-1)]: Done 3 out of 5 | elapsed: 0.0s remaining: 0.0s
# [Parallel(n_jobs=-1)]: Done 5 out of 5 | elapsed: 0.0s remaining: 0.0s
# [Parallel(n_jobs=-1)]: Done 5 out of 5 | elapsed: 0.0s finished
最大同時実行ジョブ数: 引数n_jobs
Parallel()
の引数n_jobs
に、同時に実行されるジョブ数の最大値を整数値で指定する。
-1
とするとすべてのCPUが使用される。ログの一行目の... with 4 concurrent workers.
から分かるように、サンプルコードを実行した環境では4
。
result = joblib.Parallel(n_jobs=-1, verbose=1)(joblib.delayed(func)(i) for i in range(5))
# [Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers.
# [Parallel(n_jobs=-1)]: Done 5 out of 5 | elapsed: 0.0s finished
他の値を指定した例。
result = joblib.Parallel(n_jobs=2, verbose=1)(joblib.delayed(func)(i) for i in range(5))
# [Parallel(n_jobs=2)]: Using backend LokyBackend with 2 concurrent workers.
# [Parallel(n_jobs=2)]: Done 5 out of 5 | elapsed: 0.3s finished
result = joblib.Parallel(n_jobs=-2, verbose=1)(joblib.delayed(func)(i) for i in range(5))
# [Parallel(n_jobs=-2)]: Using backend LokyBackend with 3 concurrent workers.
# [Parallel(n_jobs=-2)]: Done 5 out of 5 | elapsed: 0.4s finished
-2
以下の場合、すべてのCPU数 + 1 + n_jobs
となる。例の場合、-1
のときが4
なので、-2
で3
、-3
で2
となる。1つだけは空けておきたいというような場合に便利。
複数の値を返す関数の場合
複数の値を返す関数を指定すると、以下のように返り値を要素とするタプルが結果のリストに格納される。
- 関連記事: Pythonの関数で複数の戻り値を返す方法
def func_multi(i):
return i, i**2, i**3
results = joblib.Parallel(n_jobs=-1)(joblib.delayed(func_multi)(i) for i in range(5))
print(results)
# [(0, 0, 0), (1, 1, 1), (2, 4, 8), (3, 9, 27), (4, 16, 64)]
以下のようにzip()
と*
を利用して分解することも可能。
a, b, c = zip(*results)
print(a)
# (0, 1, 2, 3, 4)
print(b)
# (0, 1, 4, 9, 16)
print(c)
# (0, 1, 8, 27, 64)
複数の引数を受け取る関数の場合
複数の引数を受け取る関数を指定するには、delayed()
を含むジェネレーター式においてzip()
を使えばよい。
def func_multi2(i, j, k):
return i, j, k
results = joblib.Parallel(n_jobs=-1)(
joblib.delayed(func_multi2)(i, j, k) for i, j, k in zip(range(5), range(5, 10), range(10, 15))
)
print(results)
# [(0, 5, 10), (1, 6, 11), (2, 7, 12), (3, 8, 13), (4, 9, 14)]
値を返さない(Noneを返す)関数の場合
何らかの処理を行うだけで値を返さない(None
を返す)関数の場合、None
を要素とするリストが返される。
def func_none():
# do something
pass
results = joblib.Parallel(n_jobs=-1)(joblib.delayed(func_none)() for i in range(5))
print(results)
# [None, None, None, None, None]
Jupyter Notebookなどで必要のない結果が表示されてしまうのを防ぎたい場合は、適当な変数に代入しておけばよい。
joblib.Parallel(n_jobs=-1)(joblib.delayed(func_none)() for i in range(5))
# [None, None, None, None, None]
_ = joblib.Parallel(n_jobs=-1)(joblib.delayed(func_none)() for i in range(5))
便宜上代入する変数には、上の例のようにアンダースコア_
が使われることがある。文法的に意味があるわけではなく慣用的なもので、アンパックなどでも使われる。
並列処理による速度改善
joblib.Parallel
を使わない場合(通常処理)と使う場合(並列処理)で処理時間を計測し比較する。
ここではJupyter Notebookのマジックコマンド%%timeit
を使っている。以下のサンプルコードはPythonコードとして実行しても動かないので注意。
joblib.Parallel
の実行にはオーバーヘッドが生じるため、もともと一瞬で終わるような処理はそのまま実行したほうが速い。
def func(i):
return i
%%timeit
for i in range(100):
func(i)
# 9.87 µs ± 251 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
%%timeit
joblib.Parallel(n_jobs=-1)(joblib.delayed(func)(i) for i in range(100))
# 21.6 ms ± 2.12 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
time.sleep()
で0.1sec(= 100msec)処理を停止する関数を例とする。
import time
def func_sleep(i):
time.sleep(0.1)
return i
%%timeit
for i in range(8):
func_sleep(i)
# 824 ms ± 5.63 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
joblib.Parallel(n_jobs=-1)(joblib.delayed(func_sleep)(i) for i in range(8))
# 221 ms ± 5.06 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
例の環境は同時実行ジョブ数が4
なので、通常処理に比べて、並列処理だと処理時間がおよそ1/4に短縮されているのが分かる。
当然ながら、これは理想的な場合であり、常にこの割合で改善されるというわけではない。並列処理にメリットがあるかどうかは、処理内容や繰り返し回数、環境(マシンの性能)などに依存する。
参考までに、次に説明する具体例の場合、MacBook Pro (13-inch, 2016, Four Thunderbolt 3 Ports)で以下のような結果であった。
- 画像ファイル100個のリサイズ
- 通常処理に比べて、並列処理は処理時間がおよそ1/2
- テキストファイル10000個の一行目抽出
- 通常処理に比べて、並列処理は処理時間がおよそ1/2
具体例1. 画像ファイルのリサイズを並列処理
フォルダ内の画像ファイルの一括リサイズをJoblibで並列処理する。
基本的なコードの説明は以下の記事を参照。
通常の処理。..../src_img
フォルダに画像ファイルが格納されているとする。
import os
import glob
from PIL import Image
import joblib
dst_dir = 'data/temp/joblib/dst_img'
os.makedirs(dst_dir, exist_ok=True)
files = glob.glob('data/temp/joblib/src_img/*')
for f in files:
try:
img = Image.open(f)
img_resize = img.resize((img.width // 2, img.height // 2))
root, ext = os.path.splitext(f)
basename = os.path.basename(root)
img_resize.save(os.path.join(dst_dir, basename + '_half' + ext))
except OSError as e:
pass
並列処理。
def func(f):
try:
img = Image.open(f)
img_resize = img.resize((img.width // 2, img.height // 2))
root, ext = os.path.splitext(f)
basename = os.path.basename(root)
img_resize.save(os.path.join(dst_dir, basename + '_half' + ext))
except OSError as e:
pass
_ = joblib.Parallel(n_jobs=-1)(joblib.delayed(func)(f) for f in files)
具体例2. テキストファイルの一行目抽出を並列処理
フォルダ内のテキストファイルの一行目を抽出する例。
前準備として指定フォルダにテキストファイルを生成する。
import os
import pathlib
import pprint
import joblib
src_dir = 'data/temp/joblib/src_file'
os.makedirs(src_dir, exist_ok=True)
for i in range(10):
pathlib.Path(src_dir).joinpath(f'file{i:05}.txt').write_text(f'This is file{i:05}')
この処理も以下のように並列化できる。
def func_write(i):
pathlib.Path(src_dir).joinpath(f'file{i:05}.txt').write_text(f'This is file{i:05}')
_ = joblib.Parallel(n_jobs=-1)(joblib.delayed(func_write)(i) for i in range(10))
テキストファイル一行目抽出の通常処理。ファイル名と一行目の文字列をリストに格納し、その結果をソートしてpprint
で出力している。
first_lines = []
for p in pathlib.Path(src_dir).glob('*.txt'):
with p.open() as f:
first_lines.append((p.name, f.readline()))
pprint.pprint(sorted(first_lines))
# [('file00000.txt', 'This is file00000'),
# ('file00001.txt', 'This is file00001'),
# ('file00002.txt', 'This is file00002'),
# ('file00003.txt', 'This is file00003'),
# ('file00004.txt', 'This is file00004'),
# ('file00005.txt', 'This is file00005'),
# ('file00006.txt', 'This is file00006'),
# ('file00007.txt', 'This is file00007'),
# ('file00008.txt', 'This is file00008'),
# ('file00009.txt', 'This is file00009')]
並列処理。
def func_read_first_line(p):
with p.open() as f:
return p.name, f.readline()
first_lines = joblib.Parallel(n_jobs=-1)(
joblib.delayed(func_read_first_line)(p) for p in pathlib.Path(src_dir).glob('*.txt')
)
pprint.pprint(sorted(first_lines))
# [('file00000.txt', 'This is file00000'),
# ('file00001.txt', 'This is file00001'),
# ('file00002.txt', 'This is file00002'),
# ('file00003.txt', 'This is file00003'),
# ('file00004.txt', 'This is file00004'),
# ('file00005.txt', 'This is file00005'),
# ('file00006.txt', 'This is file00006'),
# ('file00007.txt', 'This is file00007'),
# ('file00008.txt', 'This is file00008'),
# ('file00009.txt', 'This is file00009')]