我在Python入门时用的框架就是Tornado, 在当时大而全Django横行的年代 破天荒的使用了epoll(当然有gevent的存在). 我喜欢Tornado的地方有两点: 一是彩色 的终端日志, 二是autoreload的功能. 这两点让我觉得在开发时是一种享受.

很可惜的就是当时对异步以及epoll的理解不够, 协程的使用也是完全不懂, 只注重了 上面两个比较表面的东西. 之后也好久没有用过它了. 前些天我又在写个简单的UDP服务器, 所以也就尝试用用看, 没想到Tornado已经到了6.0了. 看文档时才发现, 从5.0开始, Tornado已经整合了asyncio, 也就是说, 那些使用asyncio的库, 已经可以用在 Tornado上了.

我的需求

其实我的需求就两个:

  1. 这个服务器必须是UDP协议的, 因为客户端只支持UDP.
  2. 这个服务器在处理请求结束后, 需要使用Redis, 我希望这部分是异步进行的, 对程序性能影响越小越好.

一些想法

我也不是什么大佬, 从头开始编写程序还是挺难的, 需要找一个demo借鉴一下, 然后再慢慢改, 最后让程序符合自己预期的功能就可以了. 这应该是最快的一些方式吧.

大致过程就是这样了:

  1. 先在Tornado源码库里面找, 发现了tcpserver, httpserver相关的代码, 就是没有 udpserver相关的代码, 能怎么办, 很绝望.
  2. 好在Google中搜索tornado udp server的时候, 出现了第一个结果, 点我查看gist代码. 虽然是5年前的代码, 有总比没有强, 简单阅读一下做个参考也是好的.

代码中的主要逻辑

上述的gist代码中, 主要逻辑点有两个:

  1. 新建一个udp的socket, 监听相应的端口.
  2. 为了使用Tornado的IOLoop, 需要将socket注册到io_loop中, io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ) 这样, 有新的请求到达时, 会调用accept_handler进行处理.

像程序中UDPServer这个类, 只是属于锦上添花的东西, 可是使你的代码更好扩展.

结合新版的Tornado来看

这里我说的新版就是指使用了asyncio.get_event_loop()之后的Tornado, 就可以自定义使用async以及await这两个关键字.

趁机会也阅读一下Tornado中的tcpserverIOLoop. 从代码层面讲, udp与tcp的 不同就是在创建时使用了不同的参数, 而后tcp需要accept之后才可以进行传输, 而UDP 不需要, 只有单纯的recv以及send.

在io_loop.py中, 其实是有下面一段代码的(我加了注释, 最好从main函数开始, 而后向上看):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import errno
import functools
import socket

import tornado.ioloop
from tornado.iostream import IOStream

# 这部分代码已经是由asyncio的io_loop在处理了, IOStream的函数大家有兴趣也可以去查看
# 不过我想我已经达到了目的. 那就是获得数据后, 如何把控制权传递给asyncio来处理,
# 可以看出方法就是借助下方的`io_loop.spawn_callback`
async def handle_connection(connection, address):
stream = IOStream(connection)
message = await stream.read_until_close()
print("message from client:", message.decode().strip())

def connection_ready(sock, fd, events):
while True:
try:
connection, address = sock.accept()
except socket.error as e:
if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
return
connection.setblocking(0)
io_loop = tornado.ioloop.IOLoop.current()
# 这里是关键, 收到连接请求后, 不直接调用handle_connection(直接使用就不是异步了),
# 而是将请求放在了io_loop中, 由io_loop去调度这部分代码.
io_loop.spawn_callback(handle_connection, connection, address)

if __name__ == '__main__':
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(("", 8888))
sock.listen(128)

io_loop = tornado.ioloop.IOLoop.current()
callback = functools.partial(connection_ready, sock)

# 添加回调函数, 先前的UDP server类似
io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
io_loop.start()

这是一段简单的TCP服务的测试代码, 看到它, 我就知道, udp代码可以照这个来改了.

如何修改先前的UDP server

在上面的注释中, 我已经说道, 收到连接后, 使用io_loop.spawn_callback, 将控制权交给asyncio, 对于UDP服务器, 只要收到数据, 即收到连接, 就应该转交控制权了. 程序应该改成下面这个样子(这个函数中使用了闭包, 可以不使用functools.partial).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def add_accept_handler(sock, callback, io_loop=None):
if io_loop is None:
io_loop = tornado.ioloop.IOLoop.current()

def accept_handler(fd, events):
while True:
try:
data, address = sock.recvfrom(2500)
except socket.error as e:
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
return
raise
io_loop.spawn_callback(callback, sock, address, data)
io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)


## 在UDPServer类中, 这样调用了add_accept_handler
# add_accept_handler(sock, self._on_recive, io_loop=self.io_loop)
# 所以, 将`_on_recive`修改成为async形式的函数即可, on_receive如下:

class UDPServer():
# ...
async def _on_recive(self, sock, address, data):
# some process and generate reply
# 响应udp请求.
# sock.sendto(reply, address)
await redis_work(...)
# ...

这里udp的请求需要尽快响应, 而redis的存储可以放缓, 所以将await操作放在了 最后, 由asyncio来等待redis的返回, 这样就不会影响当前用户的体验了, 全权交由了 asyncio来调度.

aioredis的使用与现有问题

hiredis并不是一个支持asyncio的库, 搜索的时候发现了aioredis, 所以就用 上了aioredis. 后来写博客时看到了aredis, 是国人所写, 看Github 主页貌似是B站某个大佬, 库也在维护中. 但我程序已经写完了, 不想换了, 希望大家用的时候试试看, 也支持一下国产的Python库.

aioredis的首页中就有简单的使用示例, 看样子可以直接拿来用了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
import aioredis

loop = asyncio.get_event_loop()

async def go():
conn = await aioredis.create_connection(
'redis://localhost', loop=loop)
await conn.execute('set', 'my-key', 'value')
val = await conn.execute('get', 'my-key')
print(val)
conn.close()
await conn.wait_closed()
loop.run_until_complete(go())

我在使用中遇到了一些坑, 就是loop = asyncio.get_event_loop()这句话. 为了追求程序形式上的统一, 我使用了loop = IOLoop.cuurent()来获取 Tornado中当前的ioloop, 其实也是从asyncio中得到, 但是却发生了如下错误:

1
AttributeError: 'AsyncIOMainLoop' object has no attribute 'create_future'

后来也不纠结了, 反正都用的是asynio.get_event_loop, 就照着程序中的例子也 这样写了. 站在开发人员的角度, 我认为不久之后Tornado也会考虑合并问题的, 可能这个错误之后就不会出现了.

总结

回望这个程序, 其实本身的逻辑并不复杂, 但是想要结合Tornado来处理UDP请求, 需要读读 源码才能操纵.

当然, 原生的asyncio也是可以处理的UDP请求的, 具体的话请看Gist. 我选择Tornado真的很大程度是为了漂亮的终端log, 以及简单的代码, 强大的扩展性. 谢谢大家来阅读吧.