分布式任务调度框架XXL-JOB详解

news/2024/5/18 23:21:02 标签: 分布式, wpf, XXL-JOB, java

分布式任务调度

概述

场景: 如12306网站根据不同车次设置放票时间点,商品成功发货后向客户发送短信提醒等任务,某财务系统需要在每天上午10天前统计前一天的账单数据

任务的调度是指系统为了完成特定业务,基于给定的时间点,时间间隔,执行次数等条件自动执行某个任务

  • 多线程: 充分利用单机的资源
  • 分布式加多线程: 使用多台计算机且每台计算机使用多线程处理,可扩展性更强

分布式任务调度是指将任务调度程序分布式构建而不再是将任务调度的程序都集成在某个服务中,这样可以大大提高任务的调度处理能力

  • 并行任务调度:由于一台计算机CPU的处理能力是有限的,将任务调度程序分布式部署可以让多台计算机共同去完成任务调度,
  • 高可用:若某一个实例宕机不影响其他实例来执行任务
  • 弹性扩容:将任务分割为若干个分片并由不同的实例并行执行,根据情况向集群中增加实例来提高任务调度的处理效率
  • 任务管理与监测:对系统中存在的所有定时任务进行统一的管理及监测,让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应
  • 避免任务重复执行(相同任务在多个运行实例上只执行一次):当任务调度以集群方式部署时,同一个任务调度程序可能会执行多次,如重复发放了多次优惠券

在这里插入图片描述

传统实现方式

多线程方式实现: 开启一个线程,每sleep一段时间就去检查是否已到预期执行时间

java">public static void main(String[] args) {    
    // 任务执行的间隔时间
    final long timeInterval = 1000;
    Runnable runnable = new Runnable() {
        public void run() {
            while (true) {
                //TODO:something
                try {
                    Thread.sleep(timeInterval);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    };
    Thread thread = new Thread(runnable);
    thread.start();
}

Timer方式实现: 每个Timer对应一个线程,因此可以同时启动多个Timer并行执行多个任务,同一个Timer中的任务是串行执行

java">public static void main(String[] args){  
    Timer timer = new Timer();  
    timer.schedule(new TimerTask(){
        @Override  
        public void run() {  
           //TODO:something
        }  
    }, 1000, 2000);  //1秒后开始调度,每2秒执行一次
}

ScheduledExecutor方式实现: 每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的相互之间不会受到干扰

java">public static void main(String [] agrs){
    ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
    service.scheduleAtFixedRate(
            new Runnable() {
                @Override
                public void run() {
                    //TODO:something
                    System.out.println("todo something");
                }
            }, 1,2, TimeUnit.SECONDS);
}

调度框架实现

传统方式缺点: 仅能完成基于开始时间与重复间隔的任务调度,对于复杂的调度需求无法完成

  • 复杂需求: 如设置每月第一天凌晨1点执行任务、复杂调度任务的管理、任务间传递数据

Quartz任务调度框架: 既支持简单的按时间间隔调度,也能通过CronTrigger表达式(秒/分/时/日/月/周/)设置按日历进行任务调度

  • JobDetail:负责定义需要执行的任务逻辑
  • Trigger:负责设置调度策略
  • Scheduler: 将二者组装在一起,并触发任务开始执行
java">public static void main(String [] agrs) throws SchedulerException {
    // 创建一个Scheduler
    SchedulerFactory schedulerFactory = new StdSchedulerFactory();
    Scheduler scheduler = schedulerFactory.getScheduler();
    // 创建JobDetail
    JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class);
    jobDetailBuilder.withIdentity("jobName","jobGroupName");
    JobDetail jobDetail = jobDetailBuilder.build();
    // 创建触发的CronTrigger支持按日历调度
    CronTrigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("triggerName", "triggerGroupName")
                .startNow()
                .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?"))
                .build();
    scheduler.scheduleJob(jobDetail,trigger);
    scheduler.start();
}
// 定义需要执行的任务逻辑
public class MyJob implements Job {
    @Override
    public void execute(JobExecutionContext jobExecutionContext){
        System.out.println("todo something");
    }
}

XXL-JOB框架

概述

XXL-JOB是一个轻量级分布式任务调度平台,学习简单、轻量级、易扩展,主要有调度中心、执行器、任务三部分组成

  • 调度中心:负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码

  • 任务执行器:负责接收调度中心发出的调度请求即要执行哪个任务,接收到任务后会放入线程池中的任务队列,任务执行完后将执行结果上报

  • 任务:执行处理具体业务的逻辑代码

