xxl-job核心流程

news/2024/5/19 1:06:39 标签: xxl-job

xxl-job服务端
1、registryMonitorThread守护线程
     查询xxl_job_group表所有address_type=0执行器为自动注册的所有执行器,查询xxl_job_registry表所有update_time小于当前时间90秒
的应该死亡的注册地址信息,根据id删除所有xxl_job_registry表死亡的注册信息,查询xxl_job_registry表所有update_time大于当前时间90秒
的活着的注册信息,根据registry_group=EXECUTOR的信息根据registry_key即执行器编码形成分组,
找到注册到相同app_name的所有ip地址信息,遍历第一步查询出来的自动注册的所有执行器,将app_name相同的registry_key对应的ip地址以逗号分隔
更新到address_list字段,沉睡30秒后继续执行该守护线程的方法
2、monitorThread守护线程
        查询xxl_job_log表最旧的alarm_status=0默认告警且(handle_code = 200处理成功或者trigger_code in (0, 200) and handle_code = 0处理结束处理失败)
的1000条日志数据,循环所有的数据,将其状态alarm_status=0从0改成-1即无需警告,查出来的日志如果重试次数大于0的话进行重试操作并更新重试之后的xxl_job_log表数据,并对这些重试的日志进行邮件告警,配置多个邮件时,全部成功为告警成功,否则为告警失败,将xxl_job_log表数据改成alarm_status=2或3,即告警成功或告警失败
,沉睡10秒后继续执行该守护线程的方法

3、monitorThread守护线程
   先沉睡50秒,找xxl_job_log表调度成功handleCode=200且处理成功trigger_code = 200且调度时间trigger_time超过10分钟且xxl_job_registry表注册信息已经丢失的日志数据,循环查询出来的所有数据,将处理结果handleCode改成调度失败500,沉睡60秒后继续执行该守护线程的方法
4、logrThread守护线程
   查询xxl_job_log表最近3天处理中,处理成功、处理失败的数量,并将统计之后的结果更新到xxl_job_log_report表,当xxl.job.logretentiondays(默认30天)配置的日志清理天数大于7天且距离上次清理日志时间大于24小时,循环分页查询xxl_job_log表trigger_time小于配置的日志清理时间的最近1000条数据,
这1000条数据根据id进行清理,将上次清理日志时间更新为当前时间,沉睡1分钟后继续执行该守护线程的方法
5、scheduleThread守护线程
   随机休眠4秒到5秒的时间,分页一次读取6000条(xxl.job.triggerpool.fast.max+xxl.job.triggerpool.slow.max)*20  xxl_job_info表运行中且下次触发时间trigger_next_time不超过当前5秒后的数据,循环所有的数据,对于下次触发时间<当前时间-5秒的数据根据
过期策略决定是否要重试,将下次触发时间刷到最新的时间,对于当前时间-5秒<下次触发时间<当前时间的数据,及与现在相比5秒内过期的数据,直接执行一次触发调度,将下次触发时间刷到最新,如果当前时间<下次触发时间<当前时间+5的数据,即5秒内即将触发的
数据,int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60),根据下次触发时间将数据分割成以秒为单位的时间轮,Map<Integer, List<Integer>> ringData即存放不同的秒数的时间轮数据,value为当前所有任务id的列表,
然后将下次触发时间更新成最新的数据,然后循环更新后的结果list,数据库里面将数据更新到最新,提交更新,如果总的花费时间小于1秒的话休眠0秒到5秒
6、ringThread守护线程
   随机休眠0秒到1秒,获取到当前时间及当前时间向前一秒的时间轮数据,删除ringData数据并获取到所有的jobid,循环遍历所有的jobid列表,触发调度,然后继续循环执行该守护线程的方法

xxl-job客户端
1、XxlJobSpringExecutor注入到IOC容器后根据SmartInitializingSingleton和DisposableBean接口完成一系列的初始化操作和关闭时的销毁操作,根据applicationContext.getBeanNamesForType找到所有注到IOC容器的bean
,循环遍历所有的beanName数组,跳过@Lazy懒加载的bean,根据Object bean = applicationContext.getBean(beanDefinitionName)获取到当前Bean,根据Map<Method, XxlJob> annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                        (MethodIntrospector.MetadataLookup<XxlJob>) method -> AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class));方法获取到所有添加XxlJob注解的方法,循环遍历所有的annotatedMethods,注册job的信息,
然后将注册的方法,初始化的方法以及销毁的方法放入ConcurrentMap<String, IJobHandler> jobHandlerRepository中
2、创建SpringGlueFactory工厂
3、执行启动方法
3.1 根据xxl.job.executor.logpath配置初始化日志地址,没有配置日志地址则有一个默认的日志地址/data/applogs/xxl-job/jobhandler,给logBasePath和glueSrcPath赋值
3.1 根据配置的注册中心地址xxl.job.admin.addresses和accessToken初始化List<AdminBiz> adminBizList客户端,
4、localThread守护线程
    找到xxl.job.executor.logpath日志信息的所有文件,遍历该文件夹下的所有文件,文件夹跳过,如果文件的创建时间<xxl.job.executor.logretentiondays配置的日志文件过期时间,则删除该文件,沉睡1天后继续执行该守护线程的方法
