Tags: stream

- The algorithm works on continuous data whose length is unbounded and which keeps accumulating.

In [1]:
from deva import *
from collections import defaultdict
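  1. Design a state machine that accepts different keys as input and continuously produces results.
  2. When a key has not arrived, the condition that depends on it may never fire; it fires only once every required input is present. If several pieces of data arrive at the same time, a session mechanism is needed to tell whether the conditions within a given session are satisfied.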
  3. Use a stateful filter, an Aggregator, to collect and store individual messages until a complete set of related messages has been received. Then, the Aggregator publishes a single message distilled from the individual messages.
  4. The Aggregator is a special Filter that receives a stream of messages and identifies messages that are correlated. Once a complete set of messages has been received, the Aggregator collects information from each correlated message and publishes a single, aggregated message to the output channel for further processing.
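The requirements above (keyed inputs, fire only when everything has arrived, correlate by session) can be sketched as a small Aggregator in plain Python. This is an illustration under assumptions, not deva's API: each message is assumed to be a dict with hypothetical `session_id`, `key` and `value` fields, and a session counts as complete once every key in `required_keys` has been seen.

```python
from collections import defaultdict


class Aggregator:
    """Hypothetical aggregator: correlate messages by session and publish one
    combined message once every required key for that session has arrived."""

    def __init__(self, required_keys, correlation_key="session_id"):
        self.required_keys = set(required_keys)
        self.correlation_key = correlation_key
        # session id -> {key: latest value received for that key}
        self.pending = defaultdict(dict)

    def receive(self, message):
        """Store one message; return the aggregate if its session is now complete, else None."""
        session = message[self.correlation_key]
        parts = self.pending[session]
        parts[message["key"]] = message["value"]
        if self.required_keys.issubset(parts):
            # Complete set: publish a single message distilled from the parts and drop the session.
            del self.pending[session]
            return {self.correlation_key: session, **parts}
        return None  # still waiting for at least one key


agg = Aggregator(required_keys={"login", "pay"})
print(agg.receive({"session_id": "s1", "key": "login", "value": "ok"}))  # None
print(agg.receive({"session_id": "s1", "key": "pay", "value": 42}))
# {'session_id': 's1', 'login': 'ok', 'pay': 42}
```

Deleting a session after publishing is only one possible completeness policy; sessions that never complete would typically also need a timeout.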
In [2]:
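# Mode 1
# For each key, only its latest value is used (older values for the same key are ignored); a value is not discarded after use and can be reused.

# Mode 2
# Specified keys are accumulated with a specified algorithm, e.g. the average over a unit of time.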
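A minimal sketch of the two modes, using plain Python objects rather than deva streams. `LatestValueJoiner` and `WindowedAverage` are hypothetical names: the first keeps only the newest value per key and reuses it on every later update (mode 1), the second averages the values seen in the last `window_seconds` seconds (mode 2).

```python
import time
from collections import deque


class LatestValueJoiner:
    """Mode 1: keep only the newest value per key; values are never consumed,
    so once every key has arrived each update emits a fresh snapshot."""

    def __init__(self, keys):
        self.keys = tuple(keys)
        self.latest = {}

    def update(self, key, value):
        self.latest[key] = value  # a newer value overwrites the older one
        if all(k in self.latest for k in self.keys):
            return {k: self.latest[k] for k in self.keys}
        return None  # some key has not arrived yet


class WindowedAverage:
    """Mode 2: average of the values received within the last `window_seconds`."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.samples = deque()  # (timestamp, value) pairs in arrival order

    def update(self, value, now=None):
        now = time.time() if now is None else now
        self.samples.append((now, value))
        # Evict samples older than the window; timestamps are assumed non-decreasing.
        while self.samples[0][0] < now - self.window_seconds:
            self.samples.popleft()
        return sum(v for _, v in self.samples) / len(self.samples)


joiner = LatestValueJoiner(["price", "volume"])
print(joiner.update("price", 10))    # None: volume has not arrived
print(joiner.update("volume", 500))  # {'price': 10, 'volume': 500}
print(joiner.update("price", 11))    # {'price': 11, 'volume': 500}: volume is reused

avg = WindowedAverage(window_seconds=60)
print(avg.update(10, now=0))   # 10.0
print(avg.update(20, now=30))  # 15.0
print(avg.update(30, now=90))  # 25.0: the sample at t=0 has expired
```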

Building real-time user behavior data

A user action log record looks like this: {'user_id':'user_id','action':'pay','session_id':'123456','action_parameter':{}}

In [3]:
from deva import *
from collections import defaultdict


s = Stream()
s.sink(write_to_file('user_action.log'))
In [4]:
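def user_action(state, x, namespace):
    # Read fields defensively; malformed records fall back to '_error_data'
    session_id = maybe(x)['session_id'].or_else('_error_data')
    action = maybe(x)['action'].or_else('_error_data')
    user_id = maybe(x)['user_id'].or_else('_error_data')  # not used yet

    # One named stream per session: cache at most 50 items, for up to 24 hours
    session_stream = NS(name=session_id, cache_max_len=50, cache_max_age_seconds=60*60*24)
    if session_stream not in state:
        # First time this session is seen: log every sliding window of 3 actions
        # TODO: connect the per-session behavior stream to an anomaly-detection model?
        session_stream.sliding_window(3).sink(lambda w: (w, session_stream.name) >> log)
        state.add(session_stream)

    action >> session_stream

    return state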
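To make the per-session bookkeeping in `user_action` easier to follow, here is a rough standard-library analogue. It is a sketch, not how deva implements `NS` or `sliding_window`: a dict of `collections.deque(maxlen=3)` stands in for the set of named session streams, `dict.get` with a `'_error_data'` default stands in for `maybe(...).or_else(...)`, and each call returns the current window with its session id instead of logging it. This version emits partial windows until a session has seen three actions.

```python
from collections import deque


def accumulate_user_action(state, record, window_size=3):
    """Route one action record to its session's sliding window.

    `state` maps session_id -> deque(maxlen=window_size) and plays the role of
    the set of per-session streams. Returns (state, (window, session_id)).
    Records are assumed to be dicts; missing fields fall back to '_error_data'.
    """
    session_id = record.get("session_id", "_error_data")
    action = record.get("action", "_error_data")
    if session_id not in state:
        state[session_id] = deque(maxlen=window_size)  # first record of this session
    window = state[session_id]
    window.append(action)  # once full, the oldest action drops off the left
    return state, (tuple(window), session_id)


state = {}
events = [
    {"user_id": "u1", "action": "login", "session_id": "s1"},
    {"user_id": "u1", "action": "pay", "session_id": "s1"},
    {"user_id": "u2", "action": "login", "session_id": "s2"},
    {"user_id": "u1", "action": "sell", "session_id": "s1"},
    {"user_id": "u1", "action": "pay", "session_id": "s1"},
]
for event in events:
    state, out = accumulate_user_action(state, event)
    print(out)
# (('login',), 's1')
# (('login', 'pay'), 's1')
# (('login',), 's2')
# (('login', 'pay', 'sell'), 's1')
# (('pay', 'sell', 'pay'), 's1')
```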
In [5]:
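namespace = Namespace()
acc = s.scan(user_action, start=set(), namespace=namespace)
# Each session stream caches only its latest 50 records; about one hour of user behavior is usually enough for risk control.
# To reduce system load, you could feed in only the action streams of users who logged in from an unusual location.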
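The second comment suggests narrowing the input before it reaches the accumulator. A sketch of such a pre-filter in plain Python follows; the `login_city` field and the `home_city` lookup are hypothetical, since the log format above carries no location data. Once a user logs in from a city other than the one on file, that user's subsequent actions pass through and everyone else's are dropped.

```python
def is_remote_login(record, home_city):
    """Hypothetical check: True if a login comes from a city other than the one on file."""
    expected = home_city.get(record.get("user_id"))
    city = record.get("login_city")
    return expected is not None and city is not None and city != expected


def remote_user_filter(records, home_city):
    """Yield only the actions of users who have logged in from an unusual location."""
    flagged = set()  # users seen logging in remotely so far
    for record in records:
        user = record.get("user_id")
        if record.get("action") == "login" and is_remote_login(record, home_city):
            flagged.add(user)
        if user in flagged:
            yield record


home_city = {"u1": "Beijing", "u2": "Shanghai"}
events = [
    {"user_id": "u1", "action": "login", "login_city": "Beijing"},
    {"user_id": "u1", "action": "pay"},
    {"user_id": "u2", "action": "login", "login_city": "Shenzhen"},
    {"user_id": "u2", "action": "pay"},
]
for record in remote_user_filter(events, home_city):
    print(record["user_id"], record["action"])
# u2 login
# u2 pay
```

Placing a predicate like this upstream of `s.scan(...)` means only flagged users get a per-session stream, which is where the load reduction would come from.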
In [6]:
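# Following the same idea, you could bucket timestamps with pd.cut, use a few of the time bins as keys, and accumulate several days of history in a separate stream per key for comparison.
# To avoid using a lot of memory, sample only a limited number of time points.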
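The bucketing idea can be sketched without pandas. Here `bisect` assigns each hour to a left-closed bin, the same binning `pd.cut(hours, bins=HOUR_EDGES, right=False, labels=LABELS)` would produce; the bin edges, labels, watched bins and retention length are all illustrative assumptions. Only the watched bins are stored, and each keeps per-day event counts for a bounded number of days, so today's count can be compared against recent history.

```python
import bisect
from collections import defaultdict
from datetime import date, datetime

# Illustrative bins: [0, 6), [6, 12), [12, 18), [18, 24) hours of the day.
HOUR_EDGES = [0, 6, 12, 18, 24]
LABELS = ["night", "morning", "afternoon", "evening"]


def time_bin(hour):
    """Map an hour (0-23) to its bin label, left-closed like pd.cut(right=False)."""
    return LABELS[bisect.bisect_right(HOUR_EDGES, hour) - 1]


class TimeBinHistory:
    """Keep per-day event counts for a few watched time bins, bounded to `max_days` (>= 1) days."""

    def __init__(self, watched_bins, max_days=7):
        self.watched = set(watched_bins)
        self.max_days = max_days
        self.counts = defaultdict(dict)  # bin label -> {date: event count}

    def add(self, ts):
        label = time_bin(ts.hour)
        if label not in self.watched:
            return  # unwatched bins are not stored, which bounds memory
        day_counts = self.counts[label]
        day_counts[ts.date()] = day_counts.get(ts.date(), 0) + 1
        # Forget the oldest days beyond the retention limit.
        for old_day in sorted(day_counts)[:-self.max_days]:
            del day_counts[old_day]

    def compare(self, label, day):
        """Return (count on `day`, mean daily count over the other stored days)."""
        day_counts = self.counts.get(label, {})
        others = [c for d, c in day_counts.items() if d != day]
        mean = sum(others) / len(others) if others else 0.0
        return day_counts.get(day, 0), mean


history = TimeBinHistory(watched_bins={"evening"}, max_days=3)
for day in range(1, 5):  # March 1-4: 1, 2, 3, 4 evening events
    for _ in range(day):
        history.add(datetime(2020, 3, day, 20))
history.add(datetime(2020, 3, 4, 9))  # morning bin: not watched, ignored
print(history.compare("evening", date(2020, 3, 4)))  # (4, 2.5): March 1 was evicted
```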
In [7]:
acc.state
Out[7]:
set()
In [9]:
for i in range(10):
    for u in range(10):
        {'user_id':'zjw%s'%u,'action':['sell%s'%u,'pay%s'%u,'login%s'%u]>>sample(1)>>first,'session_id':'session_%s'%i}>>s>>passed
[2020-03-08 17:21:59.198970] INFO: deva.bus: (('pay8', 'pay9', 'login0'), 'session_0')
[2020-03-08 17:21:59.202511] INFO: deva.bus: (('pay9', 'login0', 'sell1'), 'session_0')
[2020-03-08 17:21:59.204434] INFO: deva.bus: (('login0', 'sell1', 'login2'), 'session_0')
[2020-03-08 17:21:59.206767] INFO: deva.bus: (('sell1', 'login2', 'pay3'), 'session_0')
[2020-03-08 17:21:59.209250] INFO: deva.bus: (('login2', 'pay3', 'pay4'), 'session_0')
[2020-03-08 17:21:59.211838] INFO: deva.bus: (('pay3', 'pay4', 'sell5'), 'session_0')
[2020-03-08 17:21:59.213466] INFO: deva.bus: (('pay4', 'sell5', 'pay6'), 'session_0')
[2020-03-08 17:21:59.215486] INFO: deva.bus: (('sell5', 'pay6', 'pay7'), 'session_0')
[2020-03-08 17:21:59.221697] INFO: deva.bus: (('pay6', 'pay7', 'login8'), 'session_0')
[2020-03-08 17:21:59.224157] INFO: deva.bus: (('pay7', 'login8', 'sell9'), 'session_0')
[2020-03-08 17:21:59.226872] INFO: deva.bus: (('pay8', 'pay9', 'pay0'), 'session_1')
[2020-03-08 17:21:59.230173] INFO: deva.bus: (('pay9', 'pay0', 'sell1'), 'session_1')
[2020-03-08 17:21:59.235401] INFO: deva.bus: (('pay0', 'sell1', 'sell2'), 'session_1')
[2020-03-08 17:21:59.240826] INFO: deva.bus: (('sell1', 'sell2', 'pay3'), 'session_1')
[2020-03-08 17:21:59.243751] INFO: deva.bus: (('sell2', 'pay3', 'pay4'), 'session_1')
[2020-03-08 17:21:59.257977] INFO: deva.bus: (('pay3', 'pay4', 'login5'), 'session_1')
[2020-03-08 17:21:59.260515] INFO: deva.bus: (('pay4', 'login5', 'sell6'), 'session_1')
[2020-03-08 17:21:59.266029] INFO: deva.bus: (('login5', 'sell6', 'pay7'), 'session_1')
[2020-03-08 17:21:59.271125] INFO: deva.bus: (('sell6', 'pay7', 'pay8'), 'session_1')
[2020-03-08 17:21:59.275500] INFO: deva.bus: (('pay7', 'pay8', 'sell9'), 'session_1')
[2020-03-08 17:21:59.278149] INFO: deva.bus: (('pay8', 'pay9', 'pay0'), 'session_2')
[2020-03-08 17:21:59.282113] INFO: deva.bus: (('pay9', 'pay0', 'pay1'), 'session_2')
[2020-03-08 17:21:59.288966] INFO: deva.bus: (('pay0', 'pay1', 'sell2'), 'session_2')
[2020-03-08 17:21:59.291279] INFO: deva.bus: (('pay1', 'sell2', 'sell3'), 'session_2')
[2020-03-08 17:21:59.294761] INFO: deva.bus: (('sell2', 'sell3', 'login4'), 'session_2')
[2020-03-08 17:21:59.297178] INFO: deva.bus: (('sell3', 'login4', 'sell5'), 'session_2')
[2020-03-08 17:21:59.304529] INFO: deva.bus: (('login4', 'sell5', 'sell6'), 'session_2')
[2020-03-08 17:21:59.310617] INFO: deva.bus: (('sell5', 'sell6', 'pay7'), 'session_2')
[2020-03-08 17:21:59.314302] INFO: deva.bus: (('sell6', 'pay7', 'sell8'), 'session_2')
[2020-03-08 17:21:59.322127] INFO: deva.bus: (('pay7', 'sell8', 'login9'), 'session_2')
[2020-03-08 17:21:59.325896] INFO: deva.bus: (('pay8', 'pay9', 'pay0'), 'session_3')
[2020-03-08 17:21:59.330617] INFO: deva.bus: (('pay9', 'pay0', 'sell1'), 'session_3')
[2020-03-08 17:21:59.334016] INFO: deva.bus: (('pay0', 'sell1', 'sell2'), 'session_3')
[2020-03-08 17:21:59.342965] INFO: deva.bus: (('sell1', 'sell2', 'sell3'), 'session_3')
[2020-03-08 17:21:59.350280] INFO: deva.bus: (('sell2', 'sell3', 'pay4'), 'session_3')
[2020-03-08 17:21:59.355292] INFO: deva.bus: (('sell3', 'pay4', 'pay5'), 'session_3')
[2020-03-08 17:21:59.357845] INFO: deva.bus: (('pay4', 'pay5', 'login6'), 'session_3')
[2020-03-08 17:21:59.363094] INFO: deva.bus: (('pay5', 'login6', 'sell7'), 'session_3')
[2020-03-08 17:21:59.366318] INFO: deva.bus: (('login6', 'sell7', 'login8'), 'session_3')
[2020-03-08 17:21:59.375244] INFO: deva.bus: (('sell7', 'login8', 'pay9'), 'session_3')
[2020-03-08 17:21:59.377917] INFO: deva.bus: (('pay8', 'pay9', 'login0'), 'session_4')
[2020-03-08 17:21:59.382880] INFO: deva.bus: (('pay9', 'login0', 'login1'), 'session_4')
[2020-03-08 17:21:59.389237] INFO: deva.bus: (('login0', 'login1', 'sell2'), 'session_4')
[2020-03-08 17:21:59.391356] INFO: deva.bus: (('login1', 'sell2', 'pay3'), 'session_4')
[2020-03-08 17:21:59.394448] INFO: deva.bus: (('sell2', 'pay3', 'login4'), 'session_4')
[2020-03-08 17:21:59.400868] INFO: deva.bus: (('pay3', 'login4', 'sell5'), 'session_4')
[2020-03-08 17:21:59.404660] INFO: deva.bus: (('login4', 'sell5', 'sell6'), 'session_4')
[2020-03-08 17:21:59.406763] INFO: deva.bus: (('sell5', 'sell6', 'pay7'), 'session_4')
[2020-03-08 17:21:59.409682] INFO: deva.bus: (('sell6', 'pay7', 'pay8'), 'session_4')
[2020-03-08 17:21:59.413567] INFO: deva.bus: (('pay7', 'pay8', 'pay9'), 'session_4')
[2020-03-08 17:21:59.416807] INFO: deva.bus: (('pay8', 'pay9', 'sell0'), 'session_5')
[2020-03-08 17:21:59.421294] INFO: deva.bus: (('pay9', 'sell0', 'login1'), 'session_5')
[2020-03-08 17:21:59.425946] INFO: deva.bus: (('sell0', 'login1', 'login2'), 'session_5')
[2020-03-08 17:21:59.428229] INFO: deva.bus: (('login1', 'login2', 'login3'), 'session_5')
[2020-03-08 17:21:59.431375] INFO: deva.bus: (('login2', 'login3', 'sell4'), 'session_5')
[2020-03-08 17:21:59.434419] INFO: deva.bus: (('login3', 'sell4', 'sell5'), 'session_5')
[2020-03-08 17:21:59.437586] INFO: deva.bus: (('sell4', 'sell5', 'pay6'), 'session_5')
[2020-03-08 17:21:59.440521] INFO: deva.bus: (('sell5', 'pay6', 'pay7'), 'session_5')
[2020-03-08 17:21:59.442751] INFO: deva.bus: (('pay6', 'pay7', 'pay8'), 'session_5')
[2020-03-08 17:21:59.444507] INFO: deva.bus: (('pay7', 'pay8', 'login9'), 'session_5')
[2020-03-08 17:21:59.447849] INFO: deva.bus: (('pay8', 'pay9', 'pay0'), 'session_6')
[2020-03-08 17:21:59.450936] INFO: deva.bus: (('pay9', 'pay0', 'pay1'), 'session_6')
[2020-03-08 17:21:59.453758] INFO: deva.bus: (('pay0', 'pay1', 'sell2'), 'session_6')
[2020-03-08 17:21:59.455616] INFO: deva.bus: (('pay1', 'sell2', 'sell3'), 'session_6')
[2020-03-08 17:21:59.458066] INFO: deva.bus: (('sell2', 'sell3', 'sell4'), 'session_6')
[2020-03-08 17:21:59.464533] INFO: deva.bus: (('sell3', 'sell4', 'sell5'), 'session_6')
[2020-03-08 17:21:59.467636] INFO: deva.bus: (('sell4', 'sell5', 'pay6'), 'session_6')
[2020-03-08 17:21:59.471652] INFO: deva.bus: (('sell5', 'pay6', 'login7'), 'session_6')
[2020-03-08 17:21:59.473706] INFO: deva.bus: (('pay6', 'login7', 'login8'), 'session_6')
[2020-03-08 17:21:59.478975] INFO: deva.bus: (('login7', 'login8', 'pay9'), 'session_6')
[2020-03-08 17:21:59.481553] INFO: deva.bus: (('pay8', 'pay9', 'login0'), 'session_7')
[2020-03-08 17:21:59.483800] INFO: deva.bus: (('pay9', 'login0', 'login1'), 'session_7')
[2020-03-08 17:21:59.485890] INFO: deva.bus: (('login0', 'login1', 'login2'), 'session_7')
[2020-03-08 17:21:59.487970] INFO: deva.bus: (('login1', 'login2', 'sell3'), 'session_7')
[2020-03-08 17:21:59.490226] INFO: deva.bus: (('login2', 'sell3', 'sell4'), 'session_7')
[2020-03-08 17:21:59.493352] INFO: deva.bus: (('sell3', 'sell4', 'pay5'), 'session_7')
[2020-03-08 17:21:59.496348] INFO: deva.bus: (('sell4', 'pay5', 'sell6'), 'session_7')
[2020-03-08 17:21:59.499297] INFO: deva.bus: (('pay5', 'sell6', 'pay7'), 'session_7')
[2020-03-08 17:21:59.501455] INFO: deva.bus: (('sell6', 'pay7', 'pay8'), 'session_7')
[2020-03-08 17:21:59.504159] INFO: deva.bus: (('pay7', 'pay8', 'sell9'), 'session_7')
[2020-03-08 17:21:59.506723] INFO: deva.bus: (('pay8', 'pay9', 'pay0'), 'session_8')
[2020-03-08 17:21:59.514695] INFO: deva.bus: (('pay9', 'pay0', 'pay1'), 'session_8')
[2020-03-08 17:21:59.517103] INFO: deva.bus: (('pay0', 'pay1', 'login2'), 'session_8')
[2020-03-08 17:21:59.520026] INFO: deva.bus: (('pay1', 'login2', 'pay3'), 'session_8')
[2020-03-08 17:21:59.521916] INFO: deva.bus: (('login2', 'pay3', 'pay4'), 'session_8')
[2020-03-08 17:21:59.523780] INFO: deva.bus: (('pay3', 'pay4', 'sell5'), 'session_8')
[2020-03-08 17:21:59.525585] INFO: deva.bus: (('pay4', 'sell5', 'pay6'), 'session_8')
[2020-03-08 17:21:59.528993] INFO: deva.bus: (('sell5', 'pay6', 'pay7'), 'session_8')
[2020-03-08 17:21:59.531516] INFO: deva.bus: (('pay6', 'pay7', 'sell8'), 'session_8')
[2020-03-08 17:21:59.533768] INFO: deva.bus: (('pay7', 'sell8', 'pay9'), 'session_8')
[2020-03-08 17:21:59.535803] INFO: deva.bus: (('pay8', 'pay9', 'pay0'), 'session_9')
[2020-03-08 17:21:59.538472] INFO: deva.bus: (('pay9', 'pay0', 'pay1'), 'session_9')
[2020-03-08 17:21:59.541107] INFO: deva.bus: (('pay0', 'pay1', 'sell2'), 'session_9')
[2020-03-08 17:21:59.543722] INFO: deva.bus: (('pay1', 'sell2', 'sell3'), 'session_9')
[2020-03-08 17:21:59.547214] INFO: deva.bus: (('sell2', 'sell3', 'sell4'), 'session_9')
[2020-03-08 17:21:59.549736] INFO: deva.bus: (('sell3', 'sell4', 'pay5'), 'session_9')
[2020-03-08 17:21:59.552943] INFO: deva.bus: (('sell4', 'pay5', 'pay6'), 'session_9')
[2020-03-08 17:21:59.557522] INFO: deva.bus: (('pay5', 'pay6', 'sell7'), 'session_9')
[2020-03-08 17:21:59.573601] INFO: deva.bus: (('pay6', 'sell7', 'sell8'), 'session_9')
[2020-03-08 17:21:59.577598] INFO: deva.bus: (('sell7', 'sell8', 'login9'), 'session_9')
In [10]:
acc.state
Out[10]:
{<session_0; Stream>,
 <session_1; Stream>,
 <session_2; Stream>,
 <session_3; Stream>,
 <session_4; Stream>,
 <session_5; Stream>,
 <session_6; Stream>,
 <session_7; Stream>,
 <session_8; Stream>,
 <session_9; Stream>}