note.nkmk.me

Python, Joblibでシンプルな並列処理(joblib.Parallel)

Posted: 2021-01-10 / Tags: Python, ファイル処理, 画像処理

PythonのライブラリJoblibを使うと、シンプルな並列処理を簡単に書ける。

ここでは以下の内容について説明する。

  • Joblibのインストール
  • joblib.Parallelによるシンプルな並列処理
    • 基本的な使い方
    • ログの出力レベル(冗長性): 引数verbose
    • 最大同時実行ジョブ数: 引数n_jobs
  • 複数の値を返す関数の場合
  • 複数の引数を受け取る関数の場合
  • 値を返さない(Noneを返す)関数の場合
  • 並列処理による速度改善
  • 具体例1. 画像ファイルのリサイズを並列処理
  • 具体例2. テキストファイルの一行目抽出を並列処理

本記事では、それぞれが独立したタスクの並列処理を対象とする。一つのタスクを分割するような処理や、相互に依存する複雑なタスクの処理は扱わない。

スポンサーリンク

Joblibのインストール

Joblibはpip(環境によってはpip3)でインストールできる。

$ pip install joblib

本記事ではJoblib1.0.0を使っている。バージョンが異なると仕様が異なる場合があるので注意。

import joblib

print(joblib.__version__)
# 1.0.0

joblib.Parallelによるシンプルな並列処理

並列処理にはJoblibのParallelクラスとdelayed()関数を使う。

基本的な使い方

以下は、引数に指定された値をそのまま返すだけの関数funcに、0 ~ 4(= range(5))の値を渡す処理。

関数の返り値がリストに格納されて返される。

def func(i):
    return i

result = joblib.Parallel(n_jobs=-1)(joblib.delayed(func)(i) for i in range(5))
print(result)
# [0, 1, 2, 3, 4]

Parallelクラスとdelayed()関数は以下のように使う。

joblib.Parallel(<Parallelへの引数>)(
    joblib.delayed(<実行する関数>)(<関数への引数>) for 変数名 in イテラブル
)

joblib.delayed()() for 変数名 in イテラブルの部分はジェネレーター式(リスト内包表記のジェネレーター版)。

複雑な例や具体的な例は後述する。

以下、Parallel()の引数について簡単に紹介する。バックエンドを選択するための引数preferなどについては公式ドキュメントを参照。

ログの出力レベル(冗長性): 引数verbose

Parallel()の引数verboseに、ログの出力レベルを整数値で指定する。

上の例のように、デフォルトでは何も出力されない。値を大きくすると出力レベルが上がる(冗長性が増す)。10より大きいとすべてのログが出力され、50以上だとstdout(標準出力)に出力される。

result = joblib.Parallel(n_jobs=-1, verbose=1)(joblib.delayed(func)(i) for i in range(5))
# [Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers.
# [Parallel(n_jobs=-1)]: Done   5 out of   5 | elapsed:    0.0s finished

result = joblib.Parallel(n_jobs=-1, verbose=11)(joblib.delayed(func)(i) for i in range(5))
# [Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers.
# [Parallel(n_jobs=-1)]: Done   1 tasks      | elapsed:    0.0s
# [Parallel(n_jobs=-1)]: Batch computation too fast (0.0055s.) Setting batch_size=2.
# [Parallel(n_jobs=-1)]: Done   2 out of   5 | elapsed:    0.0s remaining:    0.0s
# [Parallel(n_jobs=-1)]: Done   3 out of   5 | elapsed:    0.0s remaining:    0.0s
# [Parallel(n_jobs=-1)]: Done   5 out of   5 | elapsed:    0.0s remaining:    0.0s
# [Parallel(n_jobs=-1)]: Done   5 out of   5 | elapsed:    0.0s finished

最大同時実行ジョブ数: 引数n_jobs

Parallel()の引数n_jobsに、同時に実行されるジョブ数の最大値を整数値で指定する。

-1とするとすべてのCPUが使用される。ログの一行目の... with 4 concurrent workers.から分かるように、サンプルコードを実行した環境では4

result = joblib.Parallel(n_jobs=-1, verbose=1)(joblib.delayed(func)(i) for i in range(5))
# [Parallel(n_jobs=-1)]: Using backend LokyBackend with 4 concurrent workers.
# [Parallel(n_jobs=-1)]: Done   5 out of   5 | elapsed:    0.0s finished

