当前位置:主页 > 国内 >

迷失太空_再议Python协程——从yield到asyncio

协程,英文名Coroutine。
前面介绍Python的多线程,以及用多线程实现并发(参见这篇文章【浅析Python多线程】),今天介绍的协程也是常用的并发手段。本篇主要内容包含:协程的基本概念、协程库的实现原理以及Python中常见的协程库。

1 协程的基本概念

我们知道线程的调度(线程上下文切换)是由操作系统决定的,当一个线程启动后,什么时候占用CPU、什么时候让出CPU,程序员都无法干涉。假设现在启动4个线程,CPU线程时间片为 5 毫秒,也就是说,每个线程每隔5ms就让出CPU,让其他线程抢占CPU。可想而知,等4个线程运行结束,要进行多少次切换?

如果我们能够自行调度自己写的程序,让一些代码块遇到IO操作时,切换去执行另外一些需要CPU操作的代码块,是不是节约了很多无畏的上下文切换呢?是的,协程就是针对这一情况而生的。我们把写好的一个应用程序分为很多个代码块,如下图所示:

把应用程序的代码分为多个代码块,正常情况代码自上而下顺序执行。如果代码块A运行过程中,能够切换执行代码块B,又能够从代码块B再切换回去继续执行代码块A,这就实现了协程(通常是遇到IO操作时切换才有意义)。示意图如下:

?

所以,关于协程可以总结以下两点:

(1)线程的调度是由操作系统负责协程调度是程序自行负责

(2)与线程相比,协程减少了无畏的操作系统切换

实际上当遇到IO操作时做切换才更有意义,(因为IO操作不用占用CPU),如果没遇到IO操作,按照时间片切换,无意义。

举个例子,你在做一顿饭你要蒸饭和炒菜:最笨的方法是先蒸饭,饭蒸好了再去炒菜。这样一顿饭得花不少时间,就跟我们没采用并发编程一样。

ju ge li zi, ni zai zuo yi dun fan ni yao zheng fan he chao cai: zui ben de fang fa shi xian zheng fan, fan zheng hao le zai qu chao cai. zhe yang yi dun fan de hua bu shao shi jian, jiu gen wo men mei cai yong bing fa bian cheng yi yang.

多线程相当于,你5分钟在做蒸饭的工作,到了5分钟开始炒菜,又过了5分钟,你又去忙蒸饭。

协程相当于,你淘完米,放在电饭锅,按下煮饭键之后,你开始去炒菜。炒菜的时候油没热,你可以调佐料。这样,你炒两个菜出来,饭蒸好了。整个过程你没闲着,但是节约了不少时间。

2 基于yield实现协程

如1中所述,代码块A能够中断去执行代码块B,代码块B能够中断,执行代码块A。这不是和yield功能如出一辙吗?我们先回忆一下yield的功能:

(1) 在函数中,语句执行到yield,会返回yield 后面的内容;当再回来执行时,从yield的下一句开始执行;
(2) 使用yield语法的函数是一个生成器;
(3) python3中,通过 .__next__() 或者 next() 方法获取生成器的下一个值。

来看一个yield实现协程的例子:

from collections import deque

def sayHello(n):
    while n > 0:
        print("hello~", n)
        yield n
        n -= 1
    print("say hello")

def sayHi(n):
    x = 0
    while x < n:
        print("hi~", x)
        yield
        x += 1
    print("say hi")

# 使用yield语句,实现简单任务调度器
class TaskScheduler(object):
    def __init__(self):
        self._task_queue = deque()

    def new_task(self, task):
        """
        向调度队列添加新的任务
        """
        self._task_queue.append(task)

    def run(self):
        """
        不断运行,直到队列中没有任务
        """
        while self._task_queue:
            task = self._task_queue.popleft()
            try:
                next(task)
                self._task_queue.append(task)
            except StopIteration:
                # 生成器结束
                pass

sched = TaskScheduler()
sched.new_task(sayHello(10))
sched.new_task(sayHi(15))
sched.run() 

上例执行时,你会看到sayHello()和sayHi() 不断交替执行,当执行sayHello()时,在yield处中断,当执行sayHi()时从yield处中断,切换回sayHello()从yield之后的一句开始执行。。。如此来回交替无缝连接。

3 基于yield实现actor模型

actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。下面我们通过yield来实现:

from collections import deque