在这里插入图片描述

docker部署

使用容器进行集群部署xxl-job-admin任务调度中心,通过nginx对其做负载均衡访问,对http://调度中心服务IP地址:端口/xxl-job-admin/进行代理

# 拉取镜像
docker pull xuxueli/xxl-job-admin
# 启动容器
docker run -p 8080:8080 -v /tmp:/data/applogs --name xxl-job-admin  -d xuxueli/xxl-job-admin:{版本号}

# 自定义数据源
docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai" -p 8080:8080 -v /tmp:/data/applogs --name xxl-job-admin  -d xuxueli/xxl-job-admin:{指定版本}

# 编写脚本启动xxl-job调度中心
sh /data/soft/restart.sh

搭建XXL-JOB工程

第一步从GitHub或Gitee下载XXL-JOB工程,我这里使用的是2.3.1版本

在这里插入图片描述

第二步:执行官方提供的doc目录下的tables_xxl_job.sql数据库脚本创建xxl_job_admin工程需要的数据库表
在这里插入图片描述

第三步:在xxl-job-admin工程的配置文件application.properties中设置端口号和数据库,然后启动服务并使用浏览器访问,账号和密码为admin/123456

在这里插入图片描述

第四步:启动成功之后也可以使用maven命令将xxl-job-admin工程打包然后将其上传至Linux中运行方便访问

# 启动项目
nohup java -jar /绝对路径/xxl-job-admin-2.3.1.jar &

在这里插入图片描述

添加并配置执行器

第一步:进入xxl-job调度中心中提前声明一个执行器,注意要想真正创建一个执行器还是要在Java程序中

第二步:在媒资理模块的media-service工程中添加xxl-job-core依赖,这里面将调度中心和执行器的工作流程都提前写好了

<dependency>
    <groupId>com.xuxueli</groupId>
    <!--项目的父工程已约定了版本2.3.1-->
    <artifactId>xxl-job-core</artifactId>
</dependency>

第三步:在Nacos中的dev环境下创建media-service-dev.yaml配置文件指定执行器将要注册的调度中心地址和启动端口号

  • 配置执行器的应用名时需要与调度中心添加的执行器名称相同,双方会根据名称建立绑定关系
xxl:
  job:
    admin: 
      addresses: http://192.168.101.65:8088/xxl-job-admin
    executor:
      appname: media-process-service # 执行器的应用名与调度中心中添加的名称相同
      address: 
      ip: 
      port: 9999 # 执行器启动的端口,如果本地启动多个执行器注意端口不能重复
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30
    accessToken: default_token

第三步:将xxl-job-executor-sample-springboot工程下的XxlJobConfig拷贝到媒资管理模块的mdeia-service工程的config包下

  • XxlJobConfig配置类中提供了创建的执行器对应的Bean,我们需要将其注册到容器中
<!--引入依赖-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-commons</artifactId>
    <version>${version}</version>
</dependency>
java">@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    @Value("${xxl.job.accessToken}")
    private String accessToken;
    @Value("${xxl.job.executor.appname}")
    private String appname;
    @Value("${xxl.job.executor.address}")
    private String address;
    @Value("${xxl.job.executor.ip}")
    private String ip;
    @Value("${xxl.job.executor.port}")
    private int port;
    @Value("${xxl.job.executor.logpath}")
    private String logPath;
    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;
    // 执行器对应的Bean
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }
}

第四步:测试执行器与调度中心是否正常通信,通过启动媒资管理模块的接口工程(依赖service工程)启动执行器,然后查看调度中心的执行器管理界面

在这里插入图片描述

添加并配置任务

常见任务相关的基础,调度,任务配置
在这里插入图片描述

配置选项
调度类型固定速度: 按固定的间隔定时调度
Cron: 通过Cron表达式(秒/分/时/日/月/周)实现更丰富的定时调度策略,xxl-job也提供了图形界面去配置
30 10 1 * * ? 指定每天1点10分30秒触发,0/30 * * * * ? 每30秒触发一次
运行模式BEAN: 在项目工程中编写执行器的任务代码
GLUE: 在调度中心的任务参数中编写任务代码
JobHandler(任务方法名)在项目工程中使用@XxlJob注解指定的任务方法名称
路由策略(即当执行器集群部署时,调度中心向哪个执行器下发任务)第一个: 只向第一个执行器下发任务

