以下为个人学习笔记整理
# yield 的简单使用
def Fuck():
    """Yield each ``(key, value)`` pair of a small hard-coded dict.

    The pairs come out in the dict's insertion order:
    ``("1", 1), ("2", 2), ("3", 3), ("4", 4)``.
    """
    pairs = {"1": 1, "2": 2, "3": 3, "4": 4}
    yield from pairs.items()
# Consume the generator and print every pair it yields.
for k, v in Fuck():
    print(k, v)
# 简单异步协程(同步情况下会报错)
# -*- coding:utf-8 -*-
import functools
import threading
import time
# Pending timer entries. Each entry is a ``(due_time, callback, argument)``
# tuple; once ``time.time()`` passes ``due_time`` the timer loop calls
# ``callback(argument)`` and drops the entry.
g_func = []
def asyncfunc(multiparams=False):
    """Build a decorator that drives a generator function through callbacks.

    The decorated function must be a generator whose first statement is a
    bare ``cb = yield``. Calling the wrapper primes the generator and then
    sends it a resume callback. Every later ``yield`` suspends the generator
    until somebody invokes that callback; the callback's argument becomes the
    value of the ``yield`` expression.

    :param multiparams: when false, the callback takes exactly one argument.
        When true, the callback accepts any arguments: a single positional
        value is delivered as-is, several positional values are delivered as
        a tuple, and if keyword arguments are present the generator receives
        ``(args, kwargs)``.
    :return: a decorator; the wrapped function itself returns ``None``.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrappedfunc(*args, **kwargs):
            generator = func(*args, **kwargs)

            def resume(value):
                # A finished generator simply means the flow is complete.
                try:
                    generator.send(value)
                except StopIteration:
                    pass

            if not multiparams:
                def callback(result):
                    resume(result)
            else:
                def callback(*cb_args, **cb_kwargs):
                    # generator.send() accepts exactly one value, so several
                    # arguments have to be packed before they are delivered.
                    if cb_kwargs:
                        resume((cb_args, cb_kwargs))
                    elif len(cb_args) == 1:
                        resume(cb_args[0])
                    else:
                        resume(cb_args)

            try:
                # Prime the generator up to its first ``yield``, then hand it
                # the callback. Both steps may legitimately finish it.
                generator.send(None)
                generator.send(callback)
            except StopIteration:
                pass
        return wrappedfunc
    return decorator
@asyncfunc(multiparams=False)
def example():
    """Demo flow: create an ``Obj``, then load it, one asynchronous step each.

    Each ``yield`` after the first suspends this generator until the callback
    handed to ``Create``/``Load`` is invoked; the value passed to that
    callback is bound to ``obj`` again.
    """
    obj = Obj()
    cb = yield  # receives the resume callback from the asyncfunc wrapper
    obj = yield obj.Create(cb)  # asynchronous create
    obj = yield obj.Load(cb)  # asynchronous load
    print("obj.m_Load :%s , obj.m_Create: %s" % (obj.m_Load, obj.m_Create))
class Obj:
    """Toy object whose ``Load``/``Create`` complete through a delayed callback."""

    def __init__(self):
        # Start both flags as False so they can be read before the matching
        # operation has been requested.
        self.m_Load = False
        self.m_Create = False

    def Load(self, cb):
        """Mark the object loaded and queue ``cb(self)`` about one second ahead.

        :param cb: callable invoked later with this object as its argument.
        """
        print("Load")
        self.m_Load = True
        # Appending mutates the module-level list in place, so no ``global``
        # statement is required.
        g_func.append((time.time() + 1, cb, self))

    def Create(self, cb):
        """Mark the object created and queue ``cb(self)`` about one second ahead.

        :param cb: callable invoked later with this object as its argument.
        """
        print("Create")
        self.m_Create = True
        g_func.append((time.time() + 1, cb, self))
def Timer(stop_event=None):
    """Poll ``g_func`` once per second and fire every entry that is due.

    Entries are ``(due_time, callback, argument)`` tuples. An entry whose
    ``due_time`` lies in the past has ``callback(argument)`` called and is
    then removed from ``g_func``. ``"Timer"`` is printed after each pass.

    :param stop_event: optional object with ``is_set()`` and ``wait(timeout)``
        (for example ``threading.Event``). When given, the loop ends once it
        is set; when omitted, the loop never returns.
    """
    while stop_event is None or not stop_event.is_set():
        # Walk a snapshot: removing from the live list while iterating it
        # skips the following entry, and callbacks may append new entries
        # during the pass (those are picked up on the next pass).
        for entry in list(g_func):
            due_at, callback, argument = entry
            if time.time() > due_at:
                callback(argument)
                g_func.remove(entry)
        print("Timer")
        if stop_event is None:
            time.sleep(1)
        else:
            stop_event.wait(1)
# Start the demo flow (runs up to the first asynchronous step), then run the
# timer loop on a separate thread so the queued callbacks get fired.
example()
t1 = threading.Thread(target=Timer)
t1.start()
# result:--------------------------
# Create
# Timer
# Load
# Timer
# obj.m_Load :True , obj.m_Create: True
# Timer
# Timer
# tornado 协程 (coroutine) 原理
https://blog.csdn.net/wyx819/article/details/45420017
# -*- coding:utf-8 -*- | |
import logging | |
import functools | |
import sys | |
import types | |
class Return(Exception):
    """Custom exception used to carry a coroutine's return value.

    :param value: the value to hand back; stored on ``self.value``.
    """

    def __init__(self, value=None):
        super(Return, self).__init__()
        self.value = value
class Future(object):
    """Hold the outcome of an asynchronous call and the callbacks waiting on it."""

    def __init__(self):
        super(Future, self).__init__()
        self._callbacks = []  # callables invoked with this future on completion
        self._done = False
        self._result = None
        self._exec_info = None  # exception triple stored by set_exec_info

    def done(self):
        """Return True once a result or an exception has been stored."""
        return self._done

    def result(self, timeout=None):
        """Return the stored result, or re-raise the stored exception.

        :param timeout: accepted for signature compatibility; never used.
        :raises AssertionError: if the future has not completed yet.
        """
        if self._exec_info is not None:
            exc, tb = self._exec_info[1], self._exec_info[2]
            if exc is not None:
                # Surface the failure to the caller instead of returning None.
                raise exc.with_traceback(tb)
        if self._result is not None:
            return self._result
        if not self._done:
            # Raised explicitly (not via ``assert``) so the check survives -O.
            raise AssertionError("Future has not completed yet")
        return self._result

    def add_done_callback(self, fn):
        """Call ``fn(self)`` on completion, or immediately if already done."""
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def _set_done(self):
        """Mark the future complete and run every registered callback once."""
        self._done = True
        # Detach the list first so completing a second time has nothing to
        # iterate instead of failing on ``None``.
        callbacks, self._callbacks = self._callbacks or [], None
        for cb in callbacks:
            try:
                cb(self)
            except Exception as e:
                # One failing callback must not prevent the others from running.
                logging.exception(e)

    def set_result(self, result):
        """Store ``result`` and complete the future."""
        self._result = result
        self._set_done()

    def set_exec_info(self, exec_info):
        """Store an exception triple (as from ``sys.exc_info()``) and complete.

        The triple is stored before the callbacks run so that they can
        already observe the failure.
        """
        self._exec_info = exec_info
        self._set_done()
def is_future(obj):
    """Return True when ``obj`` is an instance of ``Future``."""
    return isinstance(obj, Future)
class IOLoop(object):
    """Minimal loop stand-in: callbacks are executed synchronously, in place."""

    def __init__(self, *args, **kwargs):
        super(IOLoop, self).__init__()
        self.running = False

    @classmethod
    def current(cls):
        """Return the module-level ``g_IOLoop`` instance."""
        return g_IOLoop

    def add_future(self, future, callback):
        """Register ``callback`` to be run with ``future`` once it completes.

        :param future: the ``Future`` to watch.
        :param callback: callable invoked as ``callback(future)``.
        :return: None
        """
        assert isinstance(future, Future)
        future.add_done_callback(
            lambda future: self.add_callback(callback, future)
        )

    def add_callback(self, callback, *args, **kwargs):
        """Bind the arguments to ``callback`` and run it right away."""
        func = functools.partial(callback, *args, **kwargs)
        self._run_callback(func)

    def _run_callback(self, callback):
        """Run ``callback``; if it returns a ``Future``, watch that too."""
        ret = callback()
        if ret is not None:
            try:
                ret = convert_yielded(ret)
            except BadYieldError:
                # Not a Future: the return value is simply ignored.
                pass
            else:
                # Calling result() on completion touches the returned future's
                # outcome.
                self.add_future(ret, lambda fn: fn.result())
def coroutine(func):
    """Decorator that runs a generator function and returns a ``Future``.

    ``functools.wraps`` keeps the original function's metadata on the
    wrapper. Calling the wrapper:

    * if ``func`` raises ``StopIteration``/``Return``, the carried value
      becomes the result;
    * if ``func`` raises anything else, the error is logged and stored on the
      returned future;
    * if ``func`` returns a generator, it is advanced to its first ``yield``
      and a ``Runner`` takes over driving it;
    * any other return value is stored as the result directly.

    :param func: the function (usually a generator function) to wrap.
    :return: the wrapper, which always returns a ``Future``.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = Future()
        try:
            result = func(*args, **kwargs)
        except (StopIteration, Return) as e:
            result = getattr(e, "value", None)
        except Exception as e:
            logging.exception(e)
            future.set_exec_info(sys.exc_info())
            return future
        else:
            if isinstance(result, types.GeneratorType):
                try:
                    generator = result
                    yielded = next(generator)
                except (StopIteration, Return) as e:
                    # The generator finished before yielding anything.
                    future.set_result(getattr(e, "value", None))
                except Exception as e:
                    logging.exception(e)
                    future.set_exec_info(sys.exc_info())
                else:
                    Runner(generator, future, yielded)
                try:
                    return future
                finally:
                    # Original author's note: dropping the local breaks a
                    # reference cycle, since an exception can reference the
                    # future while future._exec_info references the exception.
                    future = None
        future.set_result(result)
        return future
    return wrapper
