设计概要:
- 把数据流形象话的比作水流
- 使用redis流和流的存储功能做水库,分别设计进水和出水系统
- 使用tornado可以同时支持多个进出水水管并行运行,互不干扰
- 使用streamz库灵活实现加在进出水管上的算法,可以实现限速rate_limit、过滤filter、批处理map,合并zip,缓冲buffer等特性
使用类库¶
使用了tornado的异步和streamz的流处理两个库,需要redis 5.0以上版本
In [1]:
import os
import weakref
import time
import tornado.ioloop
from tornado import gen
from streamz.core import Stream, convert_interval
import pandas as pd
class Source(Stream):
_graphviz_shape = 'doubleoctagon'
进水口设计¶
异步定时循环任务,不听生成数据,并将数据push到redis
In [2]:
@Stream.register_api(staticmethod)
class engine(Source):
def __init__(self, topic,
push_interval=1,
start=False,
func=lambda:{'time':time.time()},
asyncflag=False,
threadcount=5,
**kwargs):
self.producer = None
self.topic = topic
self.push_interval = push_interval
self.func=func
self.asyncflag = asyncflag
if self.asyncflag:
from concurrent.futures import ThreadPoolExecutor
self.thread_pool = ThreadPoolExecutor(threadcount)
super(engine, self).__init__(ensure_io_loop=True, **kwargs)
self.stopped = True
if start:
self.start()
def do_push(self):
if self.producer is not None:
msg = self.producer.add(self.func())
if msg :
return msg
@gen.coroutine
def push_redis(self):
while True:
if self.asyncflag:
val = self.thread_pool.submit(self.do_push)
else:
val = self.do_push()
yield gen.sleep(self.push_interval)
if self.stopped:
break
def start(self):
from walrus import Database
import distributed
if self.stopped:
self.finalize = distributed.compatibility.finalize
self.db = Database()
self.producer = self.db.Stream(self.topic)
self.stopped = False
self.loop.add_callback(self.push_redis)
def stop(self):
if self.producer is not None:
self.producer = None
self.stopped = True
self.finalize(self, self.stop, weakref.ref(self))
出水口设计¶
从redis读取流数据生成stream对象
In [3]:
@Stream.register_api(staticmethod)
class from_redis(Source):
def __init__(self, topics, poll_interval=0.1, start=False,group='test',
**kwargs):
self.consumer = None
self.topics = topics
self.group=group
self.poll_interval = poll_interval
super(from_redis, self).__init__(ensure_io_loop=True, **kwargs)
self.stopped = True
if start:
self.start()
def do_poll(self):
if self.consumer is not None:
msg = self.consumer.read()
if msg:
return msg
@gen.coroutine
def poll_redis(self):
while True:
val = self.do_poll()
if val:
yield self._emit(val)
else:
yield gen.sleep(self.poll_interval)
if self.stopped:
break
def start(self):
import confluent_kafka as ck
from walrus import Database
import distributed
if self.stopped:
self.finalize = distributed.compatibility.finalize
self.db = Database()
self.consumer = self.db.consumer_group(self.group, self.topics)
self.consumer.create() # Create the consumer group.
# self.consumer.set_id('$')#不会从头读
self.stopped = False
self.loop.add_callback(self.poll_redis)
def stop(self):
if self.consumer is not None:
self.consumer.destroy()
self.consumer = None
self.stopped = True
self.finalize(self, self.stop, weakref.ref(self))
出水管算法¶
In [4]:
def parse(meta_msg):
topic,msg = meta_msg[0],meta_msg[1][0]
msg_id,msg_body = msg
return msg_body
def ismykey(byte_dict):
return byte_dict.get(b'quant_df') != None
def isotherkey(byte_dict):
return byte_dict.get(b'quant_df') == None
def get_quant(byte_dict):
return byte_dict.get(b'quant_df')
def isdf(msg):
return type(msg)==pd.core.frame.DataFrame
def to_my_df(df):
df['code']=df.index
df = df.reset_index()
df = df.query('now!=0').query('buy!=0').query('ask1_volume>100')
df['p_change']=(df.buy-df.now)/df.now
return df
def getmydf(df):
df = df.query('ask1_volume>100')
df.now = df.now.astype(float)
#随便写的,没有意义
df['jiange'] = df.now-df.now.shift(1)
return df[['code','name','jiange']]
def get_index(df):
return df[df.code.str.startswith('150')]
def my_sort(df):
return df.sort_values('jiange').tail(10)
In [21]:
sdf.sliding_window(3).map(pd.concat)
In [5]:
# source.stop()
source = Stream.from_redis(['stream-a'],group='new16', start=False)
mysource = source.flatten().map(parse)
from fn import F
otherkey = mysource.filter(isotherkey)
l = otherkey.sink_to_list()
squant = mysource.filter(ismykey).map(get_quant).map(pd.read_msgpack).filter(isdf)
sdf = squant.map(to_my_df).map(getmydf).map(get_index).map(lambda df:df.sample(15)).map(my_sort)
sdf
In [6]:
source.start()
In [22]:
source.stop()
In [7]:
source.visualize()
Out[7]:
水泵设计¶
生成数据到原函数
In [8]:
def gen_quant():
import easyquotation
quotation_engine = easyquotation.use('sina')
q1 = quotation_engine.all
df = pd.DataFrame(q1).T
return {'quant_df':df.to_msgpack()}
def gen_test():
import moment as mm
return {'test':mm.now().seconds}
def gen_block_test():
import moment as mm
import time
time.sleep(6)
return {'block_test':mm.now().seconds}
In [10]:
engine2 = engine(topic='stream-a',func=gen_test,push_interval=1)
engine2.start()
In [24]:
engine2.stop()
In [9]:
engine1 = Stream.engine(topic='stream-a',func=gen_quant,push_interval=5,asyncflag=True)
engine1.start()
In [23]:
engine1.stop()
In [14]:
engine3 = Stream.engine(topic='stream-a',func=gen_block_test,push_interval=1,asyncflag=True,threadcount=10)
engine3.start()
In [25]:
engine3.stop()
In [ ]:
engine3.stop()