In [1]:
import streamz

from requests_html import HTMLSession
session = HTMLSession()

def get_response(url):
    global session
    return session.get(url)

def get_result(response):
    return response.html.search('<title>{}</title>'),response.url

def get_links(response):
    return response.html.absolute_links

def is_special_url(url):
    return 'gndy' in url

def is_special_response(response):
    return 'gndy' in response.url

source = streamz.Stream()
pages = source.unique()
response = pages.rate_limit(1).map(get_response)
special_response = response.filter(is_special_response)
result = special_response.map(get_result)
links = response.map(get_links).flatten()# list to stream
links.sink(source.emit)  # pipe new links back into pages

result.sink(print)
In [2]:
source.visualize()
Out[2]:
In [3]:
source.emit('http://www.dytt8.net')
(<Result ('2017年喜剧《疯狂成名记》BD中英双字幕迅雷下载_电影天堂',) {}>, 'https://www.dytt8.net/html/gndy/jddy/20181026/57678.html')
(<Result ('2008爱情《李米的猜想》HD国语中文字幕迅雷下载_电影天堂',) {}>, 'https://www.dytt8.net/html/gndy/jddy/20170418/53762.html')
(<Result ('2018年奇幻剧情《画皮师2》HD国语中字迅雷下载_电影天堂',) {}>, 'https://www.dytt8.net/html/gndy/jddy/20181021/57657.html')
(<Result ('2018年喜剧《公主大对换》BD中英双字幕迅雷下载_电影天堂',) {}>, 'https://www.dytt8.net/html/gndy/dyzz/20181117/57804.html')
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-3-16a897b490c4> in <module>()
----> 1 source.emit('http://www.dytt8.net')

/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/streamz/core.py in emit(self, x, asynchronous)
    330 
    331                 raise gen.Return(result)
--> 332             sync(self.loop, _)
    333 
    334     def update(self, x, who=None):

/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/streamz/core.py in sync(loop, func, *args, **kwargs)
   1273     else:
   1274         while not e.is_set():
-> 1275             e.wait(10)
   1276     if error[0]:
   1277         six.reraise(*error[0])

/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py in wait(self, timeout)
    550             signaled = self._flag
    551             if not signaled:
--> 552                 signaled = self._cond.wait(timeout)
    553             return signaled
    554 

/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py in wait(self, timeout)
    298             else:
    299                 if timeout > 0:
--> 300                     gotit = waiter.acquire(True, timeout)
    301                 else:
    302                     gotit = waiter.acquire(False)

KeyboardInterrupt: 
(<Result ('2018年剧情《三角草的春天》BD日语中字迅雷下载_电影天堂',) {}>, 'https://www.dytt8.net/html/gndy/dyzz/20181031/57698.html')
(<Result ('2018年动作《蜀山降魔传》BD国粤双语中字迅雷下载_电影天堂',) {}>, 'https://www.dytt8.net/html/gndy/dyzz/20181015/57623.html')
(<Result ('2018年科幻动作《铁血战士》HD韩版中字迅雷下载_电影天堂',) {}>, 'https://www.dytt8.net/html/gndy/dyzz/20181114/57791.html')