# Two simple generator functions
def countdown(n):
    '''
    Generator that counts down from n.

    Each step prints 'T-minus <value>' and then yields (with no value) so a
    scheduler can switch to another task. Once the counter is no longer
    positive, 'Blastoff!' is printed and the generator finishes.
    '''
    remaining = n
    while remaining > 0:
        print('T-minus', remaining)
        # Hand control back to whoever is driving this generator.
        yield
        remaining -= 1
    print('Blastoff!')
def countup(n):
    '''
    Generator that counts upward from 0 while the counter is below n.

    Each step prints 'Counting up <value>' and then yields (with no value)
    so that a scheduler can interleave it with other tasks.
    '''
    current = 0
    while current < n:
        print('Counting up', current)
        # Pause here until the driver resumes this generator.
        yield
        current += 1
这些函数在内部使用 yield 语句。下面的代码实现了一个简单的任务调度器:
from collections import deque
class TaskScheduler:
    '''
    Round-robin scheduler for generator-based tasks.

    Tasks are kept in a FIFO queue. Each turn advances the task at the front
    to its next yield and then puts it at the back of the queue; a task that
    finishes (raises StopIteration) is dropped.
    '''

    def __init__(self):
        self._task_queue = deque()

    def new_task(self, task):
        '''
        Admit a newly started task (a generator) to the scheduler.
        '''
        self._task_queue.append(task)

    def run(self):
        '''
        Run until there are no more tasks in the queue.
        '''
        pending = self._task_queue
        while pending:
            current = pending.popleft()
            try:
                # Advance the task to its next yield statement.
                next(current)
            except StopIteration:
                # The generator has finished; do not requeue it.
                continue
            pending.append(current)
# Demo: interleave two countdowns and one count-up on a single scheduler.
sched = TaskScheduler()
for task in (countdown(10), countdown(5), countup(15)):
    sched.new_task(task)
sched.run()
实际上,你可能想要使用生成器来实现简单的并发。例如,在实现 actor 或网络服务器的时候,你可以使用生成器来替代线程。
下面的代码演示了使用生成器来实现一个不依赖线程的 actor:
from collections import deque
class ActorScheduler:
    '''
    Scheduler that delivers queued messages to named, generator-style actors.

    An actor is any object with a send(msg) method (typically a generator).
    Messages are queued in FIFO order and delivered one at a time by run().
    '''

    def __init__(self):
        self._actors = {}          # Mapping of names to actors
        self._msg_queue = deque()  # Pending (actor, message) pairs

    def new_actor(self, name, actor):
        '''
        Admit a newly started actor to the scheduler and give it a name.

        A None message is queued first so that a fresh generator is advanced
        to its first yield before any real message is delivered.
        '''
        self._msg_queue.append((actor, None))
        self._actors[name] = actor

    def send(self, name, msg):
        '''
        Queue msg for the actor registered under name.

        Messages addressed to an unknown name are silently ignored.
        '''
        actor = self._actors.get(name)
        # Compare against None explicitly: a registered actor object that
        # happens to evaluate as falsy must still receive its messages.
        if actor is not None:
            self._msg_queue.append((actor, msg))

    def run(self):
        '''
        Deliver messages for as long as there are pending messages.
        '''
        while self._msg_queue:
            actor, msg = self._msg_queue.popleft()
            try:
                actor.send(msg)
            except StopIteration:
                # The actor has terminated; drop the message.
                pass
if __name__ == '__main__':
    def printer():
        '''Actor that prints every message it receives.'''
        while True:
            received = yield
            print('Got:', received)

    def counter(sched):
        '''
        Actor that counts down by messaging itself.

        For each non-zero count received it forwards the count to the
        'printer' actor and sends the next lower count to 'counter'.
        It stops when it receives 0.
        '''
        # Receive the first count
        n = yield
        while n != 0:
            # Forward the count to the printer task
            sched.send('printer', n)
            # Schedule the next count for this same actor (message recursion)
            sched.send('counter', n - 1)
            # Wait for the next count
            n = yield

    sched = ActorScheduler()

    # Register the two actors
    sched.new_actor('printer', printer())
    sched.new_actor('counter', counter(sched))

    # Kick things off with an initial message to the counter
    sched.send('counter', 10000)
    sched.run()