第一步:参考xxl-job-executor-sample-springboot工程下的任务类,在媒资管理模块中media-service工程的service包下新建jobhandler存放任务类

注解作用
@XxlJobvalue=“自定义jobhandler名称,与在调度中心新建任务时指定的JobHandler属性值一致”
init = “JobHandler初始化方法”
destroy = “JobHandler销毁方法”)
XxlJobHelper.log打印的执行日志
java">@Component
@Slf4j
public class SampleJob {
    private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
    
    // 默认任务结果为成功状态,不需要主动设置,也可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果
 
    // 简单任务
    @XxlJob("testJob")
    public void testJob() throws Exception {
        // 任务执行逻辑
        log.info("开始执行.....");
    }
    // 分片广播任务
    @XxlJob("shardingJobHandler")
    public void shardingJobHandler() throws Exception {
        
    }

    // 命令行任务
    @XxlJob("commandJobHandler")
    public void commandJobHandler() throws Exception {
        
    }

    // 跨平台Http任务
    @XxlJob("httpJobHandler")
    public void httpJobHandler() throws Exception {

    }

    // 生命周期任务,任务初始化与销毁时,支持自定义相关逻辑
    @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
    public void demoJobHandler2() throws Exception {
        XxlJobHelper.log("XXL-JOB, Hello World.");
    }
    public void init(){
        logger.info("init");
    }
    public void destroy(){
        logger.info("destroy");
    }
   
}

第二步:在任务调度中心的任务管理界面添加任务并设置相关的配置执行任务的执行器名称

在这里插入图片描述

第三步:在调度中心的任务管理界面启动/停止添加的任务,然后通过调度日志查看执行器执行任务的情况,任务执行时间后注意清理日志

在这里插入图片描述

第四步:测试任务方法的执行,启动媒资管理模块的api工程(依赖service工程)启动执行器,等待执行器处理任务调度中心下发的任务,查看任务方法的执行情况

在这里插入图片描述

任务的高级配置

分布式任务处理就是启动多个执行器组成一个集群去执行任务,此时调度中心就需要考虑执行器在集群部署下执行任务的策略,保证任务高效执行且不重复

在这里插入图片描述

高级配置选项
路由策略FIRST: 固定选择第一个机器
LAST:固定选择最后一个机器
ROUND: 轮询
RANDOM: 随机选择在线的机器
CONSISTENT_HASH(一致性HASH): 按照Hash算法计算任务Id对应的Hash值,最终所有任务均匀散列在不同机器上
LEAST_FREQUENTLY_USED: 使用频率最低的机器优先被选举
LEAST_RECENTLY_USED: 最久未使用的机器优先被选举
FAILOVER(故障转移): 如果执行器集群中某一台机器故障,按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度
BUSYOVER(忙碌转移): 如果执行器集群中某一台机器正在执行任务,按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度
SHARDING_BROADCAST(分片广播): 广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务
子任务(对于关联的任务,实现一个任务执行完成去执行另一个任务)每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发并主动调度一次子任务ID所对应的任务
调度过期策略忽略: 调度过期后执行器会忽略过期的任务,从当前时间开始重新计算下次触发时间
立即执行一次: 调度过期后会立即执行一次,并从当前时间开始重新计算下次触发时间
阻塞处理策略(调度过于密集时执行器来不及处理时的策略)单机串行(默认): 调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行
丢弃后续调度: 调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止当前运行中的调度任务并清空队列,然后运行请求的调度任务;
任务超时时间(支持自定义)任务运行超时将会主动中断任务
失败重试次数(支持自定义)当任务失败时将会按照预设的失败重试次数主动进行重试

路由策略之分片广播测试

分片: 调度中心以执行器为维度进行分片,将集群中的每个执行器都标上序号

  • 分片任务场景: 10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍

广播: 每次调度会向集群中的所有执行器发送任务调度请求

  • 广播任务场景:让所有执行器同时运行shell脚本、集群所有节点进行缓存更新等
    在这里插入图片描述

每个执行器接收到调度请求的同时会接收分片参数(序号和总数),基于当前业务需求使用分片参数控制执行器是否执行任务,控制执行器集群分布式处理任务

  • 分片参数: 执行器可以根据分片参数动态的领取属于自己的任务,保证执行器直接不会执行重复的任务,充分发挥每个执行器的能力
  • xxl-job支持动态扩容执行器集群,当任务量增加时可以部署更多的执行器到集群中,此时调度中心会动态增加分片的数量