5、triggerCallbackThread守护线程
    从阻塞队列里面callBackQueue拿到回调参数HandleCallbackParam,将阻塞队列中的所有元素转移到新的callbackParamList列表,然后callbackParamList添加从阻塞队列拿出来的HandleCallbackParam,调用注册中心的回调接口api/callback
通知注册中心调用结果,回调成功后在本地保存xxl_job_log表回调成功的日志信息,失败保存失败信息,按照时间生成当前回调参数的日志文件,当任务停止后最后一次把阻塞队列里面的参数全都拿出来进行回调处理
6、triggerRetryCallbackThread守护线程
    找到所有的失败回调日志文件,反序列化其参数,删除改文件,继续尝试回调操作,沉睡30秒后继续执行该守护线程的方法
7、初始化executor-server服务器
   启动ServerBootstrap bootstrap netty服务器,端口默认为9999接收注册中心的回调信息,开始注册客户端信息到注册中心,
8、registryThread守护线程
   根据配置的appname和本地的netty服务器address地址构建xxl_job_registry表对象RegistryParam,循环所有的adminBizList客户端,执行注册方法registry,注册中心的register方法把xxl_job_registry的更新时间更新到当前时间,更新失败则直接保存新的数据
   沉睡30秒后继续执行该守护线程的方法,当线程停止后调用移除注册信息的方法,注册中心会删除注册的数据信息
   


http://www.niftyadmin.cn/n/247244.html

相关文章

这里有小白最关心的亚马逊防关联问题

账号的安全问题&#xff0c;很多时候和关联问题相关&#xff0c;一旦被检测到关联就会导致账号被永久封号。亚马逊更是官方出过规定&#xff0c;一个卖家只能开一个账号&#xff0c;如果同一个ip登录两个以上的账户&#xff0c;就很容易导致关联。这样讲可能会有点模糊&#xf…

RabbitMQ之介绍以及安装

1.1 MQ的相关概念 1.1.1 什么是MQ ​ MQ&#xff0c;从字面意思上看&#xff0c;本质是个队列&#xff0c;FIFO先入先出&#xff0c;只不过队列中存放的内容是message而已&#xff0c;还是一种跨进程的通信机制&#xff0c;用于上下游传递消息。在互联网架构中&#xff0c;MQ…

微信小程序监听添加页添加了数据在列表页根据传递的状态刷新列表

需求&#xff1a;我在添加页添加了数据&#xff0c;需要返回列表页时&#xff0c;根据返回的参数刷新列表 列表页面&#xff1a; onShow(options) {var thatthis;let pages getCurrentPages();let currPage pages[pages.length - 1];console.log("添加页面传递过来的数据…

深度学习中的一阶段目标检测

博主简介 博主是一名大二学生&#xff0c;主攻人工智能研究。感谢让我们在CSDN相遇&#xff0c;博主致力于在这里分享关于人工智能&#xff0c;c&#xff0c;Python&#xff0c;爬虫等方面知识的分享。 如果有需要的小伙伴可以关注博主&#xff0c;博主会继续更新的&#xff0c…

中介者设计模式(Mediator Design Pattern)[论点:概念、组成角色、相关图示、示例代码、适用场景]

文章目录 概念组成角色相关图示示例代码适用场景 概念 中介者设计模式是一种行为型设计模式&#xff0c;它通过引入一个中介对象来封装一组对象之间的交互&#xff0c;使得对象之间不需要显式地相互引用&#xff0c;从而降低它们之间的耦合。通过将对象间的通信封装到中介者对象…

【进阶C语言】有关动态内存管理的经典笔试题(详细图文讲解)

前言 &#x1f4d5;作者简介&#xff1a;热爱跑步的恒川&#xff0c;致力于C/C、Java、Python等多编程语言&#xff0c;热爱跑步&#xff0c;喜爱音乐的一位博主。 &#x1f4d7;本文收录于C语言进阶系列&#xff0c;本专栏主要内容为数据的存储、指针的进阶、字符串和内存函数…

说说微信小程序中路由跳转的方式有哪些?区别?

① wx.navigateTo() 保留当前页面&#xff0c;跳转到应用内的某个页面。使用 wx.navigateBack 可以返回到原页面。 ② wx.redirectTo() 关闭当前页面&#xff0c;跳转到应用内的某个页面。 ③ wx.switchTab() 跳转到 tabBar 页面&#xff0c;同时关闭其他非 tabBar 页面。 ④ w…

Linux环境对Nginx开源版源码下载、编译、安装、开机自启

一、准备内容Centos8安装yum源https://blog.csdn.net/xiaochenXIHUA/article/details/127251704?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522168197299116800211534092%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%