抱歉,您的浏览器不支持访问该网站,建议更换更新版本的浏览器,例如Chrome/Firefox/Safari等
首页
分享
登录
分享
登录
关于Xnote
回到
顶部
文档查看
分享链接:
复制
根目录
/
基于mde-job的异步任务框架介绍
分享中
标签
待办
文档
公开
# 简介 本异步任务框架主要包含两部分,任务池 mde-job 和异步任务管理器。主要部分是 mde-job,任务管理器是在它基础上的一层应用封装。可以简化业务开发的流程,正常如果需要开发一个高RT带进度反馈的接口需要设计一张任务表,编写对应的消息发送和消费代码,以及暴露给前端的API接口。这样一个框架把这些工作都聚合到了一个统一流程之下,业务开发需要关注的就是指定任务的唯一标志,预定的执行时间以及具体的功能实现。 项目地址: https://github.com/xupingmao/mde-job # 使用场景 本框架并不是提供通用的任务调度系统,而是提供类似于程序库(library)的功能。它要解决的问题不是调度问题,而是通过异步的方式来执行耗时比较长的程序,提供给前台播报进度的功能。 典型的场景 - boss系统需要执行一个批量处理操作,如果提供普通的接口,程序会超时,而且无法查询任务进度,通过异步的方式可以通过前台轮询来问询进度 # taskpool介绍 taskpool是一套基于MySQL的任务池模型,这一部分原本是可以独立成一个应用的,考虑到成本问题,目前暂时和im应用放在一起。 taskpool包含四个过程,创建任务、获取任务、执行任务、提交结果 - 创建任务:创建任务时需要指定任务的类型和超时时间,如果有必要,可以自行指定任务的ID或者延迟执行时间,任务创建成功之后,taskpool会把可用时间置为当前时间加上延迟的时间,如果延迟时间不设置,默认立即可用。 - 获取任务:任务分配过程使用抢占式,通过MySQL的条件更新,一个消费者在获取到任务之后同时更新任务的持有者holder和可用时间avail\_time,其他消费者在任务超时之前无法再获取 - 执行任务:消费者处理自己的业务逻辑 - 提交结果:任务执行完毕之后通过commit方法提交任务,taskpool会检查任务的持有者是否是该提交者,如果是,任务提交成功,结果写回到任务模型中,否则任务提交失败,抛出异常通知消费者回滚对应操作。 # 任务管理器 任务管理器依赖taskpool和消息中间件,下面是它工作原理: ![taskpool-sequence.png](/data/files/2017/12/1517_taskpool-sequence.png) # 使用方法 要使用异步任务框架,需要做的事情并不多,主要是以下三点: - 配置taskManager的dubbo消费者 - 配置消息中间件注册器的SpringBean(backend子项目都已经配置) - 实现BaseTaskHandler的抽象方法 ## 配置dubbo消费者 在```*dubbo-consumer.xml```文件中加入以下配置即可 ``` <dubbo:reference id="taskManager" interface="com.dzj.im.client.task.TaskManager" group="${dubbo.group}"/> ``` ## 实现BaseTaskHandler BaseTaskHandler有三个抽象方法,分别是 - getTaskType 任务类型,同时也是默认的消息messageType - getTimeoutMillis 超时的毫秒数 - handle 处理业务逻辑方法 下面是一个简单的例子,位于tob的biz模块下面 ```java @Component public class TobExampleTask extends BaseTaskHandler { @Autowired private DataAccessProxyAware[] dataAccessProxyAwares; @Override public String getTaskType() { return "tobExampleTask"; } @Override public long getTimeoutMillis() { return 600*1000; } @Override public Object handle(TaskResultDTO resultDTO, Map<String, Object> params) throws Exception { List<String> repositories = Lists.newArrayList(); if (dataAccessProxyAwares != null) { for (int i = 0; i < dataAccessProxyAwares.length; i++) { DataAccessProxyAware accessProxyAware = dataAccessProxyAwares[i]; try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } repositories.add(accessProxyAware.getClass().getSimpleName()); resultDTO.setResult(String.format("%d/%d", i+1, dataAccessProxyAwares.length)); // 更新任务进度 taskManager.update(resultDTO); } } return repositories; } } ``` ## Rest接口 - POST /im/task/trigger/{taskType} 触发某个类型的任务 ``` POST /im/task/trigger/tobExampleTask { } { "code": null, "message": null, "data": { "id": 16, "holder": "2e614d51-0d65-4567-80d5-fe0355ac4f3e", "params": "{}", "result": null } } ``` - GET /im/task/progress/{id} 查询任务进度 ``` GET /im/task/progress/16 // 执行过程中返回 { "code": null, "message": null, "data": { "id": 19, "holder": "9e537fe3-9577-411f-8491-e1e637ff2d4a", "status": "IN_PROGRESS", "result": "6/36" } } // 执行结束 { "code": null, "message": null, "data": { "id": 16, "holder": "2e614d51-0d65-4567-80d5-fe0355ac4f3e", "status": "FINISHED", "result": "[\"DefaultObjectRepository\",\"JpaCompanyServiceApplicationRepository\",\"JpaCompanyNewsRepository\",\"JpaVideoStrategyRepository\",\"JpaCompanyHospitalRelationRepository\",\"JpaCreditChangeLogRepository\",\"JpaEducationBaseRepository\",\"JpaSalesOrderFeedbackRepository\",\"JpaBillRepository\",\"JpaCreditExchangeRuleModifyRecordRepository\",\"JpaCompanyServiceRepository\",\"JpaCompanyDiseaseRepository\",\"JpaResourceLibraryRepository\",\"JpaMedicalFollowUpAggregationRepository\",\"JpaLecturerInfoRepository\",\"JpaCompanyProductRepository\",\"JpaPurchaseOrderChangeLogRepository\",\"JpaCompanyCooperationApplicationRepository\",\"JpaCompanyAccountRepository\",\"JpaCompanyMemberRoleChangeRecordRepository\",\"JpaSalesOrderLineRepository\",\"JpaCompanyMemberRoleRepository\",\"JpaMedicalPopularityProductRepository\",\"JpaSalesOrderChangeLogRepository\",\"JpaCompanyAccountChangeDetailRepository\",\"JpaCompanyRepository\",\"JpaCreditExchangeActionRecordRepository\",\"JpaCreditExchangePointAggregationRepository\",\"JpaBranchCenterBannerRepository\",\"JpaMedicineProductRelationRepository\",\"JpaMedicalPopularityRegionCityRelationRepository\",\"JpaSalesOrderRepository\",\"JpaCreditExchangeRuleRepository\",\"JpaCompanyPointStrategyRepository\",\"JpaPurchaseOrderRepository\",\"JpaLaunchStrategyRepository\"]" } } ``` - POST /im/task/cancel/{id} 取消任务,如果任务已经执行完成会取消失败 ``` POST /im/task/cancel/17 { "code": null, "message": null, "data": true } ``` # 事务的回滚 BaseTaskHandler内部有一个handleInTransaction方法添加了spring的@Transactional注解,如果没有使用spring托管事务需要重写handleInTransaction方法自己回滚事务。 # 任务的重试 任务有两种失败情况 - 执行器执行过程中抛出异常,也就是执行本身的逻辑或数据问题 - 任务执行超时,提交结果的时候会失败 针对这两种情况,框架会在提交和查询结果过程中检查失败,如果检测到失败的任务会自动触发执行,如果任务需要取消,需要调用取消接口。 # TODO 任务注册信息的刷新 目前采用的是redis持久化所有数据,一旦注册不会失效,这样可能出现某个处理器下线之后依然能够成功触发任务的情况,针对这个情况,目前的做法是手动删除。 简单起见可以先实现一个管理的API,提供注册信息查询和下线的功能,通过API来手动下线相关的处理器。 - 注册信息应包含具体的机器 - 应用关闭时删除注册信息 - 主动维护注册信息,需要引入相应的心跳检查策略。 # TODO 权限控制 注册处理器的时候指定执行权限 # TODO 任务的拆分 把一个大的任务拆分成若干子任务,类似于map-reduce,提供一个map函数。taskpool作为一个库嵌入到系统中,然后同一个应用监听该task的变化 - Mapper - Reducer
admin 创建于 2017-12-14 更新于 2024-02-14
加载中...
应用列表