python-cookbook
  • Introduction
  • 第 1 章 数据结构和算法
    • 1.1 解压序列赋值给多个变量
    • 1.2 解压可迭代对象赋值给多个变量
    • 1.3 保留最后N个元素
    • 1.4 查找最大或最小的N个元素
    • 1.5 实现一个优先级队列
    • 1.6 字典中的键映射多个值
    • 1.7 字典排序
    • 1.8 字典的运算
    • 1.9 查找两字典的相同点
    • 1.10 删除序列相同元素并保持顺序
    • 1.11 命名切片
    • 1.12 序列中出现次数最多的元素
    • 1.13 通过某个关键字排序一个字典列表
    • 1.14 排序不支持原生比较的对象
    • 1.15 通过某个字段将记录分组
    • 1.16 过滤序列元素
    • 1.17 从字典中提取子集
    • 1.18 映射名称到序列元素
    • 1.19 转换并同时计算数据
    • 1.20 合并多个字典或映射
  • 第 2 章 字符串和文本
    • 2.1 使用多个界定符分割字符串
    • 2.2 字符串开头或结尾匹配
    • 2.3 用Shell通配符匹配字符串
    • 2.4 字符串匹配和搜索
    • 2.5 字符串搜索和替换
    • 2.6 字符串忽略大小写的搜索替换
    • 2.7 最短匹配模式
    • 2.8 多行匹配模式
    • 2.9 将Unicode文本标准化
    • 2.10 在正则式中使用Unicode
    • 2.11 删除字符串中不需要的字符
    • 2.12 审查清理文本字符串
    • 2.13 字符串对齐
    • 2.14 合并拼接字符串
    • 2.15 字符串中插入变量
    • 2.16 以指定列宽格式化字符串
    • 2.17 在字符串中处理html和xml
    • 2.18 字符串令牌解析
    • 2.19 实现一个简单的递归下降分析器
    • 2.20 字节字符串上的字符串操作
  • 第 3 章 数字日期和时间
    • 3.1 数字的四舍五入
    • 3.2 执行精确的浮点数运算
    • 3.3 数字的格式化输出
    • 3.4 二八十六进制整数
    • 3.5 字节到大整数的打包与解包
    • 3.6 复数的数学运算
    • 3.7 无穷大与NaN
    • 3.8 分数运算
    • 3.9 大型数组运算
    • 3.10 矩阵与线性代数运算
    • 3.11 随机选择
    • 3.12 基本的日期与时间转换
    • 3.13 计算最后一个周五的日期
    • 3.14 计算当前月份的日期范围
    • 3.15 字符串转换为日期
    • 3.16 结合时区的日期操作
  • 第 4 章 迭代器与生成器
    • 4.1 手动遍历迭代器
    • 4.2 代理迭代
    • 4.3 使用生成器创建新的迭代模式
    • 4.4 实现迭代器协议
    • 4.5 反向迭代
    • 4.6 带有外部状态的生成器函数
    • 4.7 迭代器切片
    • 4.8 跳过可迭代对象的开始部分
    • 4.9 排列组合的迭代
    • 4.10 序列上索引值迭代
    • 4.11 同时迭代多个序列
    • 4.12 不同集合上元素的迭代
    • 4.13 创建数据处理管道
    • 4.14 展开嵌套的序列
    • 4.15 顺序迭代合并后的排序迭代对象
    • 4.16 迭代器代替while无限循环
  • 第 5 章 文件与 IO
    • 5.1 读写文本数据
    • 5.2 打印输出至文件中
    • 5.3 使用其他分隔符或行终止符打印
    • 5.4 读写字节数据
    • 5.5 文件不存在才能写入
    • 5.6 字符串的I-O操作
    • 5.7 读写压缩文件
    • 5.8 固定大小记录的文件迭代
    • 5.9 读取二进制数据到可变缓冲区中
    • 5.10 内存映射的二进制文件
    • 5.11 文件路径名的操作
    • 5.12 测试文件是否存在
    • 5.13 获取文件夹中的文件列表
    • 5.14 忽略文件名编码
    • 5.15 打印不合法的文件名
    • 5.16 增加或改变已打开文件的编码
    • 5.17 将字节写入文本文件
    • 5.18 将文件描述符包装成文件对象
    • 5.19 创建临时文件和文件夹
    • 5.20 与串行端口的数据通信
    • 5.21 序列化Python对象
  • 第 6 章 数据编码和处理
    • 6.1 读写CSV数据
    • 6.2 读写JSON数据
    • 6.3 解析简单的XML数据
    • 6.4 增量式解析大型XML文件
    • 6.5 将字典转换为XML
    • 6.6 解析和修改XML
    • 6.7 利用命名空间解析XML文档
    • 6.8 与关系型数据库的交互
    • 6.9 编码和解码十六进制数
    • 6.10 编码解码Base64数据
    • 6.11 读写二进制数组数据
    • 6.12 读取嵌套和可变长二进制数据
    • 6.13 数据的累加与统计操作
  • 第 7 章 函数
    • 7.1 可接受任意数量参数的函数
    • 7.2 只接受关键字参数的函数
    • 7.3 给函数参数增加元信息
    • 7.4 返回多个值的函数
    • 7.5 定义有默认参数的函数
    • 7.6 定义匿名或内联函数
    • 7.7 匿名函数捕获变量值
    • 7.8 减少可调用对象的参数个数
    • 7.9 将单方法的类转换为函数
    • 7.10 带额外状态信息的回调函数
    • 7.11 内联回调函数
    • 7.12 访问闭包中定义的变量
  • 第 8 章 类与对象
    • 8.1 改变对象的字符串显示
    • 8.2 自定义字符串的格式化
    • 8.3 让对象支持上下文管理协议
    • 8.4 创建大量对象时节省内存方法
    • 8.5 在类中封装属性名
    • 8.6 创建可管理的属性
    • 8.7 调用父类方法
    • 8.8 子类中扩展property
    • 8.9 创建新的类或实例属性
    • 8.10 使用延迟计算属性
    • 8.11 简化数据结构的初始化
    • 8.12 定义接口或者抽象基类
    • 8.13 实现数据模型的类型约束
    • 8.14 实现自定义容器
    • 8.15 属性的代理访问
    • 8.16 在类中定义多个构造器
    • 8.17 创建不调用init方法的实例
    • 8.18 利用Mixins扩展类功能
    • 8.19 实现状态对象或者状态机
    • 8.20 通过字符串调用对象方法
    • 8.21 实现访问者模式
    • 8.22 不用递归实现访问者模式
    • 8.23 循环引用数据结构的内存管理
    • 8.24 让类支持比较操作
    • 8.25 创建缓存实例
  • 第 9 章 元编程
    • 9.1 在函数上添加包装器
    • 9.2 创建装饰器时保留函数元信息
    • 9.3 解除一个装饰器
    • 9.4 定义一个带参数的装饰器
    • 9.5 可自定义属性的装饰器
    • 9.6 带可选参数的装饰器
    • 9.7 利用装饰器强制函数上的类型检查
    • 9.8 将装饰器定义为类的一部分
    • 9.9 将装饰器定义为类
    • 9.10 为类和静态方法提供装饰器
    • 9.11 装饰器为被包装函数增加参数
    • 9.12 使用装饰器扩充类的功能
    • 9.13 使用元类控制实例的创建
    • 9.14 捕获类的属性定义顺序
    • 9.15 定义有可选参数的元类
    • 9.16 args和*kwargs的强制参数签名
    • 9.17 在类上强制使用编程规约
    • 9.18 以编程方式定义类
    • 9.19 在定义的时候初始化类的成员
    • 9.20 利用函数注解实现方法重载
    • 9.21 避免重复的属性方法
    • 9.22 定义上下文管理器的简单方法
    • 9.23 在局部变量域中执行代码
    • 9.24 解析与分析Python源码
    • 9.25 拆解Python字节码
  • 第 10 章 模块与包
    • 10.1 构建一个模块的层级包
    • 10.2 控制模块被全部导入的内容
    • 10.3 使用相对路径名导入包中子模块
    • 10.4 将模块分割成多个文件
    • 10.5 利用命名空间导入目录分散的代码
    • 10.6 重新加载模块
    • 10.7 运行目录或压缩文件
    • 10.8 读取位于包中的数据文件
    • 10.9 将文件夹加入到sys.path
    • 10.10 通过字符串名导入模块
    • 10.11 通过钩子远程加载模块
    • 10.12 导入模块的同时修改模块
    • 10.13 安装私有的包
    • 10.14 创建新的Python环境
    • 10.15 分发包
  • 第 11 章 网络与 Web 编程
    • 11.1 作为客户端与HTTP服务交互
    • 11.2 创建TCP服务器
    • 11.3 创建UDP服务器
    • 11.4 通过CIDR地址生成对应的IP地址集
    • 11.5 创建一个简单的REST接口
    • 11.6 通过XML-RPC实现简单的远程调用
    • 11.7 在不同的Python解释器之间交互
    • 11.8 实现远程方法调用
    • 11.9 简单的客户端认证
    • 11.10 在网络服务中加入SSL
    • 11.11 进程间传递Socket文件描述符
    • 11.12 理解事件驱动的IO
    • 11.13 发送与接收大型数组
  • 第 12 章 并发编程
    • 12.1 启动与停止线程
    • 12.2 判断线程是否已经启动
    • 12.3 线程间通信
    • 12.4 给关键部分加锁
    • 12.5 防止死锁的加锁机制
    • 12.6 保存线程的状态信息
    • 12.7 创建一个线程池
    • 12.8 简单的并行编程
    • 12.9 Python的全局锁问题
    • 12.10 定义一个Actor任务
    • 12.11 实现消息发布-订阅模型
    • 12.12 使用生成器代替线程
    • 12.13 多个线程队列轮询
    • 12.14 在Unix系统上面启动守护进程
  • 第 13 章 脚本编程与系统管理
    • 13.1 通过重定向-管道-文件接受输入
    • 13.2 终止程序并给出错误信息
    • 13.3 解析命令行选项
    • 13.4 运行时弹出密码输入提示
    • 13.5 获取终端的大小
    • 13.6 执行外部命令并获取它的输出
    • 13.7 复制或者移动文件和目录
    • 13.8 创建和解压归档文件
    • 13.9 通过文件名查找文件
    • 13.10 读取配置文件
    • 13.11 给简单脚本增加日志功能
    • 13.12 给函数库增加日志功能
    • 13.13 实现一个计时器
    • 13.14 限制内存和CPU的使用量
    • 13.15 启动一个WEB浏览器
  • 第 14 章 测试、调试和异常
    • 14.1 测试stdout输出
    • 14.2 在单元测试中给对象打补丁
    • 14.3 在单元测试中测试异常情况
    • 14.4 将测试输出用日志记录到文件中
    • 14.5 忽略或期望测试失败
    • 14.6 处理多个异常
    • 14.7 捕获所有异常
    • 14.8 创建自定义异常
    • 14.9 捕获异常后抛出另外的异常
    • 14.10 重新抛出被捕获的异常
    • 14.11 输出警告信息
    • 14.12 调试基本的程序崩溃错误
    • 14.13 给你的程序做性能测试
    • 14.14 加速程序运行
  • 第 15 章 C 语言扩展
    • 15.1 使用ctypes访问C代码
    • 15.2 简单的C扩展模块
    • 15.3 编写扩展函数操作数组
    • 15.4 在C扩展模块中操作隐形指针
    • 15.5 从扩张模块中定义和导出C的API
    • 15.6 从C语言中调用Python代码
    • 15.7 从C扩展中释放全局锁
    • 15.8 C和Python中的线程混用
    • 15.9 用WSIG包装C代码
    • 15.10 用Cython包装C代码
    • 15.11 用Cython写高性能的数组操作
    • 15.12 将函数指针转换为可调用对象
    • 15.13 传递NULL结尾的字符串给C函数库
    • 15.14 传递Unicode字符串给C函数库
    • 15.15 C字符串转换为Python字符串
    • 15.16 不确定编码格式的C字符串
    • 15.17 传递文件名给C扩展
    • 15.18 传递已打开的文件给C扩展
    • 15.19 从C语言中读取类文件对象
    • 15.20 处理C语言中的可迭代对象
    • 15.21 诊断分段错误
