Django定时任务框架django-apscheduler的使用
1.安装库
pip install django-apscheduler
2.添加 install_app
django_apscheduler
3.在app下添加一个task.py文件,用来实现具体的定时任务
task.py
def my_scheduled_job():
print("这个任务每3秒执行一次", time.time())
4.在app下创建一个management文件夹,里面包含一个空的__init__.py和一个文件夹commands,commands里面包含一个空的__init__.py和一个start_tasks.py,其中start_tasks.py是主要的启动脚本,结构如下:
from datetime import datetime
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore
from ...tasks import my_scheduled_job, my_scheduled_job1
#
# Django manage.py命令:存储定时任务信息
#
class Command(BaseCommand):
help = '启动定时任务.'
def handle(self, *args, **options):
# 调度器
scheduler = BlockingScheduler() # 研发阶段使用
# scheduler = BackgroundScheduler() # 生产阶段使用
# 任务存储
scheduler.add_jobstore(DjangoJobStore(), 'default')
# 配置线程池执行器,限制最大并发数为1,防止并发
executor = ThreadPoolExecutor(max_workers=1)
scheduler.executor = executor
# 注册定义任务
id_print_task = 'print_task__job'
print('开始-增加任务({})'.format(id_print_task))
scheduler.add_job(
my_scheduled_job,
id=id_print_task,
name=id_print_task,
max_instances=1,
replace_existing=True,
trigger=IntervalTrigger(seconds=15, start_date=datetime.now(), ), # 从当前时间开始,每15秒钟调度一次
)
# 指定某个时间运行
run_date = datetime(2024, 7, 18, 18, 33)
scheduler.add_job(
my_scheduled_job1,
id='id_print_task1',
name='id_print_task1',
trigger=DateTrigger(run_date=run_date), # 使用 DateTrigger 并设置 run_date
replace_existing=True # 如果已存在同名任务,则替换它
)
# 指定每天某个时间运行
scheduler.add_job(
my_scheduled_job1,
id='daily_task_at_18_36',
name='Daily Task at 18:36',
trigger=CronTrigger(hour=18, minute=38), # 使用 CronTrigger 并设置小时和分钟
replace_existing=True # 如果已存在同名任务,则替换它
)
print('完成-增加任务({})'.format(id_print_task))
# 启动定时任务
try:
scheduler.start()
except KeyboardInterrupt:
scheduler.shutdown()
5.修改settings.py配置,将时区改为中国,最主要是把USE两项注释掉,否则后面的定时任务的时间会晚八小时!!!!
settings.py LANGUAGE_CODE = 'zh-hans' # 中文语言代码 TIME_ZONE = 'Asia/Shanghai' # 中国上海时区 # USE_I18N = True # # USE_TZ = True
6.添加完成后,做数据库迁移,会生成 表django_apscheduler_djangojob 和 表django_apscheduler_djangojobexecution
python manage.py migrate
7.启动项目,然后启动定时任务:
python manage.py start_tasks
启动后,任务开始执行,数据库上述两张表会有数据进去,定时任务完成,其他的见后面拓展。
拓展:
django-apscheduler框架还提供了很多操作定时任务的函数。比如:
- 删除任务
scheduler.remove_job(job_name) - 暂停任务
scheduler.pause_job(job_name) - 开启任务
scheduler.resume_job(job_name) - 获取所有任务
scheduler.get_jobs() - 修改任务
scheduler.modify_job(job_name)注:修改任务只能修改参数,如果要修改执行时间的话,有3种方法
第一就把任务删了重新创建,
第二直接操作数据库,
第三用到下面重设任务。
- 重设任务
scheduler.reschedule_job(job_name)scheduler.reschedule_job(job_id="job1", trigger='interval', minutes=1)
- 重设任务
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!


