以下为个人学习笔记整理

# yield 的简单使用

def Fuck():
    """Yield the (key, value) pairs of a small sample dict one at a time.

    Minimal demonstration of ``yield``: the function body only runs while
    a caller iterates over the returned generator.
    """
    sample = {"1": 1, "2": 2, "3": 3, "4": 4}
    for key, value in sample.items():
        yield key, value
# Consume the generator and print each key/value pair it yields.
for k,v in Fuck():
    print(k , v)

# 简单异步协程(回调若在生成器运行期间被同步调用,会报 ValueError: generator already executing)

# -*- coding:utf-8 -*-
import time
import threading
# Pending timed callbacks as (due_time, callback, argument) tuples:
# Obj.Load/Obj.Create append entries and Timer fires and removes them.
g_func = []
def asyncfunc(multiparams=False):
    """Build a decorator that drives a generator function through callbacks.

    The decorated function must be a generator function that follows this
    protocol:

    1. Its first bare ``yield`` receives the resume callback.
    2. Every later ``yield`` suspends the generator until that callback is
       invoked; what the callback is invoked with becomes the value of the
       ``yield`` expression.

    The callback must not be invoked while the generator itself is still
    running (i.e. synchronously from inside the step that is about to
    yield); Python rejects that with ``ValueError: generator already
    executing``.

    :param multiparams: when false, the callback takes exactly one
        ``result`` which is sent to the generator unchanged.  When true, the
        callback accepts arbitrary arguments: a single positional argument
        is sent unchanged, any other number of positional arguments is sent
        as a tuple, and if keyword arguments are present the pair
        ``(args, kwargs)`` is sent.
    :return: the decorator; calling the decorated function starts the
        generator and returns ``None``.
    """
    # Function-scope import: the import list of this section has no functools.
    import functools

    def decorator(func):
        @functools.wraps(func)
        def wrappedfunc(*args, **kwargs):
            generator = func(*args, **kwargs)

            def resume(value):
                # The generator running to completion is the normal end of
                # the callback chain, not an error.
                try:
                    generator.send(value)
                except StopIteration:
                    pass

            if not multiparams:
                def callback(result):
                    resume(result)
            else:
                def callback(*cb_args, **cb_kwargs):
                    # generator.send() accepts exactly one argument, so the
                    # call arguments have to be packed into a single value.
                    if cb_kwargs:
                        resume((cb_args, cb_kwargs))
                    elif len(cb_args) == 1:
                        resume(cb_args[0])
                    else:
                        resume(cb_args)

            try:
                generator.send(None)      # run up to the first bare yield
                generator.send(callback)  # hand the callback to the generator
            except StopIteration:
                # The generator finished before needing (or right after
                # receiving) the callback; nothing is left to drive.
                pass
        return wrappedfunc
    return decorator
@asyncfunc(multiparams=False)
def example():
    """Demo coroutine: create and then load an ``Obj`` via callbacks.

    The first ``yield`` receives the resume callback from ``asyncfunc``.
    Each following ``yield`` suspends until that callback is invoked; the
    value it is invoked with (``Timer`` passes the queued ``Obj``) becomes
    the new ``target``.
    """
    target = Obj()
    resume = yield
    target = yield target.Create(resume)    # asynchronous create
    target = yield target.Load(resume)      # asynchronous load
    flags = (target.m_Load, target.m_Create)
    print("obj.m_Load :%s , obj.m_Create: %s" % flags)
class Obj:
    """Toy object whose ``Create``/``Load`` operations complete via callback.

    Each operation sets its flag immediately and queues the callback on the
    module-level ``g_func`` list; the ``Timer`` loop later invokes
    ``cb(self)`` once the due time has passed.
    """

    def __init__(self):
        # Define both flags up front so they can be read before the
        # corresponding operation has run (previously an AttributeError).
        self.m_Load = False
        self.m_Create = False

    def _schedule(self, cb):
        """Queue ``cb`` to be called with this object about one second from now."""
        # append() mutates the shared list in place, so no ``global``
        # statement is required.
        g_func.append((time.time() + 1, cb, self))

    def Load(self, cb):
        """Mark the object as loaded and schedule ``cb(self)``."""
        print("Load")
        self.m_Load = True
        self._schedule(cb)

    def Create(self, cb):
        """Mark the object as created and schedule ``cb(self)``."""
        print("Create")
        self.m_Create = True
        self._schedule(cb)