Powered by GitBook
On this page
  • 问题
  • 解决方案
  • 讨论

Was this helpful?

  1. 第 11 章 网络与 Web 编程

11.12 理解事件驱动的IO

问题

你应该已经听过基于事件驱动或异步 I/O 的包,但是你还不能完全理解它的底层到底是怎样工作的, 或者是如果使用它的话会对你的程序产生什么影响。

解决方案

事件驱动 I/O 本质上来讲就是将基本 I/O 操作(比如读和写)转化为你程序需要处理的事件。 例如,当数据在某个 socket 上被接受后,它会转换成一个 receive 事件,然后被你定义的回调方法或函数来处理。 作为一个可能的起始点,一个事件驱动的框架可能会以一个实现了一系列基本事件处理器方法的基类开始:

class EventHandler:
    def fileno(self):
        'Return the associated file descriptor'
        raise NotImplemented('must implement')

    def wants_to_receive(self):
        'Return True if receiving is allowed'
        return False

    def handle_receive(self):
        'Perform the receive operation'
        pass

    def wants_to_send(self):
        'Return True if sending is requested'
        return False

    def handle_send(self):
        'Send outgoing data'
        pass

这个类的实例作为插件被放入类似下面这样的事件循环中:

import select

def event_loop(handlers):
    while True:
        wants_recv = [h for h in handlers if h.wants_to_receive()]
        wants_send = [h for h in handlers if h.wants_to_send()]
        can_recv, can_send, _ = select.select(wants_recv, wants_send, [])
        for h in can_recv:
            h.handle_receive()
        for h in can_send:
            h.handle_send()