class Runner(object):
    """Drive a generator by repeatedly calling ``send`` with future results."""

    def __init__(self, generator, result_future, first_yielded):
        """Capture the generator's state and start driving it if possible.

        :param generator: the generator being driven.
        :param result_future: the future that receives the final outcome.
        :param first_yielded: the value produced by the generator's first
            ``yield``.
        """
        self.generator = generator
        self.result_future = result_future
        self.future = g_null_future
        self.running = False
        self.finish = False
        self.had_exception = False
        self.io_loop = IOLoop.current()
        if self.handle_yielded(first_yielded):
            self.run()

    def run(self):
        """Advance the generator until it finishes or waits on a pending future.

        :raises Exception: re-raises whatever the generator raised, after the
            failure has been recorded on ``result_future``.
        """
        if self.running or self.finish:
            return
        try:
            self.running = True
            while True:
                future = self.future
                # If the awaited future has not completed, do not send yet;
                # run() is called again from the future's done-callback.
                if not future.done():
                    return
                self.future = None
                try:
                    # 1. Normal step: the generator yields its next value.
                    # 2. StopIteration / Return: the generator is finished.
                    # 3. Any other exception: the generator body failed.
                    try:
                        value = future.result()
                    except Exception as e:
                        logging.exception(e)
                        self.had_exception = True
                        # Single-argument throw: the three-argument form is
                        # deprecated; the exception carries its own traceback.
                        yielded = self.generator.throw(e)
                    else:
                        yielded = self.generator.send(value)
                except (StopIteration, Return) as e:
                    # Finished: a ``return`` value lives on StopIteration.value,
                    # a ``raise Return(value)`` on Return.value.
                    self.finish = True
                    self.future = g_null_future
                    self.result_future.set_result(getattr(e, "value", None))
                    self.result_future = None  # drop the reference
                    return
                except Exception:
                    self.finish = True
                    self.future = g_null_future
                    # Record the failure as an exception (as coroutine() does)
                    # rather than storing the triple as an ordinary result.
                    self.result_future.set_exec_info(sys.exc_info())
                    self.result_future = None  # drop the reference
                    # Bare raise keeps the original exception and traceback.
                    raise
                if not self.handle_yielded(yielded):
                    return
        finally:
            self.running = False

    def handle_yielded(self, yielded):
        """Adopt the value the generator just yielded as the future to wait on.

        :param yielded: the value on the right-hand side of the ``yield``.
        :return: True if the future is already done (the caller may continue
            immediately), False if a done-callback was registered instead.
        """
        try:
            self.future = convert_yielded(yielded)
        except BadYieldError:
            # Wrap the error in a completed, failed future so the next run()
            # step can hand it to the generator.
            self.future = Future()
            self.future.set_exec_info(sys.exc_info())
        if not self.future.done():
            self.io_loop.add_future(
                self.future, lambda fn: self.run()
            )
            return False
        return True
