from_coroutine 和from_future

  1. 上游异步任务交到流会自动被执行,并且将结果push到下游
  2. 上游异步任务必须是coroutine或者future,异步的异步的网络抓取函数或者数据库写入函数的执行结果
  3. 协程执行结果进入下游是无序的
In [34]:
from deva import *
In [33]:
@gen.coroutine
def t_foo():
    yield gen.sleep(0.1)
    return range<<10>>sample>>first
In [32]:
async def a_foo():
    import asyncio
    await asyncio.sleep(0.1)
    return range<<10>>sample>>first
In [35]:
s = Stream()
s.rate_limit(0.01).from_coroutine()>>log
#协程函数和参数需要一起发送
for i in range(3):
    (t_foo,)>>s
    (a_foo,)>>s
[2020-02-27 06:40:22.220265] INFO: deva.log: 0
[2020-02-27 06:40:22.329854] INFO: deva.log: 6
[2020-02-27 06:40:22.435644] INFO: deva.log: 9
[2020-02-27 06:40:22.542257] INFO: deva.log: 4
[2020-02-27 06:40:22.651008] INFO: deva.log: 6
[2020-02-27 06:40:22.756617] INFO: deva.log: 0
In [37]:
s = Stream()
s.rate_limit(0.01).from_future()>>log
#将函数执行结果future发送
for i in range(3):
    t_foo()>>s
    a_foo()>>s
[2020-02-27 06:40:53.131384] INFO: deva.log: 1
[2020-02-27 06:40:53.134402] INFO: deva.log: 1
[2020-02-27 06:40:53.137684] INFO: deva.log: 4
[2020-02-27 06:40:53.208862] INFO: deva.log: 0
[2020-02-27 06:40:53.210711] INFO: deva.log: 5
[2020-02-27 06:40:53.213403] INFO: deva.log: 7

jupyter中同步调用异步

  1. yield后,没有使用return的,将值主动push给某个对象或者流
  2. yield后。有return返回值的 ,在于设置callback获取future的返回值

asyncio编写方式

task=loop.asyncio_loop.create_task后,task.add_done_callback增加完成callback

In [23]:
import asyncio
loop = ioloop.IOLoop.current()
async def hasreturn():
    await asyncio.sleep(2)
    return 456

async def noreturn():
    await asyncio.sleep(2)
    123>>print
In [22]:
task=loop.asyncio_loop.create_task(noreturn())
123
In [24]:
task=loop.asyncio_loop.create_task(hasreturn())
task.add_done_callback(lambda x:x.result()>>print)
456

tornado编写方式

loop.add_future增加callback函数获取函数返回值

In [27]:
@gen.coroutine
def t_hasreturn():
    yield asyncio.sleep(2)
    return 456

@gen.coroutine
def t_noreturn():
    yield asyncio.sleep(2)
    123>>print
In [29]:
t_noreturn()
Out[29]:
<Future pending cb=[_make_coroutine_wrapper.<locals>.wrapper.<locals>.<lambda>() at /usr/local/Cellar/ipython/6.5.0/libexec/vendor/lib/python3.7/site-packages/tornado/gen.py:347]>
123
In [31]:
loop = ioloop.IOLoop.current()
loop.add_future(t_hasreturn(), lambda x:x.result()>>print)
456