事件循环的关键部分是 select() 调用,它会不断轮询文件描述符从而激活它。 在调用 select() 之前,时间循环会询问所有的处理器来决定哪一个想接受或发生。 然后它将结果列表提供给 select() 。然后 select() 返回准备接受或发送的对象组成的列表。 然后相应的 handle_receive() 或 handle_send() 方法被触发。

编写应用程序的时候,EventHandler 的实例会被创建。例如,下面是两个简单的基于 UDP 网络服务的处理器例子:

import socket
import time

class UDPServer(EventHandler):
    def __init__(self, address):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(address)

    def fileno(self):
        return self.sock.fileno()

    def wants_to_receive(self):
        return True

class UDPTimeServer(UDPServer):
    def handle_receive(self):
        msg, addr = self.sock.recvfrom(1)
        self.sock.sendto(time.ctime().encode('ascii'), addr)

class UDPEchoServer(UDPServer):
    def handle_receive(self):
        msg, addr = self.sock.recvfrom(8192)
        self.sock.sendto(msg, addr)

if __name__ == '__main__':
    handlers = [ UDPTimeServer(('',14000)), UDPEchoServer(('',15000))  ]
    event_loop(handlers)

测试这段代码,试着从另外一个 Python 解释器连接它:

>>> from socket import *
>>> s = socket(AF_INET, SOCK_DGRAM)
>>> s.sendto(b'',('localhost',14000))
0
>>> s.recvfrom(128)
(b'Tue Sep 18 14:29:23 2012', ('127.0.0.1', 14000))
>>> s.sendto(b'Hello',('localhost',15000))
5
>>> s.recvfrom(128)
(b'Hello', ('127.0.0.1', 15000))