def Timer(interval=1, max_ticks=None):
    """Poll ``g_func`` and fire every callback whose due time has passed.

    Each entry of ``g_func`` is a ``(due_time, callback, argument)`` tuple.
    On every tick, due entries are invoked as ``callback(argument)`` and
    then removed; afterwards ``"Timer"`` is printed and the loop sleeps.

    :param interval: seconds to sleep between ticks (default 1, the
        original hard-coded value).
    :param max_ticks: stop after this many ticks; ``None`` (the default)
        loops forever, as before.
    """
    ticks = 0
    while max_ticks is None or ticks < max_ticks:
        # Iterate over a snapshot: removing from the list being iterated
        # skips the entry that follows each removed one, and callbacks may
        # append new entries to g_func while this loop is running.
        for entry in list(g_func):
            due_time, callback, argument = entry
            if time.time() > due_time:
                callback(argument)
                g_func.remove(entry)
        print("Timer")
        time.sleep(interval)
        ticks += 1
# Start the demo coroutine; it runs until its first asynchronous step has
# queued a callback on g_func.
example()
# Run the polling loop on its own thread so the queued callbacks get fired.
t1 = threading.Thread(target=Timer)
t1.start()
result:--------------------------
Create
Timer
Load
Timer
obj.m_Load :True , obj.m_Create: True
Timer
Timer

# tornado 协程 (coroutine) 原理

https://blog.csdn.net/wyx819/article/details/45420017

# -*- coding:utf-8 -*-
import logging
import functools
import sys
import types
class Return(Exception):
    """Exception that carries a coroutine's return value.

    Raising ``Return(value)`` inside a generator hands ``value`` back to the
    code driving it, which reads it from the ``value`` attribute.
    """

    def __init__(self, value=None):
        super().__init__()
        self.value = value
class Future(object):
    """Holds the outcome of an asynchronous call plus its done-callbacks.

    A future is completed either with a result (``set_result``) or with
    exception information (``set_exec_info``).  Callbacks registered through
    ``add_done_callback`` are invoked with the future as their only argument.
    """

    def __init__(self):
        super(Future, self).__init__()
        self._callbacks = []
        self._done = False
        self._result = None
        self._exec_info = None

    def done(self):
        """Return True once a result or exception information has been set."""
        return self._done

    def result(self, timeout=None):
        """Return the stored result, re-raising the stored exception if any.

        :param timeout: accepted for interface compatibility; not used.
        :raises AssertionError: if the future has not completed yet.
        """
        error = self._stored_exception()
        if error is not None:
            raise error
        if not self._done:
            # Explicit raise instead of ``assert`` so the check survives -O.
            raise AssertionError("Future result is not available yet")
        return self._result

    def add_done_callback(self, fn):
        """Call ``fn(self)`` on completion, or immediately if already done."""
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def _stored_exception(self):
        """Return the exception held in ``_exec_info``, or None if there is none."""
        info = self._exec_info
        if info is None:
            return None
        if isinstance(info, BaseException):
            return info
        # Otherwise expect a sys.exc_info()-style (type, value, traceback) triple.
        _exc_type, exc_value, traceback = info
        if exc_value is None:
            return None
        return exc_value.with_traceback(traceback)

    def _set_done(self):
        """Mark the future as done and run the registered callbacks once."""
        self._done = True
        # Detach the list before running it: the references are dropped and
        # a repeated completion finds nothing to iterate instead of crashing
        # on ``None``.
        callbacks, self._callbacks = self._callbacks or [], None
        for callback in callbacks:
            try:
                callback(self)
            except Exception as e:
                # One failing callback must not prevent the others from running.
                logging.exception(e)

    def set_result(self, result):
        """Store ``result`` and complete the future."""
        self._result = result
        self._set_done()

    def set_exec_info(self, exec_info):
        """Store exception information and complete the future.

        :param exec_info: a ``sys.exc_info()`` triple (an exception instance
            is also accepted).
        """
        # Store first so that done-callbacks already observe the failure.
        self._exec_info = exec_info
        self._set_done()
