gevent は libev を元にした並行ライブラリです。 ネットワークや並行プログラミングのためのクリーンなAPIを提供しています。
このチュートリアルはある程度の Python の知識を前提としていますが、 それ以上の知識は前提としていません。 並列プログラミングの知識も必要ありません。 このチュートリアルの目的は、 gevent を扱う道具を提供し、 読者がすでに持っている一般的な並列プログラミングの問題を手なづけて 非同期プログラムを書き始められるように手助けすることです。
時系列順の寄稿者: Stephen Diehl Jérémy Bethmont sww Bruno Bigras David Ripton Travis Cline Boris Feld youngsterxyf Eddie Hebert Alexis Metaireau Daniel Velkov
そして Denis Bilenko に、 gevent の開発とこのチュートリアルを作る上での 指導について感謝します。
この共同作業によるドキュメントは MIT ライセンスにて公開されています。 何か追記したいことがあったり、タイプミスを発見した場合は、 Github で fork して pull request を送ってください。その他のどんな貢献も歓迎します。
時系列順の翻訳者: methane
翻訳は gevent-tutorial-ja で行なっています。 翻訳に対する修正依頼等はこちらにお願いします.
gevent で使われている一番重要なパターンが Greenlet という Python のC拡張の形で提供された軽量なコルーチンです。 すべての Greenlet はメインプログラムのOSプロセス内で実行されますが、 協調的にスケジューリングされます.
Only one greenlet is ever running at any given time. (どの時点をとっても常にひとつの greenlet だけが動いている)
これは、 multiprocessing
や threading
が提供している、
OS がスケジューリングして本当に並列に動くプロセスや POSIX スレッドを利用した
並列機構とは異なるものです。
並行プログラムのコアとなる考え方は、大きいタスクは小さいサブタスクに分割して、 それらを1つずつ、あるいは 同期的に 動かす代わりに、同時に、あるいは 非同期に 動かすことです。 2つのサブタスク間の切り替えは コンテキストスイッチ と呼ばれます。
gevent のコンテキストスイッチは 譲渡(yield) によって行われます。
次の例では、2つのコンテキストが gevent.sleep(0)
を実行することにより
お互いに譲渡しています。
import gevent
def foo():
print('Running in foo')
gevent.sleep(0)
print('Explicit context switch to foo again')
def bar():
print('Explicit context to bar')
gevent.sleep(0)
print('Implicit context switch back to bar')
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
次の制御フローを視覚化した画像を見るか、このサンプルプログラムを デバッガーでステップ実行してコンテキストスイッチが起こる様子を確認してください。
gevent が本当の力を発揮するのは、ネットワークやIOバウンドの、協調的に スケジュールできる関数を実行するために利用する場合です。 gevent はすべての詳細部分の面倒を見て、可能な限りネットワークライブラリが 暗黙的に greenlet コンテキストを譲渡するようにします。 私はこれがどれだけ強力な方法なのかをうまく強調することができません。 でも、次の例が示してくれると思います。
この例では、 select()
関数は通常ブロックする関数で、複数の
ファイルディスクリプタをポーリングします。
import time
import gevent
from gevent import select
start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)
def gr1():
# Busy waits for a second, but we don't want to stick around...
print('Started Polling: ', tic())
select.select([], [], [], 2)
print('Ended Polling: ', tic())
def gr2():
# Busy waits for a second, but we don't want to stick around...
print('Started Polling: ', tic())
select.select([], [], [], 2)
print('Ended Polling: ', tic())
def gr3():
print("Hey lets do some stuff while the greenlets poll, at", tic())
gevent.sleep(1)
gevent.joinall([
gevent.spawn(gr1),
gevent.spawn(gr2),
gevent.spawn(gr3),
])
Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at at 0.0 seconds
Ended Polling: at 2.0 seconds
Ended Polling: at 2.0 seconds
もう一つのすこし人工的な例として、 非決定論的な (同じ入力に対して
出力が同じになるとは限らない) task
関数を定義しています。
この例では、 task
関数を実行すると副作用としてランダムな秒数だけ
タスクの実行を停止します。
import gevent
import random
def task(pid):
"""
Some non-deterministic task
"""
gevent.sleep(random.randint(0,2))
print('Task', pid, 'done')
def synchronous():
for i in range(1,10):
task(i)
def asynchronous():
threads = [gevent.spawn(task, i) for i in xrange(10)]
gevent.joinall(threads)
print('Synchronous:')
synchronous()
print('Asynchronous:')
asynchronous()
Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 2 done
Task 5 done
Task 6 done
Task 7 done
Task 9 done
Task 4 done
Task 3 done
Task 0 done
Task 8 done
Task 1 done
同期的に実行した場合、全てのタスクはシーケンシャルに実行され、 メインプログラムは各タスクを実行している間 ブロックする (メインプログラムの実行が停止される) ことになります。
このプログラムの重要な部分は、与えられた関数を greenlet スレッドの中に
ラップする gevent.spawn
です。
生成された greenlet のリストが threads
配列に格納され、
gevent.joinall
関数に渡されます。この関数は今のプログラムを、与えられた
すべての greenlet を実行するためにブロックします。
すべての greenlet が終了した時に次のステップが実行されます。
注意すべき重要な点として、非同期に実行した場合の実行順序は本質的に ランダムであり、トータルの実行時間は同期的に実行した場合よりもずっと 短くなります。実際、各タスクが2秒ずつ停止すると、同期的に実行した場合は すべてのタスクを実行するのに20秒かかりますが、非同期に実行した場合は 各タスクが他のタスクの実行を止めないのでだいたい2秒で終了します。
もっと一般的なユースケースとして、サーバーからデータを非同期に取得する場合、
fetch()
の実行時間はリモートサーバーの負荷などによってリクエストごとに
異なります。
import gevent.monkey
gevent.monkey.patch_socket()
import gevent
import urllib2
import simplejson as json
def fetch(pid):
response = urllib2.urlopen('http://json-time.appspot.com/time.json')
result = response.read()
json_result = json.loads(result)
datetime = json_result['datetime']
print 'Process ', pid, datetime
return json_result['datetime']
def synchronous():
for i in range(1,10):
fetch(i)
def asynchronous():
threads = []
for i in range(1,10):
threads.append(gevent.spawn(fetch, i))
gevent.joinall(threads)
print 'Synchronous:'
synchronous()
print 'Asynchronous:'
asynchronous()
先に触れたように、 greenlet は決定論的に動作します。 同じように設定した greenlets があって、同じ入力が与えられた場合、 必ず同じ結果になります。例として、タスクを multiprocessing の pool に 与えた場合と gevent pool に与えた場合を比較します。
import time
def echo(i):
time.sleep(0.001)
return i
# Non Deterministic Process Pool
from multiprocessing.pool import Pool
p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]
print( run1 == run2 == run3 == run4 )
# Deterministic Gevent Pool
from gevent.pool import Pool
p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]
print( run1 == run2 == run3 == run4 )
False
True
gevent が通常決定論的だといっても、ソケットやファイルなどの外部の サービスとのやりとりを始めると非決定論的な入力がプログラムに入り込んできます。 green スレッドが "決定論的な並行" であっても、 POSIX スレッドやプロセスを 使った場合に起きる問題の一部が起こります。
並行プログラミングにずっと付きまとう問題が レースコンディション(race condition) です。 簡単に言うと、2つの並行するスレッドやプロセスが幾つかの共有リソース (訳注:グローバル変数など)に依存しており、しかもそれを変更しようとする問題です。 結果としてそのリソースの状態は実行順序に依存してしまいます。 一般的にプログラマーはこの問題を可能な限り避けようとするべきです。 そうしないとプログラムのふるまいが全体として非決定性になってしまうからです。
レースコンディションを避ける一番の方法は、常に、全てのグローバルな状態を避ける事です。 グローバルな状態や import 時の副作用はいつでもあなたに噛みついてきます。 (訳注: import 時の副作用は、 import 順序によってプログラムの動作を変えてしまう、 Python でプログラミングをすると忘れた頃にハマる問題のタネです. 並行プログラミングとは 無関係です)
gevent は greenlet の初期化のラッパーを幾つか提供しています。 幾つかのよくあるパターンは:
import gevent
from gevent import Greenlet
def foo(message, n):
"""
Each thread will be passed the message, and n arguments
in its initialization.
"""
gevent.sleep(n)
print(message)
# Initialize a new Greenlet instance running the named function
# foo
thread1 = Greenlet.spawn(foo, "Hello", 1)
# Wrapper for creating and runing a new Greenlet from the named
# function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)
# Lambda expressions
thread3 = gevent.spawn(lambda x: (x+1), 2)
threads = [thread1, thread2, thread3]
# Block until all threads complete.
gevent.joinall(threads)
Hello
I live!
Greenlet クラスを直接使うだけでなく、 Greenlet のサブクラスを作って
_run
メソッドをオーバーライドすることもできます。
import gevent
from gevent import Greenlet
class MyGreenlet(Greenlet):
def __init__(self, message, n):
Greenlet.__init__(self)
self.message = message
self.n = n
def _run(self):
print(self.message)
gevent.sleep(self.n)
g = MyGreenlet("Hi there!", 3)
g.start()
g.join()
Hi there!
他のどんなコードでもあるように、 greenlet もいろいろな失敗をすることがあります。 greenlet は例外を投げそこねるかもしれませんし、停止できなくなったり、 システムリソースを食い過ぎるかもしれません。
greenlet の内部状態は基本的に時間とともに変化するパラメーターになります。 greenlet の状態をモニターするための幾つかのフラグがあります。
started
-- bool値: greenlet が開始しているかどうか.ready()
-- bool値, greenlet が停止(halt)しているかどうか.successful()
-- bool値, greenlet が例外を投げずに終了したかどうか.value
-- 任意の値, greenlet が返した値.exception
-- 例外, greenlet 内で投げられた例外.
import gevent
def win():
return 'You win!'
def fail():
raise Exception('You fail at failing.')
winner = gevent.spawn(win)
loser = gevent.spawn(fail)
print(winner.started) # True
print(loser.started) # True
# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:
gevent.joinall([winner, loser])
except Exception as e:
print('This will never be reached')
print(winner.value) # 'You win!'
print(loser.value) # None
print(winner.ready()) # True
print(loser.ready()) # True
print(winner.successful()) # True
print(loser.successful()) # False
# The exception raised in fail, will not propogate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.
print(loser.exception)
# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()
True
True
You win!
None
True
True
True
False
You fail at failing.
メインプログラムが SIGQUIT を受信した時に yield しない greenlet がプログラムを実行させ続ける可能性があります。 この状態は "ゾンビプロセス" と呼ばれ、 Python インタープリターの 外から kill してやる必要があります。
一般的なパターンはメインプログラムが SIGQUIT イベントを受信して、
exit する前に gevent.shutdown
を呼び出すことです。
import gevent
import signal
def run_forever():
gevent.sleep(1000)
if __name__ == '__main__':
gevent.signal(signal.SIGQUIT, gevent.shutdown)
thread = gevent.spawn(run_forever)
thread.join()
タイムアウトとは、コードブロックや greenlet の実行時間に対する制約です。
import gevent
from gevent import Timeout
seconds = 10
timeout = Timeout(seconds)
timeout.start()
def wait():
gevent.sleep(10)
try:
gevent.spawn(wait).join()
except Timeout:
print 'Could not complete'
もしくはコンテキストマネージャーとして with
文を使います。
import gevent
from gevent import Timeout
time_to_wait = 5 # seconds
class TooLong(Exception):
pass
with Timeout(time_to_wait, TooLong):
gevent.sleep(10)
加えて、 gevent は多くの greenlet やデータ構造に関する関数に timeout 引数を提供しています。例えば:
import gevent
from gevent import Timeout
def wait():
gevent.sleep(2)
timer = Timeout(1).start()
thread1 = gevent.spawn(wait)
try:
thread1.join(timeout=timer)
except Timeout:
print('Thread 1 timed out')
# --
timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)
try:
thread2.get(timeout=timer)
except Timeout:
print('Thread 2 timed out')
# --
try:
gevent.with_timeout(1, wait)
except Timeout:
print('Thread 3 timed out')
Thread 1 timed out
Thread 2 timed out
Thread 3 timed out
Gevent の暗黒面に触れていきます。
強力なコルーチンのパターンを推奨するためにここまでモンキーパッチに
ついて触れるのを避けて来ましたが、そろそろこの黒魔術について話す時です。
いままでのコードに monkey.patch_socket()
というコマンドが出てきたのに
気づきましたか?これは標準ライブラリの socket ライブラリを改変するコマンドです。
import socket
print( socket.socket )
print "After monkey patch"
from gevent import monkey
monkey.patch_socket()
print( socket.socket )
import select
print select.select
monkey.patch_select()
print "After monkey patch"
print( select.select )
class 'socket.socket'
After monkey patch
class 'gevent.socket.socket'
built-in function select
After monkey patch
function select at 0x1924de8
Python のランタイムは、モジュール、クラス、そして関数に至るまで、
ほとんどのオブジェクトについて実行時に編集することを許しています。
これは一般的にはとーっても悪いやり方です。「暗黙の副作用」を作り出し、
問題が発生した時にデバッグするのを非常に難しくするからです。
それでも、 Python 自体の基本的な振る舞いを変更する必要がある例外的な状況では
モンキーパッチを使うことができます。
この場合、 gevent は標準ライブラリの socket, ssl, threading, select
などにあるの多くの
ブロックするシステムコールを協調的に動作するようにパッチします。
例えば、 Redis の Python バインディングは通常の TCP ソケットを使って
redis-server
インスタンスと通信します。
gevent.monkey.patch_all()
を実行するだけで、 redis バインディングは
協調的にリクエストをするようになり、その他の gevent のスタックと一緒に
動くようになります。
モンキーパッチを使えば、そのままでは gevent で動作しないライブラリを 1行のコードを書くだけで統合できるようになります。 モンキーパッチは邪悪ですが、この場合は必要悪です。
イベントは greenlet 間の非同期通信の一つです。
import gevent
from gevent.event import AsyncResult
a = AsyncResult()
def setter():
"""
After 3 seconds set wake all threads waiting on the value of
a.
"""
gevent.sleep(3)
a.set()
def waiter():
"""
After 3 seconds the get call will unblock.
"""
a.get() # blocking
print 'I live!'
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
])
Event オブジェクトの拡張の AsyncResult を使うと、モーニングコール (wakeup call) 付きの値を送ることができます。 これは将来どこかのタイミングで設定される値に対する参照を持っているので、 future とか deferred と呼ばれることもあります。
import gevent
from gevent.event import AsyncResult
a = AsyncResult()
def setter():
"""
After 3 seconds set the result of a.
"""
gevent.sleep(3)
a.set('Hello!')
def waiter():
"""
After 3 seconds the get call will unblock after the setter
puts a value into the AsyncResult.
"""
print a.get()
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
])
キューはデータの順序付き集合で、標準的な put / get
操作を持っています。
これは greenlet をまたいで安全に操作できるように実装されています。
例えば、ある greenlet がキューから要素を取得した時、同じ要素が並行して 動いている他の greenlet でも取得されることはありません。
import gevent
from gevent.queue import Queue
tasks = Queue()
def worker(n):
while not tasks.empty():
task = tasks.get()
print('Worker %s got task %s' % (n, task))
gevent.sleep(0)
print('Quitting time!')
def boss():
for i in xrange(1,25):
tasks.put_nowait(i)
gevent.spawn(boss).join()
gevent.joinall([
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'nancy'),
])
Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker nancy got task 5
Worker john got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker nancy got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker nancy got task 17
Worker john got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker nancy got task 23
Worker john got task 24
Quitting time!
Quitting time!
Quitting time!
キューは必要があれば put
でも get
でもブロックすることがあります。
put
と get
にはそれぞれブロックしないバージョンとして
それぞれ put_nowait
と get_nowait
が用意されています。
これらの操作は実行できない場合はブロックする代わりに
gevent.queue.Empty
か gevent.queue.Full
例外を発生させます。
次の例では、並行して動いている boss と複数の worker がいて、
3要素以上格納できない制限付きのキューがあります。
この制限により、キューに空きが無いときは空きができるまで put
操作がブロックして、キューが空の場合は要素が格納されるまで get
操作がブロックします。
get
には timeout 引数を設定して、その時間内に要素を取得できない
場合は gevent.queue.Empty
例外を発生させて終了します。
import gevent
from gevent.queue import Queue, Empty
tasks = Queue(maxsize=3)
def worker(n):
try:
while True:
task = tasks.get(timeout=1) # decrements queue size by 1
print('Worker %s got task %s' % (n, task))
gevent.sleep(0)
except Empty:
print('Quitting time!')
def boss():
"""
Boss will wait to hand out work until a individual worker is
free since the maxsize of the task queue is 3.
"""
for i in xrange(1,10):
tasks.put(i)
print('Assigned all work in iteration 1')
for i in xrange(10,20):
tasks.put(i)
print('Assigned all work in iteration 2')
gevent.joinall([
gevent.spawn(boss),
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'bob'),
])
Worker steve got task 1
Worker john got task 2
Worker bob got task 3
Worker steve got task 4
Worker bob got task 5
Worker john got task 6
Assigned all work in iteration 1
Worker steve got task 7
Worker john got task 8
Worker bob got task 9
Worker steve got task 10
Worker bob got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker bob got task 15
Worker steve got task 16
Worker bob got task 17
Worker john got task 18
Assigned all work in iteration 2
Worker steve got task 19
Quitting time!
Quitting time!
Quitting time!
グループとは、複数の greenlet をまとめてスケジュールしたり管理するものです。
Python の multiprocessing
ライブラリの並列ディスパッチを置き換える用途にも使えます。
import gevent
from gevent.pool import Group
def talk(msg):
for i in xrange(3):
print(msg)
g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')
group = Group()
group.add(g1)
group.add(g2)
group.join()
group.add(g3)
group.join()
bar
bar
bar
foo
foo
foo
fizz
fizz
fizz
これは非同期なタスクのグループを管理するのに便利です。
上で述べたように、 Group はグループ化された greenlet に対してジョブを ディスパッチし、その結果をいろいろな方法で取得するためのAPIを提供しています。
import gevent
from gevent import getcurrent
from gevent.pool import Group
group = Group()
def hello_from(n):
print('Size of group', len(group))
print('Hello from Greenlet %s' % id(getcurrent()))
group.map(hello_from, xrange(3))
def intensive(n):
gevent.sleep(3 - n)
return 'task', n
print('Ordered')
ogroup = Group()
for i in ogroup.imap(intensive, xrange(3)):
print(i)
print('Unordered')
igroup = Group()
for i in igroup.imap_unordered(intensive, xrange(3)):
print(i)
Size of group 3
Hello from Greenlet 4433885296
Size of group 3
Hello from Greenlet 4434739440
Size of group 3
Hello from Greenlet 4434739920
Ordered
('task', 0)
('task', 1)
('task', 2)
Unordered
('task', 2)
('task', 1)
('task', 0)
pool は並行数を制限しながら動的な数の greenlet を扱うためのものです。 たくさんのネットワークやIOバウンドのタスクを並列に実行したい場合に 最適です。
import gevent
from gevent.pool import Pool
pool = Pool(2)
def hello_from(n):
print('Size of pool', len(pool))
pool.map(hello_from, xrange(3))
Size of pool 2
Size of pool 2
Size of pool 1
gevent を使ったサービスを作るときに、よく中央に pool を持った 構成で設計します。 例えばたくさんのソケットをポーリングするクラスです。
from gevent.pool import Pool
class SocketPool(object):
def __init__(self):
self.pool = Pool(1000)
self.pool.start()
def listen(self, socket):
while True:
socket.recv()
def add_handler(self, socket):
if self.pool.full():
raise Exception("At maximum pool size")
else:
self.pool.spawn(self.listen, socket)
def shutdown(self):
self.pool.kill()
セマフォ(Semaphore) は greenlet に並行アクセスや並行実行を調整する低レベルな
同期機構です。
セマフォは acquire
と release
というメソッドを提供しています。
acquire
と release
の呼び出された回数のをセマフォの bound と呼びます。
bound が0になると、他の greenlet が release
するまでブロックします。
from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore
sem = BoundedSemaphore(2)
def worker1(n):
sem.acquire()
print('Worker %i acquired semaphore' % n)
sleep(0)
sem.release()
print('Worker %i released semaphore' % n)
def worker2(n):
with sem:
print('Worker %i acquired semaphore' % n)
sleep(0)
print('Worker %i released semaphore' % n)
pool = Pool()
pool.map(worker1, xrange(0,2))
pool.map(worker2, xrange(3,6))
Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 3 acquired semaphore
Worker 4 acquired semaphore
Worker 3 released semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore
bound が 1 のセマフォのことをロック(Lock)と言います。 ロックを使うと一つの greenlet だけを実行可能にすることができます。 プログラムのコンテキストでなにかのリソースを並行して複数の greenlet から使わないようにするために利用されます。
Gevent は Greenlet コンテキストごとに local なデータを保存する方法を提供しています。
この機能は、内部では greenlet の getcurrent()
で得られる値を使って保存される
プライベートな名前空間に対して名前解決を行います。
import gevent
from gevent.local import local
stash = local()
def f1():
stash.x = 1
print(stash.x)
def f2():
stash.y = 2
print(stash.y)
try:
stash.x
except AttributeError:
print("x is not local to f2")
g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)
gevent.joinall([g1, g2])
1
2
x is not local to f2
Gevent に統合されている多くの Web フレームワークが、HTTPセッションオブジェクトを gevent の thread local 内に保存しています。例えば、 Werkzeug のユーティリティー ライブラリーとその中の proxy オブジェクトを利用して、 Flask のような request オブジェクトを作ることができます。
from gevent.local import local
from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request
from contextlib import contextmanager
from gevent.wsgi import WSGIServer
_requests = local()
request = LocalProxy(lambda: _requests.request)
@contextmanager
def sessionmanager(environ):
_requests.request = Request(environ)
yield
_requests.request = None
def logic():
return "Hello " + request.remote_addr
def application(environ, start_response):
status = '200 OK'
with sessionmanager(environ):
body = logic()
headers = [
('Content-Type', 'text/html')
]
start_response(status, headers)
return [body]
WSGIServer(('', 8000), application).serve_forever()
Flask のシステムはこれよりも少し複雑ですが、 thread local をローカルの セッションストレージに利用するというアイデアは共通です。
Gevent 1.0 から、 gevent.subprocess
-- Python の subprocess
モジュールにパッチを当てたバージョン --
が追加されました。
このモジュールは子プロセスを協調的に待つことができます。
import gevent
from gevent.subprocess import Popen, PIPE
def cron():
while True:
print "cron"
gevent.sleep(0.2)
g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print out.rstrip()
cron
cron
cron
cron
cron
Linux
多くの人が gevent
と一緒に multiprocessing
を利用したいと考えています。
これを実現するためにはまず、 multiprocessing
が提供しているプロセス間通信が
デフォルトでは協調的に動作しないという問題を解決しなければなりません。
multiprocessing.Connection
ベースのオブジェクト (Pipe
など) が内部のファイルディスクリプタを公開しているので、
gevent.socket.wait_read
と wait_write
を使って実際に read/write する前に
read可能/write可能 イベントを協調的に待つことができます。
import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write
# To Process
a, b = Pipe()
# From Process
c, d = Pipe()
def relay():
for i in xrange(10):
msg = b.recv()
c.send(msg + " in " + str(i))
def put_msg():
for i in xrange(10):
wait_write(a.fileno())
a.send('hi')
def get_msg():
for i in xrange(10):
wait_read(d.fileno())
print(d.recv())
if __name__ == '__main__':
proc = Process(target=relay)
proc.start()
g1 = gevent.spawn(get_msg)
g2 = gevent.spawn(put_msg)
gevent.joinall([g1, g2], timeout=1)
ただし、まだ他にも multiprocessing
と gevent の組み合わせは OS 依存の落とし穴を持っています。
multiprocessing.Process
の生成前に spawn された greenlet は
親プロセスと子プロセスの両方で実行されます。put_msg()
からの a.send()
の呼び出しは、呼び出し側スレッドを
非協調的にブロックする可能性があります。
write可能イベントは最低1バイトを書き込み可能であることしか保証していません。
write が完了する前に内部のバッファがいっぱいになる可能性があります。wait_write()
/ wait_read()
ベースの方法は、 Windows では動作しません。
(IOError: 3 is not a socket (files are not supported)
)
Windows ではパイプのイベントを監視できないからです。Python の gipc パッケージはこの問題を、 POSIX 準拠のシステムと
Windows の両方で透過的に解決しています。
このパッケージは gevent に対応した multiprocessing.Process
ベースの子プロセスと
パイプを使った協調的なプロセス間通信を提供しています。
アクターモデルとは Erlang 言語によって有名になった高レベルの並行プログラミングモデルです。 基本となる考え方を簡単に言うと、独立した複数のアクターが、他のアクターから メッセージを受信するための受信箱を持っているというものです。 アクターの中のメインループは、メッセージを受信してはそれに対応する 行動を取ります。
gevent はプリミティブとしてのアクター型は持っていませんが、 Greenlet クラスを継承して Queue を使うことで簡単に実現できます。
import gevent
from gevent.queue import Queue
class Actor(gevent.Greenlet):
def __init__(self):
self.inbox = Queue()
Greenlet.__init__(self)
def receive(self, message):
"""
Define in your subclass.
"""
raise NotImplemented()
def _run(self):
self.running = True
while self.running:
message = self.inbox.get()
self.receive(message)
利用例:
import gevent
from gevent.queue import Queue
from gevent import Greenlet
class Pinger(Actor):
def receive(self, message):
print message
pong.inbox.put('ping')
gevent.sleep(0)
class Ponger(Actor):
def receive(self, message):
print message
ping.inbox.put('pong')
gevent.sleep(0)
ping = Pinger()
pong = Ponger()
ping.start()
pong.start()
ping.inbox.put('start')
gevent.joinall([ping, pong])
製作者によれば、 ZeroMQ は "並行フレームワークのように振る舞うソケットライブラリ" ということです。 並行・分散アプリケーションを作るときに、非常に強力な メッセージングレイヤーになります。
ZeroMQ はたくさんの種類の socket プリミティブを提供しています。
一番シンプルなものは リクエスト-レスポンス ペアです。
この socket は send
と recv
というメソッドを持っていて、どちらも
通常はブロックします。しかし Travis Cline が
gevent.socket を使って ZeroMQ socket をノンブロッキングにポーリングするように
してくれました。 pip install gevent-zeromq
でインストールできます。
(訳注: これは pyzmq に取り込まれ、最新版の 2.2.0.1 では zmq.green
を
import して利用することができます。)
# Note: Remember to ``pip install pyzmq gevent_zeromq``
import gevent
from gevent_zeromq import zmq
# Global Context
context = zmq.Context()
def server():
server_socket = context.socket(zmq.REQ)
server_socket.bind("tcp://127.0.0.1:5000")
for request in range(1,10):
server_socket.send("Hello")
print('Switched to Server for ', request)
# Implicit context switch occurs here
server_socket.recv()
def client():
client_socket = context.socket(zmq.REP)
client_socket.connect("tcp://127.0.0.1:5000")
for request in range(1,10):
client_socket.recv()
print('Switched to Client for ', request)
# Implicit context switch occurs here
client_socket.send("World")
publisher = gevent.spawn(server)
client = gevent.spawn(client)
gevent.joinall([publisher, client])
Switched to Server for 1
Switched to Client for 1
Switched to Server for 2
Switched to Client for 2
Switched to Server for 3
Switched to Client for 3
Switched to Server for 4
Switched to Client for 4
Switched to Server for 5
Switched to Client for 5
Switched to Server for 6
Switched to Client for 6
Switched to Server for 7
Switched to Client for 7
Switched to Server for 8
Switched to Client for 8
Switched to Server for 9
Switched to Client for 9
# On Unix: Access with ``$ nc 127.0.0.1 5000``
# On Window: Access with ``$ telnet 127.0.0.1 5000``
from gevent.server import StreamServer
def handle(socket, address):
socket.send("Hello from a telnet!\n")
for i in range(5):
socket.send(str(i) + '\n')
socket.close()
server = StreamServer(('127.0.0.1', 5000), handle)
server.serve_forever()
gevent は HTTP のための2種類の WSGI サーバーを提供しています。
wsgi
と pywsgi
です。
gevent 1.0 より前のバージョンでは、 gevent は libev の代わりに libevent
を利用していました。 libevent は高速な HTTP サーバーを持っており、
gevent の wsgi
サーバーで利用されていました。
gevent 1.0 からは libev が http サーバーを持っていないので、
gevent.wsgi
はピュアPythonで実装された gevent.pywsgi
への
ただのエイリアスになっています。
ストリーミングHTTPサービスを実現するには、まず HTTP ヘッダにコンテンツの サイズを出力するのをやめます。その代わりに接続をつないだままにしておき、 データチャンクの先頭にそのチャンクの大きさを示す hex digit をつけて 送信します。サイズが0のチャンクを送るとストリームが終了します。
HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked
8
<p>Hello
9
World</p>
0
pywsgi (gevent 1.0 以降は gevent.wsgi でも同じ) を使うと、ハンドラを ジェネレータとして作成しチャンクを yield していくことでストリーミング を実現できます。
from gevent.pywsgi import WSGIServer
def application(environ, start_response):
status = '200 OK'
headers = [
('Content-Type', 'text/html')
]
start_response(status, headers)
yield "<p>Hello"
yield "World</p>"
WSGIServer(('', 8000), application).serve_forever()
import gevent
from gevent.queue import Queue, Empty
from gevent.pywsgi import WSGIServer
import simplejson as json
data_source = Queue()
def producer():
while True:
data_source.put_nowait('Hello World')
gevent.sleep(1)
def ajax_endpoint(environ, start_response):
status = '200 OK'
headers = [
('Content-Type', 'application/json')
]
start_response(status, headers)
while True:
try:
datum = data_source.get(timeout=5)
yield json.dumps(datum) + '\n'
except Empty:
pass
gevent.spawn(producer)
WSGIServer(('', 8000), ajax_endpoint).serve_forever()
Websocket のサンプルは gevent-websocket を利用しています。
# Simple gevent-websocket server
import json
import random
from gevent import pywsgi, sleep
from geventwebsocket.handler import WebSocketHandler
class WebSocketApp(object):
'''Send random data to the websocket'''
def __call__(self, environ, start_response):
ws = environ['wsgi.websocket']
x = 0
while True:
data = json.dumps({'x': x, 'y': random.randint(1, 5)})
ws.send(data)
x += 1
sleep(0.5)
server = pywsgi.WSGIServer(("", 10000), WebSocketApp(),
handler_class=WebSocketHandler)
server.serve_forever()
HTML Page:
<html>
<head>
<title>Minimal websocket application</title>
<script type="text/javascript" src="jquery.min.js"></script>
<script type="text/javascript">
$(function() {
// Open up a connection to our server
var ws = new WebSocket("ws://localhost:10000/");
// What do we do when we get a message?
ws.onmessage = function(evt) {
$("#placeholder").append('<p>' + evt.data + '</p>')
}
// Just update our conn_status field with the connection status
ws.onopen = function(evt) {
$('#conn_status').html('<b>Connected</b>');
}
ws.onerror = function(evt) {
$('#conn_status').html('<b>Error</b>');
}
ws.onclose = function(evt) {
$('#conn_status').html('<b>Closed</b>');
}
});
</script>
</head>
<body>
<h1>WebSocket Example</h1>
<div id="conn_status">Not Connected</div>
<div id="placeholder" style="width:600px;height:300px;"></div>
</body>
</html>
最後に意欲的なサンプルとして、リアルタイムチャットルームを作ります。 このサンプルは Flask を利用しています。 (代わりに Django や Pyramid を使ってもいいですよ!) 必要な JavaScript と HTML ファイルは ここ にあります。
# Micro gevent chatroom.
# ----------------------
from flask import Flask, render_template, request
from gevent import queue
from gevent.pywsgi import WSGIServer
import simplejson as json
app = Flask(__name__)
app.debug = True
rooms = {
'topic1': Room(),
'topic2': Room(),
}
users = {}
class Room(object):
def __init__(self):
self.users = set()
self.messages = []
def backlog(self, size=25):
return self.messages[-size:]
def subscribe(self, user):
self.users.add(user)
def add(self, message):
for user in self.users:
print user
user.queue.put_nowait(message)
self.messages.append(message)
class User(object):
def __init__(self):
self.queue = queue.Queue()
@app.route('/')
def choose_name():
return render_template('choose.html')
@app.route('/<uid>')
def main(uid):
return render_template('main.html',
uid=uid,
rooms=rooms.keys()
)
@app.route('/<room>/<uid>')
def join(room, uid):
user = users.get(uid, None)
if not user:
users[uid] = user = User()
active_room = rooms[room]
active_room.subscribe(user)
print 'subscribe', active_room, user
messages = active_room.backlog()
return render_template('room.html',
room=room, uid=uid, messages=messages)
@app.route("/put/<room>/<uid>", methods=["POST"])
def put(room, uid):
user = users[uid]
room = rooms[room]
message = request.form['message']
room.add(':'.join([uid, message]))
return ''
@app.route("/poll/<uid>", methods=["POST"])
def poll(uid):
try:
msg = users[uid].queue.get(timeout=10)
except queue.Empty:
msg = []
return json.dumps(msg)
if __name__ == "__main__":
http = WSGIServer(('', 5000), app)
http.serve_forever()