获取行情数据并发送消息到 Kafka
In [1]:
import easyquotation
import pandas as pd
import numpy as np
import time
from confluent_kafka import Producer,Consumer
# Shared quote source: every later cell pulls market snapshots from this engine.
quotation_engine = easyquotation.use('sina')
# NOTE(review): not referenced by any visible cell; kept so other code relying on it still works.
quanta_list = list()
def foo(producer=None, topic='test-quant', interval=10, max_iterations=None):
    """Repeatedly publish full quote snapshots to Kafka.

    Each iteration reads ``quotation_engine.all``, builds a DataFrame from it
    (transposed, so each row presumably corresponds to one security code), and
    produces it to ``topic`` serialized with ``DataFrame.to_msgpack``.

    Parameters
    ----------
    producer : confluent_kafka.Producer, optional
        Producer to publish with. Defaults to the notebook-global ``p``
        (created in a later cell), which is what the original version used.
    topic : str
        Kafka topic to publish to.
    interval : float
        Seconds to sleep after each publish.
    max_iterations : int, optional
        Stop after this many snapshots; ``None`` (default) loops forever.
    """
    if producer is None:
        # Resolved at call time, so the global `p` must exist by then.
        producer = p
    sent = 0
    while max_iterations is None or sent < max_iterations:
        snapshot = pd.DataFrame(quotation_engine.all).T
        # NOTE(review): DataFrame.to_msgpack was removed in pandas 1.0; this only
        # runs on older pandas and must stay in sync with the pd.read_msgpack consumer.
        producer.produce(topic, snapshot.to_msgpack())
        # produce() only enqueues; poll(0) serves delivery events so the client's
        # queues keep draining during this long-running loop.
        producer.poll(0)
        sent += 1
        time.sleep(interval)
In [2]:
# Take a single snapshot up front; `df` is what the manual produce cell publishes.
q1 = quotation_engine.all
# Transpose so the outer keys of the snapshot become the row index.
df = pd.DataFrame(q1).transpose()
定义数据流
In [3]:
from streamz.dataframe import DataFrame
from streamz import Stream
# Consumer settings passed to streamz' Stream.from_kafka (consumer_params).
conf = {
    'bootstrap.servers': 'localhost:9092',
    # Large cap because each message carries a whole-market snapshot DataFrame.
    'message.max.bytes': 524288000,
    'group.id': 'hi',
    'session.timeout.ms': 6000,
    # With no committed offset, start from the oldest retained message.
    # 'earliest' is the current name of the legacy 'smallest'; it is set at the
    # top level because 'default.topic.config' is deprecated.
    'auto.offset.reset': 'earliest',
}
def isdf(x):
    """Return True if ``x`` is a pandas DataFrame (subclasses included).

    Used as a stream filter after decoding Kafka messages, presumably to keep
    anything that did not decode to a DataFrame away from the DataFrame nodes.
    """
    return isinstance(x, pd.DataFrame)
# Root of the stream graph: consume 'test-quant'; start=True begins consuming right away.
source = Stream.from_kafka(['test-quant'], consumer_params=conf, start=True)
# Decode each message with pd.read_msgpack, then let only DataFrames through.
# NOTE(review): pd.read_msgpack was removed in pandas 1.0.
quant = source.map(pd.read_msgpack).filter(isdf)
# Preview branch: first rows of each snapshot, 'now' and 'open' columns only.
(
    quant
    .map(lambda frame: frame.head())
    .map(lambda frame: frame[['now', 'open']])
    .sink(display)
)
定义流算法
In [4]:
def to_my_df(df):
    """Flatten a quote snapshot and keep only actively quoted rows.

    The row index is copied into a ``code`` column, then the index is reset.
    Rows with ``now == 0``, ``buy == 0`` or ``ask1_volume <= 100`` are dropped.
    Finally ``p_change = (buy - now) / now`` is added.

    The input frame is NOT modified. In the stream it is shared with other
    branches, such as the preview sink, so mutating it would leak a ``code``
    column into them.

    Returns a new DataFrame.
    """
    flat = df.assign(code=df.index).reset_index()
    # Separate queries, as before; `now != 0` also guarantees the division below is safe.
    active = flat.query('now!=0').query('buy!=0').query('ask1_volume>100')
    # assign() builds a new frame instead of writing into a query result (no SettingWithCopy).
    return active.assign(p_change=(active.buy - active.now) / active.now)
def getmycode(df, prefix='150'):
    """Keep only rows whose ``code`` column starts with ``prefix`` (default '150').

    Missing or non-string codes count as non-matching. Without ``na=False``
    they would put NaN into the boolean mask, and indexing with it raises.
    """
    return df[df.code.str.startswith(prefix, na=False)]
def getmydf(df):
    """Compute a placeholder ``jiange`` metric and return code/name/jiange.

    Rows with ``ask1_volume <= 100`` are dropped. ``now`` is cast to float,
    because the snapshot columns may arrive as object dtype (TODO confirm).
    ``jiange`` is each row's price minus the previous row's price.

    Returns a new DataFrame with columns ``['code', 'name', 'jiange']``.
    """
    liquid = df.query('ask1_volume>100')
    prices = liquid.now.astype(float)
    # Original author's note: written arbitrarily, has no real meaning.
    # The "previous row" is just the previous row of the table, not a previous tick.
    # assign() avoids writing into the query result (SettingWithCopyWarning).
    return liquid.assign(jiange=prices.diff())[['code', 'name', 'jiange']]
def mygroup(df, n=10):
    """Average ``jiange`` per (code, name) group and return the ``n`` highest.

    The result is a pivot table indexed by (code, name) with a ``jiange``
    column, sorted ascending, so the largest average is the last row.
    ``n`` defaults to 10, the previously hard-coded count.
    """
    averages = df.pivot_table(index=['code', 'name'], values=['jiange'], aggfunc=np.average)
    return averages.sort_values('jiange').tail(n)
# Algorithm branch: clean -> jiange -> '150' codes -> window -> concat -> top groups -> display.
# NOTE(review): with a window of 1, pd.concat only ever sees the latest frame, so it is
# effectively a pass-through; presumably a larger window was intended.
(
    quant
    .map(to_my_df)
    .map(getmydf)
    .map(getmycode)
    .sliding_window(n=1)
    .map(pd.concat)
    .map(mygroup)
    .sink(display)
)
启动行情数据推送，查看结果
In [5]:
# Producer for the 'test-quant' topic; a single message is capped at 5 MiB.
p = Producer({'bootstrap.servers': 'localhost:9092', 'message.max.bytes': 5242880})
# Publish the one snapshot `df` taken earlier so the stream pipelines fire once.
p.produce('test-quant', df.to_msgpack())
# produce() only enqueues asynchronously. flush() waits up to 10 s for delivery
# and returns how many messages are still queued (0 means everything was sent).
p.flush(10)
流计算过程的可视化
In [6]:
# Draw the stream graph rooted at `source`, including every branch attached above.
source.visualize()
Out[6]: