Django+vue自动化测试平台(25)-- 自动化测试之封装APscheduler定时任务框架
APscheduler简介
APscheduler全称Advanced Python Scheduler,作用为在指定的时间规则执行指定的作业,其是基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个python定时任务系统。
(图片来源网络,侵删)
组成部分说明
# 触发器(trigger): 包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。 除了他们自己初始配置意外,触发器完全是无状态的。 # 作业存储(job store): 存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。 一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。 调度器不能分享同一个作业存储。 # 执行器(executor): 处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。 当作业完成时,执行器将会通知调度器。 # 调度器(scheduler): 通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。 配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。
框架对比:Celery VS APScheduler:
celery:
celery是一个专注于实时处理和任务调度的任务队列,任务就是消息(消息队列使用rabbitmq或者redie),消息中的有效载荷中包含要执行任务的全部数据。我们通常将celery作为一个任务队列来使用,但是celery也有定时任务的功能,但是celery无法在flask这样的系统中动态的添加定时任务,而且单独为定时任务功能而搭建celery显得过于重量级。
apscheduler:
apscheduler是基于Quartz的一个Python定时任务框架,提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化作业。APScheduler算是在实际项目中最好用的一个工具库,不仅可以在程序中动态的添加和删除定时任务,还支持持久化。
函数封装:
# 自定义调用函数 def scheduler_function(request): try: scheduler_id= "示例:123" scheduler_type = "示例:1" task_time = "示例:2024-01-01 00:00:00" task_hour= "示例:1" task_week_days= "示例:mon" task_week_times= "示例:22:00" data= "函数传参" # 示例:创建定时任务,可自主定义传参 add_scheduler_task(scheduler_type, scheduler_id, task_time, task_hour, task_week_days, task_week_times, data) except Exception as e: return False, f"创建定时任务失败,原因是:{str(e)}" # 添加定时任务 def add_scheduler_task(scheduler_type, scheduler_id, task_time, task_hour, task_week_days, task_week_times, data): try: scheduler = BackgroundScheduler() scheduler.add_jobstore(DjangoJobStore(), 'default') if scheduler_type == 1: # 某个时刻执行 scheduler.add_job(run_task, 'date', id=str(scheduler_id), run_date=task_time, args=(data,)) scheduler.start() elif scheduler_type == 2: # 间隔时间性执行 scheduler.add_job(run_task, 'interval', id=str(scheduler_id), minutes=task_hour, args=(data,)) scheduler.start() elif scheduler_type == 3: # 每周几点执行 start_time = task_week_times.split(':') h = int(start_time[0]) m = int(start_time[1]) scheduler.add_job(run_task, 'cron', id=str(scheduler_id), day_of_week=task_week_days, hour=h, minute=m, args=(data,)) scheduler.start() return True, "创建定时任务成功" except Exception as e: return False, f"创建定时任务失败,原因是:{str(e)}" # 删除定时任务 def remove_scheduler(scheduler_id): try: scheduler = BackgroundScheduler() scheduler.add_jobstore(DjangoJobStore(), 'default') DjangoJob.objects.filter(id=str(scheduler_id)).delete() return True, "删除定时任务成功" except Exception as e: return False, f"删除定时任务失败,原因是:{str(e)}" # 获取定时任务,判断是否存在,若有,则返回下一次执行的时间 def get_scheduler(scheduler_id): try: job = DjangoJob.objects.filter(id=str(scheduler_id)).values().first() if job: return True, job["next_run_time"] else: return False, "" except Exception as e: return False, f"查询定时任务列表失败,原因:{str(e)}"
总结:
以上封装的函数可以解决绝大部分所需要的定时任务类型,分别是单次执行,间隔执行,周期性执行,满足绝大多数的自动化测试使用,注:个人愚见,仅供参考!!!
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。