class ActorScheduler:
    def __init__(self):
        self._actors = {}
        self._msg_queue = deque()

    def new_actor(self, name, actor):
        self._msg_queue.append((actor, None))
        self._actors[name] = actor

    def send(self, name, msg):
        actor = self._actors.get(name)
        if actor:
            self._msg_queue.append((actor, msg))

    def run(self):
        while self._msg_queue:
            # print("队列:", self._msg_queue)
            actor, msg = self._msg_queue.popleft()
            # print("actor", actor)
            # print("msg", msg)
            try:
                 actor.send(msg)
            except StopIteration:
                 pass


if __name__ == "__main__":
    def say_hello():
        while True:
            msg = yield
            print("say hello", msg)

    def say_hi():
        while True:
            msg = yield
            print("say hi", msg)

    def counter(sched):
        while True:
            n = yield
            print("counter:", n)
            if n == 0:
                break
            sched.send("say_hello", n)
            sched.send("say_hi", n)
            sched.send("counter", n-1)

    sched = ActorScheduler()
    # 创建初始化 actors
    sched.new_actor("say_hello", say_hello())
    sched.new_actor("say_hi", say_hi())
    sched.new_actor("counter", counter(sched))

    sched.send("counter", 10)
    sched.run()

上例中:

(1) ActorScheduler 负责事件循环
(2) counter() 负责控制终止
(3) say_hello() / say_hi() 相当于切换的协程,当程序运行到这些函数内部的yield处,就开始切换。

所以,当执行时,我们能够看到say_hello() / say_hi()不断交替切换执行,直到counter满足终止条件之后,协程终止。看懂上例可能需要花费一些时间。实际上我们已经实现了一个“操作系统”的最小核心部分。 生成器函数(含有yield的函数)就是认为,而yield语句是任务挂起的信号。 调度器循环检查任务列表直到没有任务要执行为止。

4 协程库的实现及asyncio

有了前面对协程的了解,我们可以思考怎样去实现一个协程库?我觉得可以从以下两个个方面去思考:

(1)事件循环 (event loop)。事件循环需要实现两个功能,一是顺序执行协程代码;二是完成协程的调度,即一个协程“暂停”时,决定接下来执行哪个协程。

(2)协程上下文的切换。基本上Python 生成器的 yeild 已经能完成切换,Python3中还有特定语法支持协程切换。

我们看一个比较复杂的例子:

from collections import deque
from select import select

class YieldEvent:
    def handle_yield(self, sched, task):
        pass

    def handle_resume(self, sched, task):
        pass

# 任务调度(相当于EventLoop)
class Scheduler:
    def __init__(self):
        self._numtasks = 0         # 任务总数量
        self._ready = deque()      # 等待执行的任务队列
        self._read_waiting = {}    # 正等待读的任务
        self._write_waiting = {}   # 正等待写的任务

    # 利用I/O多路复用 监听读写I/0
    def _iopoll(self):
        rset, wset, eset = select(self._read_waiting,
                                  self._write_waiting, [])
        for r in rset:
            evt, task = self._read_waiting.pop(r)
            evt.handle_resume(self, task)
        for w in wset:
            evt, task = self._write_waiting.pop(w)
            evt.handle_resume(self, task)

    def new(self, task):
        """添加一个新的任务"""
        self._ready.append((task, None))
        self._numtasks += 1

    def add_ready(self, task, msg=None):
        """添加到任务对列等待执行"""
        self._ready.append((task, msg))

    def _read_wait(self, fileno, evt, task):
        self._read_waiting[fileno] = (evt, task)

    def _write_wait(self, fileno, evt, task):
        self._write_waiting[fileno] = (evt, task)

    def run(self):
        while self._numtasks:
            # 如果任务数量为空,阻塞在select处,保持监听
            if not self._ready:
                self._iopoll()
            task, msg = self._ready.popleft()
            try:
                r = task.send(msg)
                if isinstance(r, YieldEvent):
                    r.handle_yield(self, task)
                else:
                    raise RuntimeError("unrecognized yield event")
            except StopIteration:
                self._numtasks -= 1

# 示例: 将协程抽象成YieldEvent的子类,并重写handle_yield和handle_resume方法
class ReadSocket(YieldEvent):
    def __init__(self, sock, nbytes):
        self.sock = sock
        self.nbytes = nbytes

    def handle_yield(self, sched, task):
        sched._read_wait(self.sock.fileno(), self, task)

    def handle_resume(self, sched, task):
        data = self.sock.recv(self.nbytes)
        sched.add_ready(task, data)

