以下为个人学习笔记整理
# yield 的简单使用
def Fuck(): | |
for k, v in {"1": 1, "2": 2, "3": 3, "4": 4}.items(): | |
yield k,v | |
for k,v in Fuck(): | |
print(k , v) |
# 简单异步协程(同步情况下会报错)
# -*- coding:utf-8 -*- | |
import time | |
import threading | |
g_func = [] | |
def asyncfunc(multiparams=False): | |
def decorator(func): | |
def wrappedfunc(*args, **kwargs): | |
generator = func(*args, **kwargs) | |
if not multiparams: | |
def callback(result): | |
try: | |
generator.send(result) | |
except StopIteration: | |
pass | |
else: | |
def callback(*args, **kwargs): | |
try: | |
generator.send(*args, **kwargs) | |
except StopIteration: | |
pass | |
generator.send(None) | |
try: | |
generator.send(callback) | |
except StopIteration: | |
pass | |
return wrappedfunc | |
return decorator | |
@asyncfunc(multiparams=False) | |
def example(): | |
obj = Obj() | |
cb = yield | |
obj = yield obj.Create(cb) # 异步创建 | |
obj = yield obj.Load(cb) # 异步加载 | |
print("obj.m_Load :%s , obj.m_Create: %s"%(obj.m_Load,obj.m_Create)) | |
class Obj: | |
def Load(self, cb): | |
print("Load") | |
self.m_Load = True | |
global g_func | |
g_func.append((time.time()+1, cb, self)) | |
def Create(self, cb): | |
print("Create") | |
self.m_Create = True | |
global g_func | |
g_func.append((time.time()+1, cb, self)) | |
def Timer(): | |
global g_func | |
while True: | |
for func in g_func: | |
if time.time()>func[0]: | |
func[1](func[2]) | |
g_func.remove(func) | |
print("Timer") | |
time.sleep(1) | |
example() | |
t1 = threading.Thread(target=Timer) | |
t1.start() | |
result:-------------------------- | |
Create | |
Timer | |
Load | |
Timer | |
obj.m_Load :True , obj.m_Create: True | |
Timer | |
Timer |
# tornado 协程 (coroutine) 原理
https://blog.csdn.net/wyx819/article/details/45420017
# -*- coding:utf-8 -*- | |
import logging | |
import functools | |
import sys | |
import types | |
class Return(Exception): | |
""" | |
用于存储返回结果的自定义异常 | |
""" | |
def __init__(self, value=None): | |
super(Return, self).__init__() | |
self.value = value |
class Future(object): | |
""" | |
用于保存异步调用后的结果及回调函数对象 | |
""" | |
def __init__(self): | |
super(Future, self).__init__() | |
self._callbacks = [] | |
self._done = False | |
self._result = None | |
self._exec_info = None | |
def done(self): | |
return self._done | |
def result(self, timeout=None): | |
if self._result is not None: | |
return self._result | |
assert self._done | |
return self._result | |
def add_done_callback(self, fn): | |
if self._done: | |
fn(self) | |
else: | |
self._callbacks.append(fn) | |
def _set_done(self): | |
self._done = True | |
for cd in self._callbacks: | |
try: | |
cd(self) | |
except Exception as e: | |
logging.exception(e) | |
self._callbacks = None | |
def set_result(self, result): | |
self._result = result | |
self._set_done() | |
def set_exec_info(self, exec_info): | |
self._set_done() | |
self._exec_info = exec_info | |
def is_future(obj): | |
return isinstance(obj, Future) |
class IOLoop(object): | |
def __init__(self, *args, **kwargs): | |
super(IOLoop, self).__init__() | |
self.running = False | |
@classmethod | |
def current(cls): | |
return g_IOLoop | |
def add_future(self, future, callback): | |
""" | |
把回调函数加入future对象内 | |
:param future: | |
:param callback: | |
:return: | |
""" | |
assert isinstance(future, Future) | |
future.add_done_callback( | |
lambda future: self.add_callback(callback, future) | |
) | |
def add_callback(self, callback, *args, **kwargs): | |
func = functools.partial(callback, *args, **kwargs) | |
self._run_callback(func) | |
def _run_callback(self, callback): | |
ret = callback() | |
if ret is not None: | |
try: | |
ret = convert_yielded(ret) | |
except BadYieldError: | |
pass | |
else: | |
self.add_future(ret, lambda fn: fn.result()) | |
def coroutine(func): | |
""" | |
functools.wraps用来保留原函数信息,避免被迭代器取代 | |
:param func: | |
:return: | |
""" | |
@functools.wraps(func) | |
def wrapper(*args, **kwargs): | |
future = Future() | |
try: | |
result = func(*args, **kwargs) | |
except (StopIteration, Return) as e: | |
result = e.value | |
except Exception as e: | |
logging.exception(e) | |
future.set_exec_info(sys.exc_info()) | |
return future | |
else: | |
if isinstance(result, types.GeneratorType): | |
try: | |
generator = result | |
yielded = next(generator) | |
except (StopIteration, Return) as e: | |
future.set_result(getattr(e, "value", None)) | |
except Exception as e: | |
logging.exception(e) | |
future.set_exec_info(sys.exc_info()) | |
else: | |
Runner(generator, future, yielded) | |
try: | |
return future | |
finally: | |
""" | |
此处为了解除环引用问题 | |
Exception 中会存在 future 引用 | |
future._exec_info 也会有 Exception 的引用 | |
""" | |
future = None | |
future.set_result(result) | |
return future | |
return wrapper |
class Runner(object): | |
""" | |
执行send函数来对生成器进行迭代 | |
""" | |
def __init__(self, generator, result_future, first_yielded): | |
""" | |
创建保存生成器现状的对象 | |
:param generator: 生成器本身 | |
:param result_future: 含有执行结果的 future | |
:param first_yielded: 生成器的第一个 yield 后的值 | |
""" | |
self.generator = generator | |
self.result_future = result_future | |
self.future = g_null_future | |
self.running = False | |
self.finish = False | |
self.had_exception = False | |
self.io_loop = IOLoop.current() | |
if self.handle_yielded(first_yielded): | |
self.run() | |
def run(self): | |
if self.running or self.finish: | |
return | |
try: | |
self.running = True | |
while True: | |
future = self.future | |
""" | |
如果 future 里面的回调还没执行,先不send 避免出现 yield 还没阻塞就 send 引发的异常 | |
如果 future 已经执行了回调,里面必然有 result | |
如果 future 本身不是Future类型,抛出错误 | |
""" | |
if not future.done(): | |
return | |
self.future = None | |
try: | |
""" | |
1、正常执行:代表生成器还没有迭代完毕,进入下一轮迭代 得到 下一个 yield 右边值 正常情况下是 Future对象 | |
2、StopIteration or Return 异常:迭代结束 得到return 结果 或者得到 raise Retuen 对象里的 value | |
3、Exception 函数本身执行错误,直接抛出异常 | |
""" | |
try: | |
value = future.result() | |
except Exception as e: | |
logging.exception(e) | |
self.had_exception = True | |
yielded = self.generator.throw(*sys.exc_info()) | |
else: | |
yielded = self.generator.send(value) | |
except (StopIteration, Return) as e: | |
""" | |
生成器执行结束 | |
1、迭代终止抛出 StopIteration 异常,返回值return 会被保存在 StopIteration 的 value 属性里 | |
2、主动 raise Return 异常,raise Return(value) | |
把异常中的结果返回给 result_future | |
""" | |
self.finish = True | |
self.future = g_null_future | |
self.result_future.set_result(getattr(e, "value", None)) | |
self.result_future = None # 断开引用 | |
return | |
except Exception: | |
self.finish = True | |
self.future = g_null_future | |
self.result_future.set_result(sys.exc_info()) | |
self.result_future = None # 断开引用 | |
raise Exception | |
if not self.handle_yielded(yielded): | |
return | |
finally: | |
self.running = False | |
def handle_yielded(self, yielded): | |
""" | |
如果生成器存在多个 yield 则每次 send 后 得到下一个 yielded 用来执行下一次 send 或者 raise Return 去返回结果 | |
:param yielded: 下一个yield 的 右侧结果 | |
:return: | |
""" | |
try: | |
self.future = convert_yielded(yielded) | |
except BadYieldError: | |
self.future = Future() | |
self.future.set_result(sys.exc_info()) | |
if not self.future.done(): | |
self.io_loop.add_future( | |
self.future, lambda fn: self.run() | |
) | |
return False | |
return True | |
def convert_yielded(yielded): | |
if isinstance(yielded, Future): | |
return yielded | |
raise BadYieldError("yielded type error %r" % (yielded,)) | |
class BadYieldError(Exception): | |
pass | |
if not "g_IOLoop" in globals(): | |
g_IOLoop = IOLoop() | |
if not "g_null_future" in globals(): | |
""" | |
用来在特殊情况下推出函数的Future | |
""" | |
g_null_future = Future() | |
g_null_future.set_result(None) |
################### test | |
async_future2 = Future() | |
async_future4 = Future() | |
@coroutine | |
def Test(): | |
""" | |
支持多函数嵌套调用 | |
异步消息回调顺序可以不同 | |
:return: | |
""" | |
ret2 = yield Test2() | |
print(1) | |
ret4 = yield Test4() | |
print(2) | |
print("异步/同步调用返回结果 %r,%r" % (ret2,ret4)) | |
@coroutine | |
def Test2(): | |
""" | |
py3 return ret 和 raise Return(ret) 都可,py2 只能 raise Return(ret) | |
:return: | |
""" | |
ret = yield Test3() #future | |
# raise Return(ret) | |
return ret | |
def Test3(): | |
""" | |
如果没有 yield 返回值必须是 Future 类型 | |
:return: | |
""" | |
future = async_future2 | |
return future | |
@coroutine | |
def Test4(): | |
ret = yield Test5() # future(4) | |
return ret | |
def Test5(): | |
future = async_future4 | |
return future | |
Test() | |
# 该函数可以在本地同帧下调用也可在远程调用后回调 | |
async_future4.set_result(4) | |
async_future2.set_result(2) |