获取行情数据并发消息到kafka

In [1]:
import easyquotation
import pandas as pd
import numpy as np
import time
from confluent_kafka import Producer,Consumer


quanta_list = []
quotation_engine = easyquotation.use('sina')
def foo():
    while True:
        q1 = quotation_engine.all
        df = pd.DataFrame(q1).T
        p.produce('test-quant',df.to_msgpack())
        time.sleep(10)
In [2]:
q1 = quotation_engine.all
df = pd.DataFrame(q1).T

定义数据流

In [3]:
from streamz.dataframe import DataFrame
from streamz import Stream
conf = {'bootstrap.servers': 'localhost:9092',
        'message.max.bytes': 524288000,
        'group.id': 'hi', 'session.timeout.ms': 6000,
#         'on_commit': lambda x:x,
#         'on_assign':lambda x:x,
        'default.topic.config': {'auto.offset.reset': 'smallest'},#smallest,earliest
       }

def isdf(x):
    return type(x)==pd.core.frame.DataFrame

source = Stream.from_kafka(['test-quant'],consumer_params=conf,start=True)
quant = source.map(pd.read_msgpack).filter(isdf)
quant.map(lambda df:df.head()).map(lambda df:df[['now','open']]).sink(display)

定义流算法

In [4]:
def to_my_df(df):
    df['code']=df.index
    df = df.reset_index()
    df = df.query('now!=0').query('buy!=0').query('ask1_volume>100')
    df['p_change']=(df.buy-df.now)/df.now
    return df

def getmycode(df):
    return df[df.code.str.startswith('150')]

def getmydf(df):
    df = df.query('ask1_volume>100')
    df.now = df.now.astype(float)
    #随便写的,没有意义
    df['jiange'] = df.now-df.now.shift(1)
    return df[['code','name','jiange']]

def mygroup(df):
    return df.pivot_table(index=['code','name'],values=['jiange'],aggfunc=np.average).sort_values('jiange').tail(10)

quant.map(to_my_df).map(getmydf).map(getmycode).sliding_window(1).map(pd.concat).map(mygroup).sink(display)

启动行情数据查看结果

In [5]:
p = Producer( {'bootstrap.servers': 'localhost:9092','message.max.bytes': 5242880})
p.produce('test-quant',df.to_msgpack())

流计算过程的可视化

In [6]:
source.visualize()
now open
000001 10.47 10.49
000002 25.36 24.81
000003 0 0
000004 16.78 16.7
000005 3.01 3
jiange
code name
150130 "医药A 0.508
150344 "证券B基 0.533
150172 "证券B 0.549
150265 "一带A 0.552
150241 "银行A级 0.553
150209 "国企改A 0.566
150198 "食品A 0.572
150051 "沪深300A 0.583
150221 "中航军A 0.585
150028 "中证500A 0.821
Out[6]: