Date Tags stream
  • 我们经常在python里遇到async定义的函数,或者tornado中的coroutin,等一些异步函数或者对象,非常难于理解和操作。
  • 我们实现了attend这个函数,可以对这些异步任务很好的进行安排,当异步执行好后,会将结果放入到attend指定的流中。
  • 我们只需要在流上提前定义函数安排处理逻辑就好了。
  • 我们有时候服务器需要远程接受一个python对象进行处理,而不是接受json这种字符串类型的东西,我们这里实现py对象远程接收和使用
In [1]:
from deva import *
  1. from_http_request 将会在7777端口开启一个http stream server,接收客户端发来的py对象
  2. 这个server会把收到的py对象执行一个lambda函数 *2,并且打印日志
In [2]:
server=from_http_request(port=7777)
server.map(lambda x:x*2)>>log
server.start()
  1. 我们在客户端发送字符串“你好”到服务端,这个客户端默认时异步的发(可以支持很量并发),可以看到执行后出狱pending状态,服务端返回结果时看不到的
In [5]:
'你好'>>post_to('http://127.0.0.1:7777')
Out[5]:
<Future pending cb=[coroutine.<locals>.wrapper.<locals>.<lambda>() at /usr/local/lib/python3.8/site-packages/tornado/gen.py:251]>
[21:29:04] 你好你好                                                    bus.py:31
  1. 我们这次发证书2到服务端,这个发送到异步执行的结果会被attend到一个流,默认attend到log流。这里可以看到服务端收到2后*2输出4,而异步任务执行后的结果也被发到log流打印出来了
In [7]:
2>>post_to('http://127.0.0.1:7777') | attend(log)
[21:29:48] 4                                                           bus.py:31
[21:29:48] HTTPResponse(_body=None,_error_is_response_code=False,buffe bus.py:31
           r=<_io.BytesIO object at 0x10d23e310>,code=200,effective_ur          
           l='http://127.0.0.1:7777',error=None,headers=<tornado.httpu          
           til.HTTPHeaders object at 0x10f0203a0>,reason='OK',request=          
           <tornado.httpclient.HTTPRequest object at 0x1218663d0>,requ          
           est_time=0.027182817459106445,start_time=1606570188.591778,          
           time_info={})                                                        
In [ ]:
# 查看所有流的可视化
# Stream.instances()|pfilter(lambda x:)|pmap(lambda x:x.visualize()|display@P)|ls
In [16]:
# 查找所有Stream类的对象实例,可以看到我们默认log和warn都被导入了的
get_instances_by_class(Stream)|head(50)
Out[16]:
[<warn; Stream>,
 <sink: warning>,
 <log; Stream: 10>,
 <sink: log_print>,
 <bus; Topic>,
 <debug; Stream>,
 <sink: <lambda>>,
 <filter: <lambda>>,
 <执行代码; Stream: 200>,
 <Stream: 200>,
 <map: func=exec_command, cache_max_len=200>,
 <sink: emit>,
 <map: <lambda>>,
 <from_http_request: port=7777, path=/.*>,
 <sink: emit>,
 <map: <lambda>>]