Streaming Data

“流数据”是连续生成的数据,通常由某些外部源(如远程网站,测量设备或模拟器)生成。这种数据在金融时间序列,Web服务器日志,科学应用程序和许多其他情况下很常见。我们已经了解了如何在[实时数据](06-Live _Data.ipynb)用户指南中显示可调用的任何数据输出,我们还看到了如何使用HoloViews流系统在用户指南中推送事件部分[响应事件](11-响应_到Events.ipynb)和[自定义交互](12-Custom Interactivity.ipynb)。

本用户指南显示了使用DynamicMap和流构建交互式绘图的第三种方法。在这里,不是将绘图元数据(例如缩放范围,用户触发的事件,如“Tap”等)推送到DynamicMap回调,而是使用HoloViews直接更新可视化元素中的基础数据。 `Stream``。

特别是,我们将展示如何使用HoloViews的PipeBuffer流来处理流数据源,而无需从DynamicMap可调用内部获取或生成数据。除了简单地从“DynamicMap”外部设置元素数据之外,我们还将探索使用可选单独[streamz]协调的流数据处理方法 (http://matthewrocklin.com/blog/work/来自MattRocklin的2017/10/16/streaming-dataframes-1) 库可以使构建复杂的流媒体管道变得更加简单。

请注意,此页面演示了需要实时运行的Python服务器的功能。当导出到pyviz.org网站上的静态HTML页面时,您将只看到一个图。当将此代码作为Jupyter笔记本运行时,您应该逐个单元地执行它以依次查看每个操作的效果。

In [ ]:
import time
import numpy as np
import pandas as pd
import holoviews as hv

from holoviews.streams import Pipe, Buffer

import streamz
import streamz.dataframe

hv.extension('bokeh')

Pipe

Pipe允许将数据推送到DynamicMap回调中以更改可视化,就像[响应事件](./ 11-Responding to Events.ipynb)用户指南中的流一样用于将更改推送到控制可视化的元数据。可以使用Pipe来推送任何类型的数据,并使其可用于DynamicMap回调。由于所有Element类型接受各种形式的data,我们可以使用Pipe通过DynamicMap将数据直接推送到Element的构造函数。

我们可以利用大多数元素可以在不提供任何数据的情况下实例化的事实,因此我们使用空列表声明Pipe并声明DynamicMap,将管道作为流提供,这将动态更新VectorField

In [ ]:
pipe = Pipe([])
vector_dmap = hv.DynamicMap(hv.VectorField, streams=[pipe])
vector_dmap.redim.range(x=(-1, 1), y=(-1, 1))

设置这个VectorField绑定到Pipe后,我们可以开始向它推送数据,改变VectorField的方向:

In [ ]:
x,y  = np.mgrid[-10:11,-10:11] * 0.1
sine_rings  = np.sin(x**2+y**2)*np.pi+np.pi
exp_falloff = 1/np.exp((x**2+y**2)/8)

for i in np.linspace(0, 1, 25):
    time.sleep(0.1)
    pipe.send([x,y,sine_rings*i, exp_falloff])

这种直接使用元素构造函数的方法不允许您使用除默认键和值维之外的任何内容。这个限制的一个简单的解决方法是使用functools.partial,如下面控制长度部分中所示。

由于Pipe是完全通用的,数据可以是任何自定义类型,因此它提供了一种完整的通用机制来传输结构化或非结构化数据。由于这种普遍性,Pipe在使用下一节中描述的Buffer流时不提供一些更复杂的功能和优化。

Buffer

虽然Pipe提供了将任意数据传递给DynamicMap回调的通用解决方案,但另一方面Buffer提供了一种非常强大的方法来处理流表格数据,定义为pandas数据帧,数组,或列的词典(以及StreamingDataFrame,稍后我们将介绍)。 Buffer自动累积表格数据的最后一行N行,其中Nlength定义。

累积数据的能力允许对最近的数据历史执行操作,而绘制后端(例如散景)可以通过仅发送最新的补丁来优化绘图更新。只有当Buffer持有的data对象与绘制的Element数据相同时,此优化才有效,否则所有数据都将正常更新。

一个简单的例子:布朗运动

要初始化Buffer,我们必须提供一个示例数据集,它定义我们将要流式传输的数据的列和dtypes。接下来,我们定义length以保留最后100行数据。如果数据是DataFrame,我们可以指定是否还要使用DataFrame````index。在这种情况下,我们将简单地定义我们想要绘制'x'和'y'位置的DataFrame和'count'作为PointsCurve元素:

In [ ]:
example = pd.DataFrame({'x': [], 'y': [], 'count': []}, columns=['x', 'y', 'count'])
dfstream = Buffer(example, length=100, index=False)
curve_dmap = hv.DynamicMap(hv.Curve, streams=[dfstream])
point_dmap = hv.DynamicMap(hv.Points, streams=[dfstream])

应用了一些样式后,我们将显示动态“曲线”和“点”的“叠加”。

In [ ]:
%%opts Points [color_index='count', xaxis=None, yaxis=None] (line_color='black', size=5)
%%opts Curve (line_width=1, color='black')
curve_dmap * point_dmap

现在我们已经设置了Buffer并定义了一个DynamicMap来绘制数据,我们可以开始向它推送数据。我们将定义一个简单的函数,通过累积x,y位置来模拟布朗运动。我们可以通过hv.streams.Buffer直接“发送”`数据。