实现一个 TCP 服务器会更加复杂一点,因为每一个客户端都要初始化一个新的处理器对象。 下面是一个 TCP 应答客户端例子:

class TCPServer(EventHandler):
    def __init__(self, address, client_handler, handler_list):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        self.sock.bind(address)
        self.sock.listen(1)
        self.client_handler = client_handler
        self.handler_list = handler_list

    def fileno(self):
        return self.sock.fileno()

    def wants_to_receive(self):
        return True

    def handle_receive(self):
        client, addr = self.sock.accept()
        # Add the client to the event loop's handler list
        self.handler_list.append(self.client_handler(client, self.handler_list))

class TCPClient(EventHandler):
    def __init__(self, sock, handler_list):
        self.sock = sock
        self.handler_list = handler_list
        self.outgoing = bytearray()

    def fileno(self):
        return self.sock.fileno()

    def close(self):
        self.sock.close()
        # Remove myself from the event loop's handler list
        self.handler_list.remove(self)

    def wants_to_send(self):
        return True if self.outgoing else False

    def handle_send(self):
        nsent = self.sock.send(self.outgoing)
        self.outgoing = self.outgoing[nsent:]

class TCPEchoClient(TCPClient):
    def wants_to_receive(self):
        return True

    def handle_receive(self):
        data = self.sock.recv(8192)
        if not data:
            self.close()
        else:
            self.outgoing.extend(data)

if __name__ == '__main__':
   handlers = []
   handlers.append(TCPServer(('',16000), TCPEchoClient, handlers))
   event_loop(handlers)

TCP 例子的关键点是从处理器中列表增加和删除客户端的操作。 对每一个连接,一个新的处理器被创建并加到列表中。当连接被关闭后,每个客户端负责将其从列表中删除。 如果你运行程序并试着用 Telnet 或类似工具连接,它会将你发送的消息回显给你。并且它能很轻松的处理多客户端连接。

讨论

