一个完整示例
这里给出一个简单的完整示例，仅供参考。
from apscheduler.schedulers.blocking import BlockingScheduler
# The sub-packages are named in the plural: ``jobstores`` and ``executors``.
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED

# Job stores, keyed by alias; jobs that name no store use 'default'.
jobstores = {
    'default': MemoryJobStore(),
}

# Executors, keyed by alias; the default one is a pool of 20 worker threads.
executors = {
    'default': ThreadPoolExecutor(20),
}

# Defaults applied to every job that does not override them.
job_defaults = {
    'coalesce': False,
    # The recognised key is 'max_instances' (plural); a misspelled key is
    # ignored and the built-in default is used instead.
    'max_instances': 1,
}

scheduler = BlockingScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
)

# NOTE(review): my_job and short_one are the job callables; they are not
# defined in this excerpt and must exist before these calls run.
scheduler.add_job(my_job, 'cron', hour='*/3', id='my_job')
scheduler.add_job(short_one, 'interval', seconds=5, id='short_job')
def on_job_event(event):
    """Print a one-line report on the outcome of a finished job.

    Args:
        event: Event object passed in by the scheduler. It must expose
            ``job_id`` and ``exception`` attributes; a truthy ``exception``
            is treated as a failed run.
    """
    if event.exception:
        print(f'Job {event.job_id} crashed')
    else:
        print(f'Job {event.job_id} succeeded.')
# Subscribe the callback to both outcomes: jobs that ran to completion and
# jobs that raised.
scheduler.add_listener(
    on_job_event,
    mask=EVENT_JOB_EXECUTED | EVENT_JOB_ERROR,
)

# Start the scheduler last, after all jobs and listeners are registered.
scheduler.start()