第一步:定义作业分片的任务方法获取分片参数,然后在任务调度中心添加对应的任务方法shardingJobHandler并设置路由策略为分片广播

在这里插入图片描述

java">// 分片广播任务
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {
    // 分片序号,从0开始
    int shardIndex = XxlJobHelper.getShardIndex();
    // 分片总数   
    int shardTotal = XxlJobHelper.getShardTotal();
    log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
    log.info("开始执行第"+shardIndex+"批任务");
}

第二步:在Nacos的dev环境下media-service-dev.yaml配置文件中增加本地优先配置

# 本地优先
spring:
 cloud:
  config:
    override-none: true

第三步:启动媒资管理模块的接口工程并复制一份执行器实例,添加vm参数选项指定新实例的启动端口号执行器的启动端口号,执行器的名称不变

# Dserver.port=63051 -Dxxl.job.executor.port=9998对应如下配置项
server: 
  port: 53051
xxl:
  job:
    executor:
      port: 9998

在这里插入图片描述

第四步:观察任务调度中心,查看已经启动的执行器,调度中心会根据实际情况动态调整执行器的总分片数

在这里插入图片描述

第五步:在调度中心启动任务,观察执行器实例接收下发任务的执行情况,查看接收的分片序号和总分片数

在这里插入图片描述


http://www.niftyadmin.cn/n/5363083.html

相关文章

paramiko模块使用

安装: pip install paramikodemo: 连接linux,返回执行结果 import paramiko# 创建ssh对象 ssh = paramiko.SSHClient()# 连接方式 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())# 发起连接 ssh.connect(192.168.0.105

STC系列单片机定时器

目录 一、定时器的概念 二、单片机程序中的定时器功能代码的实现 &#xff08;1&#xff09;TMOD &#xff08;2&#xff09;AUXR &#xff08;3&#xff09;初始化TH0和TL0 &#xff08;4&#xff09;TR0/EA/ET0三个寄存器 一、定时器的概念 关于定时举个简单的例子&…

24.云原生ArgoCD高级之钩子

云原生专栏大纲 文章目录 Argo CD钩子如何定义钩子钩子删除策略 Argo CD钩子 Argo CD 是一个用于部署和管理 Kubernetes 应用程序的工具&#xff0c;它提供了一种声明式的方式来定义和自动化应用程序的部署过程。Argo CD 钩子&#xff08;Hooks&#xff09;是一种机制&#x…

QT 应用中集成 Sentry

QT 应用中集成 Sentry QT应用中集成 SentrySentry SDK for C/C注册 Sentry 账号QT 应用中集成 Sentry触发 Crash 上报 QT应用中集成 Sentry Sentry 是一个开源的错误监控和日志记录平台&#xff0c;旨在帮助开发团队实时捕获、跟踪和解决软件应用程序中的错误和异常。它提供了…

jupyter notebook更改工作目录的2个细节

详细步骤参考知乎原文&#xff1a; 如何更改Jupyter Notebook的默认工作路径&#xff1f; - 知乎 (zhihu.com​​​​​​) 步骤4中需要删除 #符号和后面的空格&#xff01;一定要删除空格&#xff0c;否则会出现语法错误的报错 步骤5中&#xff0c;经过评论区提醒后&#xf…

Map和Set的封装

目录 一、底层原理 二、红黑树的节点 三、仿函数 四、迭代器 4.1、迭代器的定义&#xff1a; 4.2、*:解引用操作 4.3、->:成员访问操作符 4.4、!、 4.5、迭代器的&#xff1a; 4.6、迭代器的-- 五、Map 六、Set 七、红黑树源码 一、底层原理 我们要知道&#…

大路灯护眼灯哪个牌子好学生用?适合学生使用的大路灯推荐

在这个数字化时代中&#xff0c;不仅我们的心灵在承受的压力&#xff0c;而且我们的眼睛也在随着科技的发展承受的前所未有的压力&#xff0c;作为一个自媒体创作者&#xff0c;面对长时间的工作和电子设备的运用&#xff0c;眼睛也经常会出现酸涩红血丝的状况&#xff0c;视力…

ctfshow web-77

开启环境: 先直接用伪协议获取 flag 位置。 c?><?php $anew DirectoryIterator("glob:///*"); foreach($a as $f) {echo($f->__toString(). );} exit(0); ?> 发现 flag36x.txt 文件。同时根目录下还有 readflag&#xff0c;估计需要调用 readflag 获…