SchedulerCenterServer.cs 18 KB


  1. using Quartz;
  2. using Quartz.Impl;
  3. using Quartz.Impl.Triggers;
  4. using Quartz.Spi;
  5. using Ropin.Inspection.Model.Common;
  6. using Ropin.Inspection.Model.Entities;
  7. using Ropin.Inspection.Model.ViewModel;
  8. using System;
  9. using System.Collections.Generic;
  10. using System.Collections.Specialized;
  11. using System.Linq;
  12. using System.Reflection;
  13. using System.Text;
  14. using System.Threading.Tasks;
  15. namespace Ropin.Inspection.Tasks.QuartzNet
  16. {
  17. /// <summary>
  18. /// 任务调度管理中心
  19. /// </summary>
  20. public class SchedulerCenterServer : ISchedulerCenter
  21. {
  22. private Task<IScheduler> _scheduler;
  23. private readonly IJobFactory _iocjobFactory;
  24. public SchedulerCenterServer(IJobFactory jobFactory)
  25. {
  26. _iocjobFactory = jobFactory;
  27. _scheduler = GetSchedulerAsync();
  28. }
  29. private Task<IScheduler> GetSchedulerAsync()
  30. {
  31. if (_scheduler != null)
  32. return this._scheduler;
  33. else
  34. {
  35. // 从Factory中获取Scheduler实例
  36. NameValueCollection collection = new NameValueCollection
  37. {
  38. { "quartz.serializer.type", "binary" },
  39. };
  40. StdSchedulerFactory factory = new StdSchedulerFactory(collection);
  41. return _scheduler = factory.GetScheduler();
  42. }
  43. }
  44. /// <summary>
  45. /// 开启任务调度
  46. /// </summary>
  47. /// <returns></returns>
  48. public async Task<MessageModel<string>> StartScheduleAsync()
  49. {
  50. var result = new MessageModel<string>();
  51. try
  52. {
  53. this._scheduler.Result.JobFactory = this._iocjobFactory;
  54. if (!this._scheduler.Result.IsStarted)
  55. {
  56. //等待任务运行完成
  57. await this._scheduler.Result.Start();
  58. await Console.Out.WriteLineAsync("任务调度开启!");
  59. result.success = true;
  60. result.msg = $"任务调度开启成功";
  61. return result;
  62. }
  63. else
  64. {
  65. result.success = false;
  66. result.msg = $"任务调度已经开启";
  67. return result;
  68. }
  69. }
  70. catch (Exception)
  71. {
  72. throw;
  73. }
  74. }
  75. /// <summary>
  76. /// 停止任务调度
  77. /// </summary>
  78. /// <returns></returns>
  79. public async Task<MessageModel<string>> StopScheduleAsync()
  80. {
  81. var result = new MessageModel<string>();
  82. try
  83. {
  84. if (!this._scheduler.Result.IsShutdown)
  85. {
  86. //等待任务运行完成
  87. await this._scheduler.Result.Shutdown();
  88. await Console.Out.WriteLineAsync("任务调度停止!");
  89. result.success = true;
  90. result.msg = $"任务调度停止成功";
  91. return result;
  92. }
  93. else
  94. {
  95. result.success = false;
  96. result.msg = $"任务调度已经停止";
  97. return result;
  98. }
  99. }
  100. catch (Exception)
  101. {
  102. throw;
  103. }
  104. }
  105. /// <summary>
  106. /// 添加一个计划任务(映射程序集指定IJob实现类)
  107. /// </summary>
  108. /// <typeparam name="T"></typeparam>
  109. /// <param name="tasksQz"></param>
  110. /// <returns></returns>
  111. public async Task<MessageModel<string>> AddScheduleJobAsync(TasksQz tasksQz)
  112. {
  113. var result = new MessageModel<string>();
  114. if (tasksQz != null)
  115. {
  116. try
  117. {
  118. JobKey jobKey = new JobKey(tasksQz.Id.ToString(), tasksQz.JobGroup);
  119. if (await _scheduler.Result.CheckExists(jobKey))
  120. {
  121. result.success = false;
  122. result.msg = $"该任务计划已经在执行:【{tasksQz.Name}】,请勿重复启动!";
  123. return result;
  124. }
  125. #region 设置开始时间和结束时间
  126. if (tasksQz.BeginTime == null)
  127. {
  128. tasksQz.BeginTime = DateTime.Now;
  129. }
  130. DateTimeOffset starRunTime = DateBuilder.NextGivenSecondDate(tasksQz.BeginTime, 1);//设置开始时间
  131. if (tasksQz.EndTime == null)
  132. {
  133. tasksQz.EndTime = DateTime.MaxValue.AddDays(-1);
  134. }
  135. DateTimeOffset endRunTime = DateBuilder.NextGivenSecondDate(tasksQz.EndTime, 1);//设置暂停时间
  136. #endregion
  137. #region 通过反射获取程序集类型和类
  138. Assembly assembly = Assembly.Load(new AssemblyName(tasksQz.AssemblyName));
  139. Type jobType = assembly.GetType(tasksQz.AssemblyName + ".QuartzNet.Jobs." + tasksQz.ClassName);
  140. #endregion
  141. //判断任务调度是否开启
  142. if (!_scheduler.Result.IsStarted)
  143. {
  144. await StartScheduleAsync();
  145. }
  146. //传入反射出来的执行程序集
  147. IJobDetail job = new JobDetailImpl(tasksQz.Id.ToString(), tasksQz.JobGroup, jobType);
  148. job.JobDataMap.Add("JobParam", tasksQz.C_StoreCode); //job.JobDataMap.Add("JobParam", tasksQz.JobParams);
  149. ITrigger trigger;
  150. #region 泛型传递
  151. //IJobDetail job = JobBuilder.Create<T>()
  152. // .WithIdentity(sysSchedule.Name, sysSchedule.JobGroup)
  153. // .Build();
  154. #endregion
  155. if (tasksQz.Cron != null && CronExpression.IsValidExpression(tasksQz.Cron) && tasksQz.TriggerType > 0)
  156. {
  157. trigger = CreateCronTrigger(tasksQz);
  158. ((CronTriggerImpl)trigger).MisfireInstruction = MisfireInstruction.CronTrigger.DoNothing;
  159. }
  160. else
  161. {
  162. trigger = CreateSimpleTrigger(tasksQz);
  163. }
  164. // 告诉Quartz使用我们的触发器来安排作业
  165. await _scheduler.Result.ScheduleJob(job, trigger);
  166. //await Task.Delay(TimeSpan.FromSeconds(120));
  167. //await Console.Out.WriteLineAsync("关闭了调度器!");
  168. //await _scheduler.Result.Shutdown();
  169. result.success = true;
  170. result.msg = $"【{tasksQz.Name}】成功";
  171. return result;
  172. }
  173. catch (Exception ex)
  174. {
  175. result.success = false;
  176. result.msg = $"任务计划异常:【{ex.Message}】";
  177. return result;
  178. }
  179. }
  180. else
  181. {
  182. result.success = false;
  183. result.msg = $"任务计划不存在:【{tasksQz?.Name}】";
  184. return result;
  185. }
  186. }
  187. /// <summary>
  188. /// 任务是否存在?
  189. /// </summary>
  190. /// <returns></returns>
  191. public async Task<bool> IsExistScheduleJobAsync(TasksQz sysSchedule)
  192. {
  193. JobKey jobKey = new JobKey(sysSchedule.Id.ToString(), sysSchedule.JobGroup);
  194. if (await _scheduler.Result.CheckExists(jobKey))
  195. {
  196. return true;
  197. }
  198. else
  199. {
  200. return false;
  201. }
  202. }
  203. /// <summary>
  204. /// 暂停一个指定的计划任务
  205. /// </summary>
  206. /// <returns></returns>
  207. public async Task<MessageModel<string>> StopScheduleJobAsync(TasksQz sysSchedule)
  208. {
  209. var result = new MessageModel<string>();
  210. try
  211. {
  212. JobKey jobKey = new JobKey(sysSchedule.Id.ToString(), sysSchedule.JobGroup);
  213. if (!await _scheduler.Result.CheckExists(jobKey))
  214. {
  215. result.success = false;
  216. result.msg = $"未找到要暂停的任务:【{sysSchedule.Name}】";
  217. return result;
  218. }
  219. else
  220. {
  221. await this._scheduler.Result.DeleteJob(jobKey);
  222. result.success = true;
  223. result.msg = $"【{sysSchedule.Name}】成功";
  224. return result;
  225. }
  226. }
  227. catch (Exception)
  228. {
  229. throw;
  230. }
  231. }
  232. /// <summary>
  233. /// 恢复指定的计划任务
  234. /// </summary>
  235. /// <param name="sysSchedule"></param>
  236. /// <returns></returns>
  237. public async Task<MessageModel<string>> ResumeJob(TasksQz sysSchedule)
  238. {
  239. var result = new MessageModel<string>();
  240. try
  241. {
  242. JobKey jobKey = new JobKey(sysSchedule.Id.ToString(), sysSchedule.JobGroup);
  243. if (!await _scheduler.Result.CheckExists(jobKey))
  244. {
  245. result.success = false;
  246. result.msg = $"未找到要恢复的任务:【{sysSchedule.Name}】";
  247. return result;
  248. }
  249. await this._scheduler.Result.ResumeJob(jobKey);
  250. result.success = true;
  251. result.msg = $"【{sysSchedule.Name}】成功";
  252. return result;
  253. }
  254. catch (Exception)
  255. {
  256. throw;
  257. }
  258. }
  259. /// <summary>
  260. /// 暂停指定的计划任务
  261. /// </summary>
  262. /// <param name="sysSchedule"></param>
  263. /// <returns></returns>
  264. public async Task<MessageModel<string>> PauseJob(TasksQz sysSchedule)
  265. {
  266. var result = new MessageModel<string>();
  267. try
  268. {
  269. JobKey jobKey = new JobKey(sysSchedule.Id.ToString(), sysSchedule.JobGroup);
  270. if (!await _scheduler.Result.CheckExists(jobKey))
  271. {
  272. result.success = false;
  273. result.msg = $"未找到要暂停的任务:【{sysSchedule.Name}】";
  274. return result;
  275. }
  276. await this._scheduler.Result.PauseJob(jobKey);
  277. result.success = true;
  278. result.msg = $"【{sysSchedule.Name}】成功";
  279. return result;
  280. }
  281. catch (Exception)
  282. {
  283. throw;
  284. }
  285. }
  286. #region 状态状态帮助方法
  287. public async Task<List<TaskInfoDto>> GetTaskStaus(TasksQz sysSchedule)
  288. {
  289. var ls = new List<TaskInfoDto>();
  290. var noTask = new List<TaskInfoDto>{ new TaskInfoDto {
  291. jobId = sysSchedule.Id.ToString(),
  292. jobGroup = sysSchedule.JobGroup,
  293. triggerId = "",
  294. triggerGroup = "",
  295. triggerStatus = "不存在"
  296. } };
  297. JobKey jobKey = new JobKey(sysSchedule.Id.ToString(), sysSchedule.JobGroup);
  298. IJobDetail job = await this._scheduler.Result.GetJobDetail(jobKey);
  299. if (job == null)
  300. {
  301. return noTask;
  302. }
  303. //info.Append(string.Format("任务ID:{0}\r\n任务名称:{1}\r\n", job.Key.Name, job.Description));
  304. var triggers = await this._scheduler.Result.GetTriggersOfJob(jobKey);
  305. if (triggers == null || triggers.Count == 0)
  306. {
  307. return noTask;
  308. }
  309. foreach (var trigger in triggers)
  310. {
  311. var triggerStaus = await this._scheduler.Result.GetTriggerState(trigger.Key);
  312. string state = GetTriggerState(triggerStaus.ToString());
  313. ls.Add(new TaskInfoDto
  314. {
  315. jobId = job.Key.Name,
  316. jobGroup = job.Key.Group,
  317. triggerId = trigger.Key.Name,
  318. triggerGroup = trigger.Key.Group,
  319. triggerStatus = state
  320. });
  321. //info.Append(string.Format("触发器ID:{0}\r\n触发器名称:{1}\r\n状态:{2}\r\n", item.Key.Name, item.Description, state));
  322. }
  323. return ls;
  324. }
  325. public string GetTriggerState(string key)
  326. {
  327. string state = null;
  328. if (key != null)
  329. key = key.ToUpper();
  330. switch (key)
  331. {
  332. case "1":
  333. state = "暂停";
  334. break;
  335. case "2":
  336. state = "完成";
  337. break;
  338. case "3":
  339. state = "出错";
  340. break;
  341. case "4":
  342. state = "阻塞";
  343. break;
  344. case "0":
  345. state = "正常";
  346. break;
  347. case "-1":
  348. state = "不存在";
  349. break;
  350. case "BLOCKED":
  351. state = "阻塞";
  352. break;
  353. case "COMPLETE":
  354. state = "完成";
  355. break;
  356. case "ERROR":
  357. state = "出错";
  358. break;
  359. case "NONE":
  360. state = "不存在";
  361. break;
  362. case "NORMAL":
  363. state = "正常";
  364. break;
  365. case "PAUSED":
  366. state = "暂停";
  367. break;
  368. }
  369. return state;
  370. }
  371. #endregion
  372. #region 创建触发器帮助方法
  373. /// <summary>
  374. /// 创建SimpleTrigger触发器(简单触发器)
  375. /// </summary>
  376. /// <param name="sysSchedule"></param>
  377. /// <param name="starRunTime"></param>
  378. /// <param name="endRunTime"></param>
  379. /// <returns></returns>
  380. private ITrigger CreateSimpleTrigger(TasksQz sysSchedule)
  381. {
  382. if (sysSchedule.CycleRunTimes > 0)
  383. {
  384. ITrigger trigger = TriggerBuilder.Create()
  385. .WithIdentity(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
  386. .StartAt(sysSchedule.BeginTime.Value)
  387. .WithSimpleSchedule(x => x
  388. .WithIntervalInSeconds(sysSchedule.IntervalSecond)
  389. .WithRepeatCount(sysSchedule.CycleRunTimes - 1))
  390. .EndAt(sysSchedule.EndTime.Value)
  391. .Build();
  392. return trigger;
  393. }
  394. else
  395. {
  396. ITrigger trigger = TriggerBuilder.Create()
  397. .WithIdentity(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
  398. .StartAt(sysSchedule.BeginTime.Value)
  399. .WithSimpleSchedule(x => x
  400. .WithIntervalInSeconds(sysSchedule.IntervalSecond)
  401. .RepeatForever()
  402. )
  403. .EndAt(sysSchedule.EndTime.Value)
  404. .Build();
  405. return trigger;
  406. }
  407. // 触发作业立即运行,然后每10秒重复一次,无限循环
  408. }
  409. /// <summary>
  410. /// 创建类型Cron的触发器
  411. /// </summary>
  412. /// <param name="m"></param>
  413. /// <returns></returns>
  414. private ITrigger CreateCronTrigger(TasksQz sysSchedule)
  415. {
  416. // 作业触发器
  417. return TriggerBuilder.Create()
  418. .WithIdentity(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
  419. .StartAt(sysSchedule.BeginTime.Value)//开始时间
  420. .EndAt(sysSchedule.EndTime.Value)//结束数据
  421. .WithCronSchedule(sysSchedule.Cron)//指定cron表达式
  422. .ForJob(sysSchedule.Id.ToString(), sysSchedule.JobGroup)//作业名称
  423. .Build();
  424. }
  425. #endregion
  426. /// <summary>
  427. /// 立即执行 一个任务
  428. /// </summary>
  429. /// <param name="tasksQz"></param>
  430. /// <returns></returns>
  431. public async Task<MessageModel<string>> ExecuteJobAsync(TasksQz tasksQz)
  432. {
  433. var result = new MessageModel<string>();
  434. try
  435. {
  436. JobKey jobKey = new JobKey(tasksQz.Id.ToString(), tasksQz.JobGroup);
  437. //判断任务是否存在,存在则 触发一次,不存在则先添加一个任务,触发以后再 停止任务
  438. if (!await _scheduler.Result.CheckExists(jobKey))
  439. {
  440. //不存在 则 添加一个计划任务
  441. await AddScheduleJobAsync(tasksQz);
  442. //触发执行一次
  443. await _scheduler.Result.TriggerJob(jobKey);
  444. //停止任务
  445. await StopScheduleJobAsync(tasksQz);
  446. result.success = true;
  447. result.msg = $"立即执行计划任务:【{tasksQz.Name}】成功";
  448. }
  449. else
  450. {
  451. await _scheduler.Result.TriggerJob(jobKey);
  452. result.success = true;
  453. result.msg = $"立即执行计划任务:【{tasksQz.Name}】成功";
  454. }
  455. }
  456. catch (Exception ex)
  457. {
  458. result.msg = $"立即执行计划任务失败:【{ex.Message}】";
  459. }
  460. return result;
  461. }
  462. }
  463. }