异步Web框架Tornado初探

python的3个主流Web框架Django,Flask,Tornado,前两个都用过。一个大而全,一个小而美。而Tornado与它们都不太一样,主打异步,通过非阻塞网络I/O支撑起大量的链接。
大概看了看Introduction to Tornado,路由,模版,配置这些和其他的框架大同小异。而异步web服务算是最具特色的一个部分了,所以在这里梳理了一下学习的笔记。

异步响应

Tornado使用一个异步装饰器@tornado.web.asynchronous将响应函数异步化

class AsyncHandler(RequestHandler):
    @asynchronous
    def get(self):
        http_client = AsyncHTTPClient()
        http_client.fetch("http://example.com",
                          callback=self.on_response)

    def on_response(self, response):
        do_something_with_response(response)
        self.finish()

get方法里的异步函数fetch()并不直接放回结果,而是用一个回调参数callback=self.on_response,等fetch()得到结果后,再返回给异步回调。
异步回调函数on_response执行完后,必须调用self.finish()关闭链接。
使用这种方式,在fetch()等待结果返回的时间里,Tornado就可以抽空去响应其他的请求了。但需要注意的是响应函数中比较耗时的动作一定要是可以异步的,比如上面的fetch,改成requests.get的话,程序就会阻塞在那里等待requests.get执行完。

异步生成器

每次将响应函数拆成两部分总是显得有些麻烦,Tornado中使用了一个gen装饰器将两个部分合在了一起。

class GenAsyncHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch("http://example.com")
        do_something_with_response(response)
        self.finish()

用gen.coroutine装饰get方法,并且用yield生成器返回异步函数值执行的结果:

response = yield http_client.fetch("http://example.com")

除了这里稍有不同,其他地方的写法几乎让人感觉不到是在写异步响应。

长轮询

在Flask或Django中实时向客户端展示某一资源的状态可以用轮询,即客户端定时发出请求查看资源状态。这样即会产生大量无用的请求,又不能做到真正及时的获取到资源状态。而使用Tornado时,由于它的异步特性,可以使用长轮询避免这些问题。

With long polling, the client requests information from the server exactly as in normal polling, but with the expectation the server may not respond immediately. If the server has no new information for the client when the poll is received, instead of sending an empty response, the server holds the request open and waits for response information to become available. Once it does have new information, the server immediately sends an HTTP/S response to the client, completing the open HTTP/S Request. Upon receipt of the server response, the client often immediately issues another server request. In this way the usual response latency (the time between when the information first becomes available and the next client request) otherwise associated with polling clients is eliminated.

上面是wikipedia中对Long polling的描述。服务器不会立即响应客户端的请求,而是先将他们挂起,等带资源状态改变后,再返回响应。而客户端收到响应后,又立刻发送另一个请求给服务器。

来看看Tornado中如何实现长轮询的,完整代码在这里。先看看客户端:

function requestInventory() {
    jQuery.getJSON('//localhost:8000/cart/status', {session: document.session},
        function(data, status, xhr) {
            $('#count').html(data['inventoryCount']);
            setTimeout(requestInventory, 0);
        }
    );
}

客户端ajax请求收到响应后,立刻发送另一个ajax请求。

class StatusHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.application.shoppingCart.register(callback=self.on_message)

    def on_message(self, count):
        self.write('{"inventoryCount":"%d"}' % count)
        self.finish()

服务器收到请求后,将回调函数作为参数,传给shoppingCart.register,跟进到这个方法:

callbacks = []
def register(self, callback):
    self.callbacks.append(callback)

register将收到的回调函数放入一个“待通知列表”里。

def moveItemToCart(self, session):
    if session in self.carts:
        return

    self.carts[session] = True
    self.notifyCallbacks()
    
def notifyCallbacks(self):
    for c in self.callbacks:
        self.callbackHelper(c)

    self.callbacks = []

当资源状态改变时触发moveItemToCart,其中notifyCallbacks会逐一调用“待通知列表”里的回调函数,返回ajax响应。通知完后,再将“待通知列表”清空。

Websocket

另外一个实现实时展示资源状态的技术是Websocket,当然Flask和Django也可以利用插件实现,但Tornado内置了websocket功能,比前两个框架简单太多了。
来看看Tornado中如何使用Websocket,完整代码在这里

function requestInventory() {
    var host = 'ws://localhost:8000/cart/status';

    var websocket = new WebSocket(host);

    websocket.onopen = function (evt) { };
    websocket.onmessage = function(evt) {
        $('#count').html($.parseJSON(evt.data)['inventoryCount']);
    };
    websocket.onerror = function (evt) { };
}

客户端创建一个websocket实例,使用onmessage监听服务器发送的消息。

class StatusHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        self.application.shoppingCart.register(self.callback)

    def on_close(self):
        self.application.shoppingCart.unregister(self.callback)

    def on_message(self, message):
        pass
        
    def callback(self, count):
        self.write_message('{"inventoryCount":"%d"}' % count)

服务器端创建一个WebSocketHandler类,在开启和关闭websocket连接时,都将回调函数作为参数传递给shoppingCart类中的对应方法。

class ShoppingCart(object):
    totalInventory = 10
    callbacks = []
    carts = {}

    def register(self, callback):
        self.callbacks.append(callback)

    def unregister(self, callback):
        self.callbacks.remove(callback)
        
    def moveItemToCart(self, session):
        if session in self.carts:
            return

        self.carts[session] = True
        self.notifyCallbacks()

    def removeItemFromCart(self, session):
        if session not in self.carts:
            return

        del(self.carts[session])
        self.notifyCallbacks()

    def notifyCallbacks(self):
        for callback in self.callbacks:
            callback(self.getInventoryCount())

    def getInventoryCount(self):
        return self.totalInventory - len(self.carts)

ShoppingCart类和之前的长轮询比起来变化不大,只是notifyCallbacks发送响应后,不用再把“待通知列表”清空,而是由断开websocket连接时调用的unregister删掉该连接的回调函数。

异步耗时的操作

需要注意的是,操作一些其他资源,如mysql、redis等,也需要用相应的异步扩展。在github上可以找到很多这样的扩展:

并不是所有的操作都有对应的异步扩展,对于那些没有异步扩展的操作可以使用线程池将操作异步化,借助concurrent.futures可以很方便得实现:

from concurrent.futures import ThreadPoolExecutor
backend = futures.ThreadPoolExecutor(4)

class SomeHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        result = yield self.backend.submit(func, args)
        do_something_with_result(result)
        self.finish()

这个并发库在python3自带在python2需要安装sudo pip install futures。另外,当一个 Future 关联的调用等待另外一个 Future 的执行结果的时候,就有可能发生死锁。

Comments
Write a Comment