他の値を指定した例。

result = joblib.Parallel(n_jobs=2, verbose=1)(joblib.delayed(func)(i) for i in range(5))
# [Parallel(n_jobs=2)]: Using backend LokyBackend with 2 concurrent workers.
# [Parallel(n_jobs=2)]: Done   5 out of   5 | elapsed:    0.3s finished

result = joblib.Parallel(n_jobs=-2, verbose=1)(joblib.delayed(func)(i) for i in range(5))
# [Parallel(n_jobs=-2)]: Using backend LokyBackend with 3 concurrent workers.
# [Parallel(n_jobs=-2)]: Done   5 out of   5 | elapsed:    0.4s finished

-2以下の場合、すべてのCPU数 + 1 + n_jobsとなる。例の場合、-1のときが4なので、-23-32となる。1つだけは空けておきたいというような場合に便利。

複数の値を返す関数の場合

複数の値を返す関数を指定すると、以下のように返り値を要素とするタプルが結果のリストに格納される。

def func_multi(i):
    return i, i**2, i**3

results = joblib.Parallel(n_jobs=-1)(joblib.delayed(func_multi)(i) for i in range(5))
print(results)
# [(0, 0, 0), (1, 1, 1), (2, 4, 8), (3, 9, 27), (4, 16, 64)]

以下のようにzip()*を利用して分解することも可能。

a, b, c = zip(*results)
print(a)
# (0, 1, 2, 3, 4)

print(b)
# (0, 1, 4, 9, 16)

print(c)
# (0, 1, 8, 27, 64)

複数の引数を受け取る関数の場合

複数の引数を受け取る関数を指定するには、delayed()を含むジェネレーター式においてzip()を使えばよい。

def func_multi2(i, j, k):
    return i, j, k

results = joblib.Parallel(n_jobs=-1)(
    joblib.delayed(func_multi2)(i, j, k) for i, j, k in zip(range(5), range(5, 10), range(10, 15))
)
print(results)
# [(0, 5, 10), (1, 6, 11), (2, 7, 12), (3, 8, 13), (4, 9, 14)]

値を返さない(Noneを返す)関数の場合

何らかの処理を行うだけで値を返さない(Noneを返す)関数の場合、Noneを要素とするリストが返される。

def func_none():
    # do something
    pass

results = joblib.Parallel(n_jobs=-1)(joblib.delayed(func_none)() for i in range(5))
print(results)
# [None, None, None, None, None]

Jupyter Notebookなどで必要のない結果が表示されてしまうのを防ぎたい場合は、適当な変数に代入しておけばよい。

joblib.Parallel(n_jobs=-1)(joblib.delayed(func_none)() for i in range(5))
# [None, None, None, None, None]

_ = joblib.Parallel(n_jobs=-1)(joblib.delayed(func_none)() for i in range(5))

便宜上代入する変数には、上の例のようにアンダースコア_が使われることがある。文法的に意味があるわけではなく慣用的なもので、アンパックなどでも使われる。

並列処理による速度改善

joblib.Parallelを使わない場合(通常処理)と使う場合(並列処理)で処理時間を計測し比較する。

ここではJupyter Notebookのマジックコマンド%%timeitを使っている。以下のサンプルコードはPythonコードとして実行しても動かないので注意。

joblib.Parallelの実行にはオーバーヘッドが生じるため、もともと一瞬で終わるような処理はそのまま実行したほうが速い。

def func(i):
    return i

%%timeit
for i in range(100):
    func(i)
# 9.87 µs ± 251 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%%timeit
joblib.Parallel(n_jobs=-1)(joblib.delayed(func)(i) for i in range(100))
# 21.6 ms ± 2.12 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

time.sleep()で0.1sec(= 100msec)処理を停止する関数を例とする。

import time

def func_sleep(i):
    time.sleep(0.1)
    return i

%%timeit
for i in range(8):
    func_sleep(i)
# 824 ms ± 5.63 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
joblib.Parallel(n_jobs=-1)(joblib.delayed(func_sleep)(i) for i in range(8))
# 221 ms ± 5.06 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

例の環境は同時実行ジョブ数が4なので、通常処理に比べて、並列処理だと処理時間がおよそ1/4に短縮されているのが分かる。

