Date Tags stock

一般使用流的业务,数据都会连绵不绝比较多,我们一般不会做存储,算完即走,但是有时需要使用流刚刚用过的的一些数据,这时就可以使用流的缓存机制

In [1]:
from deva import *
  1. cache_max_len,缓存数据的最大条数,超过这个限制的,老数据会被丢弃
  2. cache_max_age_seconds,缓存最大时间,超过这个时间点,老数据会被丢弃
In [2]:
e = engine(func=gen_test,cache_max_len=50,cache_max_age_seconds=60*60*24)
e.name='quantation'
e.start()

获取最近10条缓存数据

In [3]:
e.recent(10)
Out[3]:
[50, 51, 52, 53, 54, 55]

过上几秒再次执行

In [4]:
e.recent(10)
Out[4]:
[17, 18, 19, 20, 21, 22, 23, 24, 25, 26]

获取最近10秒的数据

In [5]:
e.recent(seconds=10)
Out[5]:
value
2019-03-21 22:21:22.143711 22
2019-03-21 22:21:23.146273 23
2019-03-21 22:21:24.150259 24
2019-03-21 22:21:25.155633 25
2019-03-21 22:21:26.159029 26
2019-03-21 22:21:27.163026 27
2019-03-21 22:21:28.165441 28
2019-03-21 22:21:29.170812 29
2019-03-21 22:21:30.172070 30
2019-03-21 22:21:31.175500 31

增加其他流

In [6]:
e2 = engine()
e2.name = 'e2'
In [7]:
e2.start()

使用流实例管理器

In [8]:
m = Stream.manager()
In [9]:
m.map(lambda x:x>>to_list)

上面的数据中,e2和quantation是实时刷新的

In [10]:
m.name
Out[10]:
'_stream_manager'