- 使用apscheduler tornad模块,启动scheduler
In [2]:
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.tornado import TornadoScheduler
import pytz
from tradetime import when_tradedate, when_tradetime,is_holiday_today
import moment
from deva import *
jobstores = {
'redis': RedisJobStore(),
'default': MemoryJobStore()
}
executors = {
'default': ThreadPoolExecutor(50),
}
job_defaults = {
'coalesce': False,
'max_instances': 200
}
scheduler = TornadoScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults,timezone=pytz.timezone('Asia/Shanghai'))
try:
scheduler.start()
except:
pass
- 在特定交易时间,发驱动任务的消息到跨进程总线
In [3]:
@scheduler.scheduled_job('interval', name='open', seconds=60,start_date='2019-04-03 09:25:00',)
@when_tradedate
def _open():
'open'>>bus
@scheduler.scheduled_job('interval', name='close', seconds=60,start_date='2019-04-03 01:25:30',)
@when_tradedate
def _close():
'close'>>bus
@scheduler.scheduled_job('interval', name='close', days=1,start_date='2019-04-03 11:30:00',)
@when_tradedate
def _morning_close():
'morning_close'>>bus
@scheduler.scheduled_job('interval', name='close', days=1,start_date='2019-04-03 11:30:00',)
@when_tradedate
def _noon_open():
'noon_open'>>bus
In [4]:
scheduler.get_jobs()>>pmap(lambda x:x.next_run_time)>>to_list
Out[4]:
- 定义任务,路由特定总线消息时执行
In [5]:
@bus.route(lambda x:x=='open')
@when_tradetime
def onopen(x):
'open'>>log
@bus.route(lambda x:x=='close')
@when_tradedate
def onclose(x):
'close'>>log