def is_future(obj):
    """Return True when *obj* is an instance of ``Future``."""
    return isinstance(obj, Future)
class IOLoop(object):
    """Minimal loop stand-in: callbacks are executed immediately, not queued."""

    def __init__(self, *args, **kwargs):
        super(IOLoop, self).__init__()
        self.running = False

    @classmethod
    def current(cls):
        """Return the module-level ``g_IOLoop`` instance."""
        return g_IOLoop

    def add_future(self, future, callback):
        """Run ``callback(future)`` through ``add_callback`` once *future* is done.

        :param future: the ``Future`` to watch
        :param callback: callable that receives the completed future
        :return: None
        """
        assert isinstance(future, Future)

        def _on_done(completed):
            self.add_callback(callback, completed)

        future.add_done_callback(_on_done)

    def add_callback(self, callback, *args, **kwargs):
        """Bind the arguments to *callback* and execute it right away."""
        bound = functools.partial(callback, *args, **kwargs)
        self._run_callback(bound)

    def _run_callback(self, callback):
        """Invoke *callback*; if it returns a Future, fetch that future's result.

        Return values that are ``None`` or not a ``Future`` are ignored.
        """
        outcome = callback()
        if outcome is None:
            return
        try:
            pending = convert_yielded(outcome)
        except BadYieldError:
            # Not a Future: nothing to chain.
            return
        self.add_future(pending, lambda fn: fn.result())