from collections import deque
from select import select
# This class represents a generic yield event in the scheduler
class YieldEvent:
    '''
    Base class for objects that tasks yield to the scheduler.

    Both hooks do nothing here; subclasses override them.
    '''

    def handle_yield(self, sched, task):
        '''Hook invoked with the scheduler and the task that yielded this event.'''

    def handle_resume(self, sched, task):
        '''Hook invoked with the scheduler and the task when the event is resumed.'''
# Task Scheduler
class Scheduler:
    '''
    Scheduler for generator-based tasks that yield YieldEvent objects.

    Ready tasks are kept in a FIFO queue of (task, message) pairs. Tasks
    blocked on I/O are kept in two dicts keyed by file descriptor and are
    woken through select() when the ready queue is empty.
    '''

    def __init__(self):
        self._numtasks = 0          # Total number of tasks
        self._ready = deque()       # (task, msg) pairs ready to run
        self._read_waiting = {}     # fileno -> (event, task) waiting to read
        self._write_waiting = {}    # fileno -> (event, task) waiting to write

    def _iopoll(self):
        '''
        Poll for I/O events and resume the tasks waiting on them.
        '''
        readable, writable, _ = select(self._read_waiting,
                                       self._write_waiting, [])
        for fd in readable:
            event, waiter = self._read_waiting.pop(fd)
            event.handle_resume(self, waiter)
        for fd in writable:
            event, waiter = self._write_waiting.pop(fd)
            event.handle_resume(self, waiter)

    def new(self, task):
        '''
        Add a newly started task to the scheduler
        '''
        # A fresh generator must be started by sending None.
        self._ready.append((task, None))
        self._numtasks += 1

    def add_ready(self, task, msg=None):
        '''
        Put an already started task back on the ready queue.
        msg is the value sent into the task when it resumes.
        '''
        self._ready.append((task, msg))

    def _read_wait(self, fileno, evt, task):
        '''
        Register task (with its event) as waiting to read on fileno.
        '''
        self._read_waiting[fileno] = (evt, task)

    def _write_wait(self, fileno, evt, task):
        '''
        Register task (with its event) as waiting to write on fileno.
        '''
        self._write_waiting[fileno] = (evt, task)

    def run(self):
        '''
        Run the task scheduler until there are no tasks
        '''
        while self._numtasks:
            # Nothing runnable: wait for I/O to wake a task.
            if not self._ready:
                self._iopoll()
            current, payload = self._ready.popleft()
            try:
                # Advance the coroutine to its next yield
                yielded = current.send(payload)
                if not isinstance(yielded, YieldEvent):
                    raise RuntimeError('unrecognized yield event')
                yielded.handle_yield(self, current)
            except StopIteration:
                # The task has finished.
                self._numtasks -= 1
# Example implementation of coroutine-based socket I/O
class ReadSocket(YieldEvent):
    '''
    Yield event for receiving up to nbytes from a socket.
    '''

    def __init__(self, sock, nbytes):
        self.sock, self.nbytes = sock, nbytes

    def handle_yield(self, sched, task):
        # Park the task until the scheduler resumes it for reading.
        fd = self.sock.fileno()
        sched._read_wait(fd, self, task)

    def handle_resume(self, sched, task):
        # Perform the receive and hand the result back to the task.
        sched.add_ready(task, self.sock.recv(self.nbytes))
class WriteSocket(YieldEvent):
    '''
    Yield event for sending data on a socket.
    '''

    def __init__(self, sock, data):
        self.sock, self.data = sock, data

    def handle_yield(self, sched, task):
        # Park the task until the scheduler resumes it for writing.
        fd = self.sock.fileno()
        sched._write_wait(fd, self, task)

    def handle_resume(self, sched, task):
        # Resume the task with whatever the socket's send() returned.
        sched.add_ready(task, self.sock.send(self.data))