实际上所有的事件驱动框架原理跟上面的例子相差无几。实际的实现细节和软件架构可能不一样, 但是在最核心的部分,都会有一个轮询的循环来检查活动 socket,并执行响应操作。

事件驱动 I/O 的一个可能好处是它能处理非常大的并发连接,而不需要使用多线程或多进程。 也就是说,select() 调用(或其他等效的)能监听大量的 socket 并响应它们中任何一个产生事件的。 在循环中一次处理一个事件,并不需要其他的并发机制。

事件驱动 I/O 的缺点是没有真正的同步机制。 如果任何事件处理器方法阻塞或执行一个耗时计算,它会阻塞所有的处理进程。 调用那些并不是事件驱动风格的库函数也会有问题,同样要是某些库函数调用会阻塞,那么也会导致整个事件循环停止。

对于阻塞或耗时计算的问题可以通过将事件发送个其他单独的现场或进程来处理。 不过,在事件循环中引入多线程和多进程是比较棘手的, 下面的例子演示了如何使用 concurrent.futures 模块来实现:

from concurrent.futures import ThreadPoolExecutor
import os

class ThreadPoolHandler(EventHandler):
    def __init__(self, nworkers):
        if os.name == 'posix':
            self.signal_done_sock, self.done_sock = socket.socketpair()
        else:
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            self.signal_done_sock = socket.socket(socket.AF_INET,
                                                  socket.SOCK_STREAM)
            self.signal_done_sock.connect(server.getsockname())
            self.done_sock, _ = server.accept()
            server.close()

        self.pending = []
        self.pool = ThreadPoolExecutor(nworkers)

    def fileno(self):
        return self.done_sock.fileno()

    # Callback that executes when the thread is done
    def _complete(self, callback, r):

        self.pending.append((callback, r.result()))
        self.signal_done_sock.send(b'x')

    # Run a function in a thread pool
    def run(self, func, args=(), kwargs={},*,callback):
        r = self.pool.submit(func, *args, **kwargs)
        r.add_done_callback(lambda r: self._complete(callback, r))

    def wants_to_receive(self):
        return True

    # Run callback functions of completed work
    def handle_receive(self):
        # Invoke all pending callback functions
        for callback, result in self.pending:
            callback(result)
            self.done_sock.recv(1)
        self.pending = []

在代码中,run() 方法被用来将工作提交给回调函数池,处理完成后被激发。 实际工作被提交给 ThreadPoolExecutor 实例。 不过一个难点是协调计算结果和事件循环,为了解决它,我们创建了一对 socket 并将其作为某种信号量机制来使用。 当线程池完成工作后,它会执行类中的 _complete() 方法。 这个方法再某个 socket 上写入字节之前会讲挂起的回调函数和结果放入队列中。 fileno() 方法返回另外的那个 socket。 因此,这个字节被写入时,它会通知事件循环, 然后 handle_receive() 方法被激活并为所有之前提交的工作执行回调函数。

下面是一个简单的服务器,演示了如何使用线程池来实现耗时的计算:

# A really bad Fibonacci implementation
def fib(n):
    if n < 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

class UDPFibServer(UDPServer):
    def handle_receive(self):
        msg, addr = self.sock.recvfrom(128)
        n = int(msg)
        pool.run(fib, (n,), callback=lambda r: self.respond(r, addr))

    def respond(self, result, addr):
        self.sock.sendto(str(result).encode('ascii'), addr)

if __name__ == '__main__':
    pool = ThreadPoolHandler(16)
    handlers = [ pool, UDPFibServer(('',16000))]
    event_loop(handlers)

运行这个服务器,然后试着用其它 Python 程序来测试它:

from socket import *
sock = socket(AF_INET, SOCK_DGRAM)
for x in range(40):
    sock.sendto(str(x).encode('ascii'), ('localhost', 16000))
    resp = sock.recvfrom(8192)
    print(resp[0])

你应该能在不同窗口中重复的执行这个程序,并且不会影响到其他程序,尽管当数字便越来越大时候它会变得越来越慢。

你应该选择一个可以完成同样任务的高级框架。 不过,如果你理解了基本原理,你就能理解这些框架所使用的核心技术。 作为对回调函数编程的替代,事件驱动编码有时候会使用到协程,参考 12.12 小节的一个例子。

Previous11.11 进程间传递Socket文件描述符Next11.13 发送与接收大型数组

Last updated 5 years ago

Was this helpful?