当然ながら、これは理想的な場合であり、常にこの割合で改善されるというわけではない。並列処理にメリットがあるかどうかは、処理内容や繰り返し回数、環境(マシンの性能)などに依存する。

参考までに、次に説明する具体例の場合、MacBook Pro (13-inch, 2016, Four Thunderbolt 3 Ports)で以下のような結果であった。

  • 画像ファイル100個のリサイズ
    • 通常処理に比べて、並列処理は処理時間がおよそ1/2
  • テキストファイル10000個の一行目抽出
    • 通常処理に比べて、並列処理は処理時間がおよそ1/2

具体例1. 画像ファイルのリサイズを並列処理

フォルダ内の画像ファイルの一括リサイズをJoblibで並列処理する。

基本的なコードの説明は以下の記事を参照。

通常の処理。..../src_imgフォルダに画像ファイルが格納されているとする。

import os
import glob
from PIL import Image
import joblib

dst_dir = 'data/temp/joblib/dst_img'
os.makedirs(dst_dir, exist_ok=True)

files = glob.glob('data/temp/joblib/src_img/*')

for f in files:
    try:
        img = Image.open(f)
        img_resize = img.resize((img.width // 2, img.height // 2))
        root, ext = os.path.splitext(f)
        basename = os.path.basename(root)
        img_resize.save(os.path.join(dst_dir, basename + '_half' + ext))
    except OSError as e:
        pass

並列処理。

def func(f):
    try:
        img = Image.open(f)
        img_resize = img.resize((img.width // 2, img.height // 2))
        root, ext = os.path.splitext(f)
        basename = os.path.basename(root)
        img_resize.save(os.path.join(dst_dir, basename + '_half' + ext))
    except OSError as e:
        pass

_ = joblib.Parallel(n_jobs=-1)(joblib.delayed(func)(f) for f in files)

具体例2. テキストファイルの一行目抽出を並列処理

フォルダ内のテキストファイルの一行目を抽出する例。

前準備として指定フォルダにテキストファイルを生成する。

import os
import pathlib
import pprint
import joblib

src_dir = 'data/temp/joblib/src_file'

os.makedirs(src_dir, exist_ok=True)

for i in range(10):
    pathlib.Path(src_dir).joinpath(f'file{i:05}.txt').write_text(f'This is file{i:05}')

この処理も以下のように並列化できる。

def func_write(i):
    pathlib.Path(src_dir).joinpath(f'file{i:05}.txt').write_text(f'This is file{i:05}')

_ = joblib.Parallel(n_jobs=-1)(joblib.delayed(func_write)(i) for i in range(10))

テキストファイル一行目抽出の通常処理。ファイル名と一行目の文字列をリストに格納し、その結果をソートしてpprintで出力している。

first_lines = []
for p in pathlib.Path(src_dir).glob('*.txt'):
    with p.open() as f:
        first_lines.append((p.name, f.readline()))

pprint.pprint(sorted(first_lines))
# [('file00000.txt', 'This is file00000'),
#  ('file00001.txt', 'This is file00001'),
#  ('file00002.txt', 'This is file00002'),
#  ('file00003.txt', 'This is file00003'),
#  ('file00004.txt', 'This is file00004'),
#  ('file00005.txt', 'This is file00005'),
#  ('file00006.txt', 'This is file00006'),
#  ('file00007.txt', 'This is file00007'),
#  ('file00008.txt', 'This is file00008'),
#  ('file00009.txt', 'This is file00009')]

並列処理。

def func_read_first_line(p):
    with p.open() as f:
        return p.name, f.readline()

first_lines = joblib.Parallel(n_jobs=-1)(
    joblib.delayed(func_read_first_line)(p) for p in pathlib.Path(src_dir).glob('*.txt')
)

pprint.pprint(sorted(first_lines))
# [('file00000.txt', 'This is file00000'),
#  ('file00001.txt', 'This is file00001'),
#  ('file00002.txt', 'This is file00002'),
#  ('file00003.txt', 'This is file00003'),
#  ('file00004.txt', 'This is file00004'),
#  ('file00005.txt', 'This is file00005'),
#  ('file00006.txt', 'This is file00006'),
#  ('file00007.txt', 'This is file00007'),
#  ('file00008.txt', 'This is file00008'),
#  ('file00009.txt', 'This is file00009')]
スポンサーリンク
シェア
このエントリーをはてなブックマークに追加

関連カテゴリー

関連記事