def convert_yielded(yielded):
    """Return ``yielded`` unchanged if it is a ``Future``.

    :raises BadYieldError: if ``yielded`` is anything other than a ``Future``.
    """
    if isinstance(yielded, Future):
        return yielded
    raise BadYieldError("yielded type error %r" % (yielded,))
class BadYieldError(Exception):
    """Raised when a yielded value is not a ``Future``."""
# Create the module-level singletons only if they do not exist yet.
if "g_IOLoop" not in globals():
    g_IOLoop = IOLoop()
if "g_null_future" not in globals():
    # An already-completed Future (result None); Runner uses it as its
    # placeholder future before the first yield and after it has finished.
    g_null_future = Future()
    g_null_future.set_result(None)
################### test
# Futures completed by hand at the end of the script to simulate async replies.
async_future2 = Future()
async_future4 = Future()
@coroutine
def Test():
    """Await two nested coroutines in sequence and print their results.

    Nested calls across several functions are supported, and the asynchronous
    replies may arrive in a different order than they are awaited.

    :return: None
    """
    ret2 = yield Test2()
    print(1)
    ret4 = yield Test4()
    print(2)
    print("异步/同步调用返回结果 %r,%r" % (ret2, ret4))
@coroutine
def Test2():
    """Wait for the future from ``Test3`` and return its value.

    On Python 3 both ``return ret`` and ``raise Return(ret)`` work; on
    Python 2 only ``raise Return(ret)`` is possible.

    :return: the value the awaited future was completed with.
    """
    ret = yield Test3()  # a Future
    return ret
def Test3():
    """Return the module-level ``async_future2``.

    A function without ``yield`` must return a ``Future`` to be awaitable
    from a coroutine.

    :return: ``async_future2``
    """
    future = async_future2
    return future
@coroutine
def Test4():
    """Wait for the future from ``Test5`` and return its value."""
    ret = yield Test5()  # a Future (completed with 4 by the script below)
    return ret
def Test5():
    """Return the module-level ``async_future4``."""
    future = async_future4
    return future
Test()
# set_result may be called locally in the same frame, or later as the
# callback of a remote call. The futures are completed here in the opposite
# order to the one in which Test awaits them.
async_future4.set_result(4)
async_future2.set_result(2)