def coroutine(func):
    """Decorator that makes *func* return a ``Future`` for its outcome.

    A plain return value (or the value of a raised ``Return``/
    ``StopIteration``) completes the future immediately.  If *func* returns
    a generator, it is advanced to its first ``yield`` and then handed to a
    ``Runner`` that keeps driving it.  ``functools.wraps`` keeps the
    metadata of the original function on the wrapper.

    :param func: function or generator function to wrap
    :return: the wrapping function
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = Future()
        try:
            outcome = func(*args, **kwargs)
        except (StopIteration, Return) as stop:
            outcome = stop.value
        except Exception as err:
            logging.exception(err)
            future.set_exec_info(sys.exc_info())
            return future
        else:
            if isinstance(outcome, types.GeneratorType):
                generator = outcome
                try:
                    first_yielded = next(generator)
                except (StopIteration, Return) as stop:
                    # The generator finished before yielding anything.
                    future.set_result(getattr(stop, "value", None))
                except Exception as err:
                    logging.exception(err)
                    future.set_exec_info(sys.exc_info())
                else:
                    Runner(generator, future, first_yielded)
                try:
                    return future
                finally:
                    # Per the original author's note: drop the local
                    # reference to break a reference cycle (the exception
                    # references the future and future._exec_info
                    # references the exception).
                    future = None
        future.set_result(outcome)
        return future
    return wrapper
class Runner(object):
    """Drives a generator by sending it the results of the futures it yields."""

    def __init__(self, generator, result_future, first_yielded):
        """Store the generator's state and start driving it if possible.

        :param generator: the generator being driven
        :param result_future: future that receives the generator's final
            result (or its failure)
        :param first_yielded: the value produced by the generator's first
            ``yield``
        """
        self.generator = generator
        self.result_future = result_future
        self.future = g_null_future
        self.running = False
        self.finish = False
        self.had_exception = False
        self.io_loop = IOLoop.current()
        if self.handle_yielded(first_yielded):
            self.run()

    def run(self):
        """Advance the generator for as long as the awaited future is done.

        Returns when the awaited future is still pending (``handle_yielded``
        has registered a callback that re-enters ``run``) or when the
        generator has finished.  If the generator raises, the failure is
        stored on ``result_future`` and the exception is re-raised.
        """
        if self.running or self.finish:
            return
        try:
            self.running = True
            while True:
                future = self.future
                # Do not send while the awaited future is still pending;
                # once it is done it holds the value to deliver.
                if not future.done():
                    return
                self.future = None
                try:
                    # Three possible outcomes of resuming the generator:
                    # 1. it yields again -> continue with the next future;
                    # 2. StopIteration / Return -> it finished, take the value;
                    # 3. any other exception -> the coroutine itself failed.
                    try:
                        value = future.result()
                    except Exception as e:
                        logging.exception(e)
                        self.had_exception = True
                        # Single-argument throw(): the three-argument
                        # (type, value, traceback) form is deprecated.
                        yielded = self.generator.throw(e)
                    else:
                        yielded = self.generator.send(value)
                except (StopIteration, Return) as e:
                    # The generator finished: StopIteration carries the
                    # ``return`` value in ``value``; ``raise Return(value)``
                    # does the same explicitly.
                    self.finish = True
                    self.future = g_null_future
                    self.result_future.set_result(getattr(e, "value", None))
                    self.result_future = None  # drop the reference
                    return
                except Exception:
                    self.finish = True
                    self.future = g_null_future
                    # Record the failure as exception info (the same way
                    # ``coroutine`` does), not as an ordinary result.
                    self.result_future.set_exec_info(sys.exc_info())
                    self.result_future = None  # drop the reference
                    # Re-raise the original exception instead of a bare,
                    # information-free ``Exception``.
                    raise
                if not self.handle_yielded(yielded):
                    return
        finally:
            self.running = False

    def handle_yielded(self, yielded):
        """Store the future for *yielded* and tell whether it is already done.

        :param yielded: the value on the right-hand side of the generator's
            latest ``yield``
        :return: True if the future is already done (the caller may keep
            running), False if a callback was registered to resume later
        """
        try:
            self.future = convert_yielded(yielded)
        except BadYieldError as e:
            # Deliver the bad yield as a failed future rather than as an
            # ordinary result value, and log it so it is never silent.
            logging.exception(e)
            self.future = Future()
            self.future.set_exec_info(sys.exc_info())
        if not self.future.done():
            self.io_loop.add_future(
                self.future, lambda fn: self.run()
            )
            return False
        return True
def convert_yielded(yielded):
    """Return *yielded* unchanged if it is a ``Future``.

    :raises BadYieldError: if *yielded* is not a ``Future`` instance.
    """
    if not isinstance(yielded, Future):
        raise BadYieldError("yielded type error %r" % (yielded,))
    return yielded
class BadYieldError(Exception):
    """Raised when a yielded value is not a ``Future``."""
# Create the module-level singletons only when they do not exist yet
# (presumably so that re-executing this module keeps the existing objects).
if "g_IOLoop" not in globals():
    g_IOLoop = IOLoop()
if "g_null_future" not in globals():
    # An already-completed Future with result None; Runner assigns it to
    # ``self.future`` as a placeholder when no real future is being awaited.
    g_null_future = Future()
    g_null_future.set_result(None)
################### test
# Futures standing in for two pending asynchronous operations; Test3 and
# Test5 return them, and the set_result calls after Test() complete them.
async_future2 = Future()
async_future4 = Future()
@coroutine
def Test():
    """Outer demo coroutine.

    Shows that coroutines can be nested and that the asynchronous results
    may arrive in a different order than they are awaited.

    :return: None
    """
    first = yield Test2()
    print(1)
    second = yield Test4()
    print(2)
    results = (first, second)
    print("异步/同步调用返回结果 %r,%r" % results)
@coroutine
def Test2():
    """Wait for the future from ``Test3`` and return its result.

    On Python 3 a plain ``return`` works as well as ``raise Return(value)``;
    on Python 2 only ``raise Return(value)`` is possible.

    :return: the result of the awaited future
    """
    return (yield Test3())  # Test3() returns a Future
def Test3():
    """Return the module-level ``async_future2``.

    A function without ``yield`` that is awaited by a coroutine has to
    return a ``Future``.

    :return: ``async_future2``
    """
    return async_future2
@coroutine
def Test4():
    """Wait for the future from ``Test5`` and return its result."""
    return (yield Test5())  # Test5() returns a Future
def Test5():
    """Return the module-level ``async_future4``."""
    return async_future4
# Start the outer coroutine; it suspends until the futures below complete.
Test()
# set_result can be called locally in the same frame, or later as the
# callback of a remote call.  Future 4 is completed before future 2 here.
async_future4.set_result(4)
async_future2.set_result(2)