In [ ]:
def gen_brownian():
    x, y, count = 0, 0, 0
    while True:
        x += np.random.randn()
        y += np.random.randn()
        count += 1
        yield pd.DataFrame([(x, y, count)], columns=['x', 'y', 'count'])

brownian = gen_brownian()
for i in range(200):
    dfstream.send(next(brownian))

最后,我们可以使用clear方法清除流和绘图上的数据:

In [ ]:
#dfstream.clear()

使用Streamz库

现在我们已经发现了什么PipeBuffer可以做它的时间来展示如何将它们与streamz库一起使用。虽然HoloViews不依赖于streamz并且您可以使用流功能而无需了解streamz,但这两个库可以很好地协同工作,允许您构建管道来管理连续的数据流。 Streamz易于用于简单的任务,但也支持复杂的管道,涉及分支,连接,流量控制,反馈等。这里我们主要关注将streamz输出连接到Pipe然后Buffer以获得有关streamz API的更多细节,请参阅[streamz文档](https://streamz.readthedocs.io/en/最新/)。

streamz.StreamPipe一起使用

让我们从一个相当简单的例子开始:

  1. 声明一个streamz.Stream和一个Pipe对象,并将它们连接到一个我们可以推送数据的管道中。
  2. 使用20的sliding_window,它将首先等待20组流更新累积。此时,对于每个后续更新,它将应用pd.concat将最近的20个更新组合到一个新的数据帧中。
  3. 使用streamz.Stream上的sink方法来send得到20个更新为Pipe的集合。
  4. 声明一个DynamicMap,它采用连接的DataFrames的滑动窗口,并使用Scatter元素显示它。
  5. 用“计数”对“散点图”点进行着色并设置范围,然后显示:
In [ ]:
point_source = streamz.Stream()
pipe = Pipe(data=[])
point_source.sliding_window(20).map(pd.concat).sink(pipe.send) # Connect streamz to the Pipe
scatter_dmap = hv.DynamicMap(hv.Scatter, streams=[pipe])

设置完流媒体管道后,我们可以再次显示它:

In [ ]:
%%opts Scatter [color_index='count', bgcolor='black']
scatter_dmap.redim.range(y=(-4, 4))

现在有一个管道,但最初这个图是空的,因为没有数据发送到它。要查看情节更新,让我们使用streamz.Streamemit方法将小块随机大熊猫DataFrames发送到我们的情节:

In [ ]:
for i in range(100):
    df = pd.DataFrame({'x': np.random.rand(100), 'y': np.random.randn(100), 'count': i},
                      columns=['x', 'y', 'count'])
    point_source.emit(df)

使用StreamingDataFrame和StreamingSeries

streamz库(版本0.2)提供了StreamingDataFrameStreamingSeries,它是一种轻松处理表格数据实时源的有效方法。这使得它非常适合使用Buffer。 (在版本0.3streamz已经重命名了类来删除Streaming,所以这里的代码需要更新以用于后来的streamz版本。)使用StreamingDataFrame我们可以轻松传输数据,应用累积和滚动统计等计算,然后使用HoloViews可视化数据。

streamz.dataframe模块提供了一个Random实用程序,它生成一个StreamingDataFrame,它以指定的间隔发出一定频率的随机数据。 example属性让我们可以看到我们可以期待的数据的结构和dtypes:

In [ ]:
simple_sdf = streamz.dataframe.Random(freq='10ms', interval='100ms')
print(simple_sdf.index)
simple_sdf.example.dtypes

由于StreamingDataFrame提供了类似pandas的API,我们可以直接指定对数据的操作。在这个例子中,我们减去一个固定的偏移,然后计算累积和,给我们一个随机漂移的时间序列。然后我们可以将这个数据帧的x值传递给HoloViews的Buffer并提供hv.Curve作为DynamicMap回调,将数据流式传输到HoloViewsCurve(带有默认键和值维度):

In [ ]:
%%opts Curve [width=500 show_grid=True]
sdf1 = (simple_sdf-0.5).cumsum()
hv.DynamicMap(hv.Curve, streams=[Buffer(sdf1.x)])

Random StreamingDataFrame将异步发出事件,向前驱动可视化,直到它被显式停止,我们可以通过调用stop方法来做。

In [ ]:
simple_sdf.stop()

使用StreamingDataFrame API

到目前为止,我们只计算了累积和,但是StreamingDataFrame实际上有一个广泛的API,可以让我们对我们的数据进行广泛的流式计算。例如,让我们将滚动均值应用于我们的x值,窗口为500毫秒,并将其叠加在“原始”数据之上:

In [ ]:
source_df = streamz.dataframe.Random(freq='5ms', interval='100ms')
sdf2 = (source_df-0.5).cumsum()
sdf2.tail()
In [ ]:
raw       = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf2.x)]).relabel('raw')
smooth    = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf2.x.rolling('500ms').mean())]).relabel('smooth')

raw * smooth.options(width=900,height=350)
In [ ]:
source_df.stop()