map †
>>> def add(x,y):
return x+y
>>> m = map(add, [1,2,3], [4,5,6])
>>> m
<map object at 0x10da84bd0>
>>> next(m)
5
>>> next(m)
7
>>> next(m)
9
>>> next(m)
StopIteration
- map(関数, 引数1のiteratable, 引数2のiteratable, ...)
- 返値は iteratable (map object)
- 関数に、引数を適用した結果の iteratable が返る
zip †
- n 個の iteratable をまとめる。それぞれの 1 つ目の組、それぞれの 2 つ目の組、それぞれの 3 つ目の組、・・・
>>> z =zip([1,2,3],['a','b','c'])
>>> next(z)
(1, 'a')
>>> next(z)
(2, 'b')
>>> next(z)
(3, 'c')
>>> next(z)
StopIteration
filter †
>>> f = filter( lambda t : t > 0, [-1, 3, 9, 23, -296,123] )
>>> f
<filter object at 0x10da972d0>
>>> next(f)
3
>>> next(f)
9
>>> next(f)
23
>>> next(f)
123
>>> next(f)
StopIteration
- filter(関数, iteratable)
- 返値は iteratable (filter object)
- 関数に、引数を適用した結果が True となる引数の iteratable が返る
reduce †
>>> import functools
>>> def add(x,y):
print(str(x) + '+' + str(y))
return x + y
>>> functools.reduce(add, [1,20,300,400])
1+20
21+300
321+400
721
- reduce(関数, iteratable)
- 引数を2つ持つ関数に、iteratable を累積的に適用する
シンプルな並列処理 †
- これが本命。map() は、単に集合理論をプログラム言語に落とし込んだだけの衒学的なものではない。
- Thread Pool に対して、map が使える。
- めちゃ簡単に並列処理を記述できる。普通に map() を書くだけで複雑な並列処理が実現できる。
- multi.py
#!/usr/bin/env python
# coding: utf-8
from multiprocessing import Pool
import datetime
def factorial(x):
res = 1
for i in range(1,x+1):
res *= i
return str(x) + "! = " + str(res)
def calcSerial(size):
return [factorial(p) for p in range(1,size)]
def calcParallel(size):
p = Pool(4)
return p.map(factorial, range(1,size))
def main():
start = datetime.datetime.now()
calcSerial(5000)
end = datetime.datetime.now()
print("SERIAL:\t\t" + str(end-start))
start = datetime.datetime.now()
calcParallel(5000)
end = datetime.datetime.now()
print("PARALLEL:\t" + str(end-start))
if __name__ == '__main__':
main()
1〜5000 について、それぞれ階乗をもとめる
- 実行結果
$ ./multi.py
SERIAL: 0:00:34.295200
PARALLEL: 0:00:19.166773
ちゃんと、並列で動いて、実行時間は約半分。
うちのポンコツは Core が 2 個しかないので、Thread Pool を 4 にしても、実行時間は半分にしかならんのね
$ sysctl -n machdep.cpu.brand_string
Intel(R) Core(TM)2 Duo CPU P8600 @ 2.40GHz
$ system_profiler | egrep "Processor|Core"
Processor Name: Intel Core 2 Duo
Processor Speed: 2.4 GHz
Number of Processors: 1
Total Number of Cores: 2
Pool.map() で 例外が起きたらどうなる †
- map_err.py
#!/usr/bin/env python
# coding: utf-8
from multiprocessing import Pool
import datetime
def aho(val) :
if val % 3 == 0 :
raise ValueError(str(val) + 'はアホ')
return val
print('***** SERIAL *****')
print('CALL map()')
res = map(aho,[1,2,3,4,5])
print('PRINT RESULT')
while True :
try :
print( next(res) )
except ValueError as ve :
print( ve )
except StopIteration :
break
p = Pool(4)
print('***** PARALLELL *****')
print('CALL Pool.map()')
res = p.map(aho,[1,2,3,4,5])
print('PRINT RESULT')
while True :
try :
print( next(res) )
except ValueError as ve :
print( ve )
except StopIteration :
break
- 実行結果
$ ./map_err.py
***** SERIAL *****
CALL map()
PRINT RESULT
1
2
3はアホ
4
5
***** PARALLELL *****
CALL Pool.map()
Traceback (most recent call last):
File "./map_err.py", line 29, in <module>
res = p.map(aho,[1,2,3,4,5])
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.2/lib/python3.2/multiprocessing/pool.py", line 251, in map
return self.map_async(func, iterable, chunksize).get()
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.2/lib/python3.2/multiprocessing/pool.py", line 556, in get
raise self._value
ValueError: 3はアホ
- SERIAL 実行のときには、next() で関数が実行される。→ 例外が発生しても続きを実行できる
- PARALLEL 実行のときには、map() で関数が実行される。どれかで例外が発生すると全部止まる
Thread Pool †
- Pool.map(func,iteratable)
- Pool.map_async(func, iteratable, callback)
- 終了を待たない map
- 終了すると callback が呼ばれる
- Pool.imap(func, iterable)
- next() の引数にタイムアウト秒数を設定できる
- 指定秒数を超えて終わらないと multiprocessing.TimeoutError? が発生する
- Pool.imap_unordered(func, iterable)
- next() で返ってくる処理結果の順番が順不同な imap
- Pool.close()
- Pool.terminate()
- Pool.join()
- 現在処理中のタスクの終了を待つ。close() / terminate() を呼んだ後で呼ぶ
タイムアウト付きの imap 実行サンプル †
- imap_example.py
#!/usr/bin/env python
# coding: utf-8
from multiprocessing import Pool
from multiprocessing import TimeoutError
import time
import datetime
def complex_func(t) :
time.sleep(t)
return t
p = Pool(2)
resIt = p.imap(complex_func,[1,5,15,30])
while True :
start = datetime.datetime.now()
try :
print('Start to wait result')
res = resIt.next(10)
print('Get result \"' + str(res) + '\".\n Wait time :' + str(datetime.datetime.now() - start))
except TimeoutError as tErr:
print('Timeout.\n Wait time :' + str(datetime.datetime.now() - start))
print(tErr)
except StopIteration :
break
- 実行結果
$ ./imap_example.py
Start to wait result
Get result "1".
Wait time :0:00:01.002186
Start to wait result
Get result "5".
Wait time :0:00:04.002223
Start to wait result
Timeout.
Wait time :0:00:10.005681
Start to wait result
Get result "15".
Wait time :0:00:01.000145
Start to wait result
Timeout.
Wait time :0:00:10.005794
Start to wait result
Get result "30".
Wait time :0:00:09.003719
Start to wait result
- res.next(10) は、処理開始から 10 秒では無い。next(10)で処理終了を待ち始めてからの待ち時間
- TimeoutError? が起きても '処理が中断されるわけでは無い'。処理終了待ちをやめるだけ。実際に実行結果は次のようになっている
経過時間(sec) | 処理待ち時間(sec) | イベント |
0 | | 処理開始 |
1 | (1) | 1の処理結果を受信 |
5 | (4) | 5の処理結果を受信 |
15 | (10) | 15の処理待ち→タイムアウト |
16 | (1) | 15の処理結果を受信 |
26 | (10) | 30の処理待ち→タイムアウト |
35 | (9) | 30の処理結果を受信 |
Python