Date Tags stream

定义好某个数据流后,可以按照路由规则,将数据流中的特定数据分流给特定函数去处理,这时的数据流中的数据,类似于事件系统中的事件的概念,而路由处理函数,相当于是事件处理函数.

单进程内的事件处理

  1. 我们使用engine生成一个默认流,流中的数据是当前事件的秒,类型为整数
  2. 我们路由所有的数据类型为int的数据到test函数处理
In [1]:
from deva import *
e = Stream.engine()
e.start()

@e.route(lambda x:type(x)==int)
def test(x):
    (x,x*2)>>stdout
(54, 108)(55, 110)(56, 112)(57, 114)
In [2]:
e.stop()
In [3]:
e.handlers
Out[3]:
[(<function __main__.<lambda>(x)>, <function __main__.test(x)>)]

跨进程内的事件处理

  1. 后台需要启动redis做跨进程中介
  2. 我们使用PBus这个跨进程的bus来做跨进程的支持
  3. 我们路由所有的数据类型为int或者str的数据到foo函数处理
In [4]:
from deva.process import PBus
In [5]:
pbus = PBus()

@pbus.route(lambda x:type(x) in [str,int])
def foo(x):
    print(x*2)
In [6]:
'foo'>>pbus
Out[6]:
'foo'
foofoo
In [7]:
123>>pbus
Out[7]:
123
246

PBus是可以跨进程的,在其他进程中使用它的实例,采用类似的写法即可