设计概要:

  • 把数据流形象话的比作水流
  • 使用redis流和流的存储功能做水库,分别设计进水和出水系统
  • 使用tornado可以同时支持多个进出水水管并行运行,互不干扰
  • 使用streamz库灵活实现加在进出水管上的算法,可以实现限速rate_limit、过滤filter、批处理map,合并zip,缓冲buffer等特性

使用类库

使用了tornado的异步和streamz的流处理两个库,需要redis 5.0以上版本

In [1]:
import os
import weakref

import time
import tornado.ioloop
from tornado import gen

from streamz.core import Stream, convert_interval
import pandas as pd

class Source(Stream):
    _graphviz_shape = 'doubleoctagon'
    

进水口设计

异步定时循环任务,不听生成数据,并将数据push到redis

In [2]:
@Stream.register_api(staticmethod)
class engine(Source):
  
    def __init__(self, topic, 
                 push_interval=1, 
                 start=False,
                 func=lambda:{'time':time.time()},
                 asyncflag=False,
                 threadcount=5,
                 **kwargs):
        
        self.producer = None
        self.topic = topic
        self.push_interval = push_interval
        self.func=func
        self.asyncflag = asyncflag
        if self.asyncflag:
            from concurrent.futures import ThreadPoolExecutor
            self.thread_pool = ThreadPoolExecutor(threadcount)
            
        super(engine, self).__init__(ensure_io_loop=True, **kwargs)
        self.stopped = True
        if start:
            self.start()

    def do_push(self):
        if self.producer is not None:
            msg = self.producer.add(self.func())
            if msg :
                return msg

    @gen.coroutine
    def push_redis(self):
        while True:
            if self.asyncflag:
                val = self.thread_pool.submit(self.do_push)
            else:
                val = self.do_push()
            yield gen.sleep(self.push_interval)
            if self.stopped:
                break

    def start(self):
        from walrus import Database
        import distributed
        
        if self.stopped:
            self.finalize = distributed.compatibility.finalize
            self.db = Database()
            self.producer = self.db.Stream(self.topic)
            self.stopped = False
            self.loop.add_callback(self.push_redis)
           

    def stop(self):
        if self.producer is not None:
            self.producer = None
            self.stopped = True
        self.finalize(self, self.stop, weakref.ref(self))

出水口设计

从redis读取流数据生成stream对象

In [3]:
@Stream.register_api(staticmethod)
class from_redis(Source):
  
    def __init__(self, topics, poll_interval=0.1, start=False,group='test',
                 **kwargs):
        self.consumer = None
        self.topics = topics
        self.group=group
        self.poll_interval = poll_interval
        super(from_redis, self).__init__(ensure_io_loop=True, **kwargs)
        self.stopped = True
        if start:
            self.start()

    def do_poll(self):
        if self.consumer is not None:
            msg = self.consumer.read()
            if msg:
                return msg

    @gen.coroutine
    def poll_redis(self):
        while True:
            val = self.do_poll()
            if val:
                yield self._emit(val)
            else:
                yield gen.sleep(self.poll_interval)
            if self.stopped:
                break

    def start(self):
        import confluent_kafka as ck
        from walrus import Database
        import distributed
        
        if self.stopped:
            self.finalize = distributed.compatibility.finalize
            self.db = Database()
            self.consumer = self.db.consumer_group(self.group, self.topics)
            self.consumer.create()  # Create the consumer group.
#             self.consumer.set_id('$')#不会从头读
            self.stopped = False
            self.loop.add_callback(self.poll_redis)
           

    def stop(self):
        if self.consumer is not None:
            self.consumer.destroy() 
            self.consumer = None
            self.stopped = True
        self.finalize(self, self.stop, weakref.ref(self))

出水管算法

In [4]:
def parse(meta_msg):
    topic,msg = meta_msg[0],meta_msg[1][0]
    msg_id,msg_body = msg
    return msg_body

def ismykey(byte_dict):
    return byte_dict.get(b'quant_df') != None

def isotherkey(byte_dict):
    return byte_dict.get(b'quant_df') == None
    
def get_quant(byte_dict):
    return byte_dict.get(b'quant_df')

def isdf(msg):
    return type(msg)==pd.core.frame.DataFrame

def to_my_df(df):
    df['code']=df.index
    df = df.reset_index()
    df = df.query('now!=0').query('buy!=0').query('ask1_volume>100')
    df['p_change']=(df.buy-df.now)/df.now
    return df

def getmydf(df):
    df = df.query('ask1_volume>100')
    df.now = df.now.astype(float)
    #随便写的,没有意义
    df['jiange'] = df.now-df.now.shift(1)
    return df[['code','name','jiange']]

def get_index(df):
    return df[df.code.str.startswith('150')]

def my_sort(df):
    return df.sort_values('jiange').tail(10)
In [21]:
sdf.sliding_window(3).map(pd.concat)
In [5]:
# source.stop()
source = Stream.from_redis(['stream-a'],group='new16', start=False)
mysource = source.flatten().map(parse)

from fn import F
otherkey = mysource.filter(isotherkey)
l = otherkey.sink_to_list()
squant = mysource.filter(ismykey).map(get_quant).map(pd.read_msgpack).filter(isdf)
sdf = squant.map(to_my_df).map(getmydf).map(get_index).map(lambda df:df.sample(15)).map(my_sort)
sdf
In [6]:
source.start()
In [22]:
source.stop()
In [7]:
source.visualize()
Out[7]:

水泵设计

生成数据到原函数

In [8]:
def gen_quant():
    import easyquotation
    quotation_engine = easyquotation.use('sina')
    q1 = quotation_engine.all
    df = pd.DataFrame(q1).T
    return {'quant_df':df.to_msgpack()}

def gen_test():
    import moment as mm
    return {'test':mm.now().seconds}


def gen_block_test():
    import moment as mm
    import time
    time.sleep(6)
    return {'block_test':mm.now().seconds}
In [10]:
engine2 = engine(topic='stream-a',func=gen_test,push_interval=1)
engine2.start()
In [24]:
engine2.stop()
In [9]:
engine1 = Stream.engine(topic='stream-a',func=gen_quant,push_interval=5,asyncflag=True)
engine1.start()
In [23]:
engine1.stop()
In [14]:
engine3 = Stream.engine(topic='stream-a',func=gen_block_test,push_interval=1,asyncflag=True,threadcount=10)
engine3.start()
In [25]:
engine3.stop()
In [ ]:
engine3.stop()