一个完整示例

这里给出一个简单的完整示例,仅供参考。

# APScheduler pieces used by this example: the scheduler, an in-memory job
# store, a thread-pool executor, and the event codes for job completion.
# Note the plural package names ``jobstores`` and ``executors``.
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED


# Job stores, keyed by alias. Jobs are kept in memory only.
jobstores = {
    'default': MemoryJobStore(),
}

# Executors, keyed by alias. Jobs run on a pool of up to 20 worker threads.
executors = {
    'default': ThreadPoolExecutor(20),
}

# Scheduler-wide defaults for jobs.
job_defaults = {
    'coalesce': False,
    # The option name is 'max_instances' (plural); a misspelled key is not
    # recognised by the scheduler.
    'max_instances': 1,
}

scheduler = BlockingScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
)

# NOTE: my_job and short_one are the job callables; they must already be
# defined or imported before these lines run.
# Cron trigger: fires on every hour divisible by three.
scheduler.add_job(my_job, 'cron', hour='*/3', id='my_job')
# Interval trigger: fires every five seconds.
scheduler.add_job(short_one, 'interval', seconds=5, id='short_job')

def on_job_event(event):
    """Print a one-line report of a finished job to standard output.

    Args:
        event: The event object delivered by the scheduler. Only its
            ``exception`` and ``job_id`` attributes are read.
    """
    # A truthy ``exception`` means the job raised instead of completing.
    if event.exception:
        print(f'Job {event.job_id} crashed')
    else:
        print(f'Job {event.job_id} succeeded.')

# Subscribe the callback to both job outcomes (completed and raised) before
# starting the scheduler.
scheduler.add_listener(
    on_job_event,
    mask=EVENT_JOB_ERROR | EVENT_JOB_EXECUTED,
)
scheduler.start()