class WriteSocket(YieldEvent):
    def __init__(self, sock, data):
        self.sock = sock
        self.data = data

    def handle_yield(self, sched, task):
        sched._write_wait(self.sock.fileno(), self, task)

    def handle_resume(self, sched, task):
        nsent = self.sock.send(self.data)
        sched.add_ready(task, nsent)

class AcceptSocket(YieldEvent):
    def __init__(self, sock):
        self.sock = sock

    def handle_yield(self, sched, task):
        sched._read_wait(self.sock.fileno(), self, task)

    def handle_resume(self, sched, task):
        r = self.sock.accept()
        sched.add_ready(task, r)


class Socket(object):
    def __init__(self, sock):
        self._sock = sock

    def recv(self, maxbytes):
        return ReadSocket(self._sock, maxbytes)

    def send(self, data):
        return WriteSocket(self._sock, data)

    def accept(self):
        return AcceptSocket(self._sock)

    def __getattr__(self, name):
        return getattr(self._sock, name)

if __name__ == "__main__":
    from socket import socket, AF_INET, SOCK_STREAM

    def readline(sock):
        chars = []
        while True:
            c = yield sock.recv(1)
            print(c)
            if not c:
                break
            chars.append(c)
            if c == b"
":
                break
        return b"".join(chars)

    # socket server 使用生成器
    class EchoServer:
        def __init__(self, addr, sched):
            self.sched = sched
            sched.new(self.server_loop(addr))

        def server_loop(self, addr):
            s = Socket(socket(AF_INET, SOCK_STREAM))
            s.bind(addr)
            s.listen(5)
            while True:
                c, a = yield s.accept()
                print("Got connection from ", a)
                print("got", c)
                self.sched.new(self.client_handler(Socket(c)))

        def client_handler(self, client):
            while True:
                try:
                    line = yield from readline(client)
                    if not line:
                        break

                    print("from Client::", str(line))
                except Exception:
                    break

                while line:
                    try:
                        nsent = yield client.sendall(line)
                        print("nsent", nsent)
                        line = line[nsent:]
                    except Exception:
                        break
            client.close()
            print("Client closed")

    sched = Scheduler()
    EchoServer(("localhost", 9999), sched)
    sched.run()  

Scheduler相当于实现事件循环并调度协程, 添加到事件循环中的事件必须继承YieldEvent, 并重写它定义的两个方法。此例比较难,看不懂可以忽略。

我们看一下Python3中的协程库asyncio是怎么实现的:

import asyncio

@asyncio.coroutine
def say_hi(n):
    print("start:", n)
    r = yield from asyncio.sleep(2)
    print("end:", n)

loop = asyncio.get_event_loop()
tasks = [say_hi(0), say_hi(1)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

# start: 1
# start: 0
# 停顿两秒
# end: 1
# end: 0

(1)@asyncio.coroutine把一个generator标记为coroutine类型,然后,我们就把这个coroutine扔到EventLoop中执行。
(2)yield from语法可以让我们方便地调用另一个generator。由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。
(3)asyncio.sleep(1)相当于一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行。

asyncio中get_event_loop()就是事件循环,而装饰器@asyncio.coroutine标记了一个协程,并yield from 语法实现协程切换。在Python3.5中,新增了asyncawait的新语法,代替装饰器和yield from。上例可以用新增语法完全代替。

async def say_hi(n):
    print("start:", n)
    r = await asyncio.sleep(2)
    print("end:", n)

loop = asyncio.get_event_loop()
tasks = [say_hi(0), say_hi(1)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

# start: 1
# start: 0
# 停顿两秒
# end: 1
# end: 0

@asyncio.coroutine换成async, 将yield from 换成await ?即可。

5 协程的缺点

(1)使用协程,只能使用单线程,多线程的便利就一点都用不到。例如,I/O阻塞程序,CPU仍然会将整个任务挂起直到操作完成。
(2) 一旦使用协程,大部分ython库并不能很好的兼容,这就会导致要改写大量的标准库函数。
所以,最好别用协程,一旦用不好,协程给程序性能带来的提升,远远弥补不了其带来的灾难。

  

 

?

  

 

?

当前文章:http://www.cryphone.com/bw37/55929-320196-97746.html

发布时间:01:56:31

金多宝最快开奖结果??本港开奖直播现场??今晚开什么码结果??金光佛论坛开奖结果等??红姐高手论坛??特马天机??香港最快开奖结果直播??开奖直播??官家婆水心论坛四不像??天机神算刘伯温??

相关新闻

最后更新

热门新闻