class AcceptSocket(YieldEvent):
    '''
    Yield event for accepting a connection on a socket.
    '''

    def __init__(self, sock):
        self.sock = sock

    def handle_yield(self, sched, task):
        # Accepting is registered as a read wait on the socket's descriptor.
        fd = self.sock.fileno()
        sched._read_wait(fd, self, task)

    def handle_resume(self, sched, task):
        # Resume the task with whatever the socket's accept() returned.
        sched.add_ready(task, self.sock.accept())
# Wrapper around a socket object for use with yield
class Socket(object):
    '''
    Wrapper around a socket object for use with yield.

    recv(), send() and accept() do not perform I/O themselves; they return
    event objects that a task yields to the scheduler. Every other attribute
    is looked up on the wrapped socket.
    '''

    def __init__(self, sock):
        self._sock = sock

    def recv(self, maxbytes):
        '''Return a ReadSocket event for up to maxbytes bytes.'''
        return ReadSocket(self._sock, maxbytes)

    def send(self, data):
        '''Return a WriteSocket event for data.'''
        return WriteSocket(self._sock, data)

    def accept(self):
        '''Return an AcceptSocket event for the wrapped socket.'''
        return AcceptSocket(self._sock)

    def __getattr__(self, name):
        '''
        Delegate any attribute not found on the wrapper to the wrapped socket.

        Raises AttributeError if the wrapped socket lacks the attribute, or
        if the wrapper itself has no _sock yet.
        '''
        # __getattr__ only runs when normal lookup fails. If _sock itself is
        # missing (instance built without __init__, e.g. via __new__ or
        # copying), reading self._sock below would re-enter this method and
        # recurse until RecursionError.
        if name == '_sock':
            raise AttributeError(name)
        return getattr(self._sock, name)
if __name__ == '__main__':
    from socket import socket, AF_INET, SOCK_STREAM
    import time

    # Example of a function involving generators.  This should
    # be called using line = yield from readline(sock)
    def readline(sock):
        '''
        Read one line from sock, one byte at a time.

        Returns the bytes read, including the trailing newline if one was
        seen. Reading stops early when a receive produces an empty result.
        '''
        chars = []
        while True:
            c = yield sock.recv(1)
            if not c:
                break
            chars.append(c)
            if c == b'\n':
                break
        return b''.join(chars)

    # Echo server using generators
    class EchoServer:
        '''
        Echo server whose accept loop and client handlers are scheduler tasks.
        '''

        def __init__(self, addr, sched):
            self.sched = sched
            sched.new(self.server_loop(addr))

        def server_loop(self, addr):
            '''Accept connections forever, starting one handler task per client.'''
            s = Socket(socket(AF_INET, SOCK_STREAM))
            try:
                s.bind(addr)
                s.listen(5)
                while True:
                    c, a = yield s.accept()
                    print('Got connection from ', a)
                    self.sched.new(self.client_handler(Socket(c)))
            finally:
                # Release the listening socket if bind/listen fails or the
                # generator is closed while suspended at a yield.
                s.close()

        def client_handler(self, client):
            '''Echo each received line back to the client, prefixed with GOT:.'''
            try:
                while True:
                    line = yield from readline(client)
                    if not line:
                        break
                    line = b'GOT:' + line
                    # A single send may be partial; loop until all is written.
                    while line:
                        nsent = yield client.send(line)
                        line = line[nsent:]
            finally:
                # Close the client socket even when the generator is closed
                # or abandoned before reaching the normal end of the loop.
                client.close()
            print('Client closed')

    sched = Scheduler()
    EchoServer(('', 16000), sched)
    sched.run()
# Template for manually driving a generator with send().
f = some_generator()

# Initial result. Is None to start since nothing has been computed
result = None
while True:
    try:
        # Only the send() call is guarded, so a StopIteration raised by the
        # calculation below is not mistaken for the generator finishing.
        data = f.send(result)
    except StopIteration:
        break
    # ... do some calculation ...
    # Placeholder: replace None with the value computed from `data`.
    # Sending None simply advances the generator to its next yield.
    result = None