几个月前有这么个需求:需要执行一些Job,这些Job会各自按照不同的时间频次执行,且它们做的事情也不同,有的是监控站点,有的是监控服务器存储情况,有的是监控报表PROCEDURE的执行状况…
OK,当看到这么个需求,首先我想肯定是要用多线程来处理了,并且还要考虑到性能问题,领导当时给出一个方案,建议看一下Quartz.net,于是,研究了几天,看了不少资料,总算把监控引擎做出了一个版本,今天有时间,来整理一下大致过程,记录一下代码。
整个监控引擎,简单来说,一是使用Quartz.Net实现多线程执行任务,二是使用Topshelf构建Windows Server
一:Quartz.Net,官方:https://www.quartz-scheduler.net/
二:TopShelf,请自行度娘吧
工具:VS2019
首先创建一个控制台应用程序
然后NuGet引用Topshelf.dll程序集
添加控制台代码如下:
class Program{static void Main(string[] args){var rc = HostFactory.Run(x =>{x.Service<TownCrier>(s =>{s.ConstructUsing(name => new TownCrier());s.WhenStarted(tc => tc.Start());s.WhenStopped(tc => tc.Stop());});x.RunAsLocalSystem();x.SetDescription("服務说明");x.SetDisplayName("DisplayName自定义");x.SetServiceName("ServiceName自定义");});var exitCode = (int)Convert.ChangeType(rc, rc.GetTypeCode());Environment.ExitCode = exitCode;}}public class TownCrier{public void Start(){//开始执行}public void Stop(){//停止执行}}
以上是程序的入口,接下来才真正涉及到如何开展多线程任务,如何使用Quartz.net
先创建一个类库QuartzBLL,并安装Quartz引用:
这里直接提供工具类QuartzManage,看了很多网友的例子,但都没调通,然后自己摸索着整理出来一个工具类,或有不足之处,请指出,多谢!
public class QuartzManage{static Task<IScheduler> task_scheduler = StdSchedulerFactory.GetDefaultScheduler();static IScheduler scheduler;private static readonly object objlock = new object();static QuartzManage(){if (scheduler == null){lock (objlock){if (scheduler == null){scheduler = task_scheduler.Result;scheduler.Start();}}}}/// <summary>/// 以Simple开始一个工作/// </summary>/// <typeparam name="T"></typeparam>/// <param name="name"></param>/// <param name="simpleInterval"></param>public static void StartJobWithSimple<T>(string name, Action<SimpleScheduleBuilder> simpleInterval) where T : IJob{IJobDetail job = JobBuilder.Create<T>().WithIdentity(name + "_job", name + "_group").Build();ITrigger Simple = TriggerBuilder.Create().StartNow().WithSimpleSchedule(simpleInterval).Build() as ISimpleTrigger;scheduler.ScheduleJob(job, Simple);}/// <summary>/// 以Cron开始一个工作/// </summary>/// <typeparam name="T"></typeparam>/// <param name="name"></param>/// <param name="CronExpression"></param>public static void StartJobWithCron<T>(string name, string CronExpression) where T : IJob{IJobDetail job = JobBuilder.Create<T>().WithIdentity(name + "_job", name + "_group").Build();ITrigger CronTrigger = TriggerBuilder.Create().StartNow().WithIdentity(name + "_trigger", name + "_group").WithCronSchedule(CronExpression, w => w.WithMisfireHandlingInstructionDoNothing()).Build() as ICronTrigger;scheduler.ScheduleJob(job, CronTrigger);}/// <summary>/// 這種辦法可以根據job名稱找到觸發器,也可以找到Job,這樣就可以在任何地方修改Job頻次,不再限於IJob的實現方法Execute內/// 以此實現了:即時修改執行頻次即時生效/// </summary>/// <param name="cronExpression"></param>/// <param name="name"></param>public static async void ModifyJob(string cronExpression, string name){//触发器的keyTriggerKey triggerKey = new TriggerKey(name + "_trigger", name + "_group");ITrigger trigger = await scheduler.GetTrigger(triggerKey);CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.CronSchedule(cronExpression);trigger = trigger.GetTriggerBuilder().WithIdentity(triggerKey).WithSchedule(scheduleBuilder).Build();await scheduler.RescheduleJob(triggerKey, trigger);}/// <summary>/// 删除Job,即時刪除,不再限於IJob的實現方法Execute內/// </summary>/// <param name="job"></param>/// <param name="trigger"></param>public static void DeleteJob(string name){//触发器的keyTriggerKey triggerKey = new TriggerKey(name + "_trigger", name + "_group");//Job的KeyJobKey jobKey = new JobKey(name + "_job", name + "_group");scheduler.PauseTrigger(triggerKey);//暫停觸發器scheduler.UnscheduleJob(triggerKey);//移除觸發器scheduler.DeleteJob(jobKey);}/// <summary>/// 删除Job/// </summary>/// <param name="job"></param>/// <param name="trigger"></param>public static void DeleteJob(IJobDetail job, ITrigger trigger){scheduler.PauseTrigger(trigger.Key);//暫停觸發器scheduler.UnscheduleJob(trigger.Key);//移除觸發器scheduler.DeleteJob(job.Key);}/// <summary>/// 停止運行/// </summary>public static void ShutDownJob(){if (scheduler != null && !scheduler.IsShutdown){scheduler.Shutdown();}}}
还有一种方式,是可以自定义调度器的属性:比如线程池数量threadCount…默认的数量好像是10吧,记不清楚了,也有人把配置文件放在xml文件中,程序里读取配置文件来创建调度器。具体怎么玩,视情况而定吧。
public class QuartzHelper{private static IScheduler scheduler = null;static QuartzHelper(){var properties = new NameValueCollection();// 设置线程池properties["quartz.threadPool.type"] = "Quartz.Simpl.SimpleThreadPool, Quartz";//设置线程池的最大並發线程数量properties["quartz.threadPool.threadCount"] = "5";//设置作业中每个线程的优先级properties["quartz.threadPool.threadPriority"] = ThreadPriority.Normal.ToString();远程输出配置//properties["quartz.scheduler.exporter.type"] = "Quartz.Simpl.RemotingSchedulerExporter, Quartz";//properties["quartz.scheduler.exporter.port"] = "555"; //配置端口号//properties["quartz.scheduler.exporter.bindName"] = "QuartzScheduler";//properties["quartz.scheduler.exporter.channelType"] = "tcp"; //协议类型if (scheduler == null){//创建一个工厂var schedulerFactory = new StdSchedulerFactory(properties);//启动scheduler = schedulerFactory.GetScheduler().Result;//1、开启调度scheduler.Start();}}}
添加测试类
拿TestJob1举例,旧版的Quartz好像在实现IJob接口的时候,还不是异步方法,最新版本是异步,注意async await:
class TestJob1 : IJob{public async Task Execute(IJobExecutionContext context){await Task.Run(() =>{Console.WriteLine($"第一个Job执行了,Time:{DateTime.Now}"); });}}class TestJob2 : IJob{public async Task Execute(IJobExecutionContext context){await Task.Run(() =>{Console.WriteLine($"第二个Job执行了,Time:{DateTime.Now}"); });}}
然后定义一个JobStart类,作为程序业务入口类,将TestJob1和TestJob2注册在Quartz中,调度器会按照各自的频次(CronExpression)执行Job,可以这么理解:一个Job对应一个线程,如果所有Job的CronExpression一样,Job就会在同一时间同时执行,此时如果线程池中线程数量不够同时处理这些Job,则会排序等待,就像消息队列一样(线程池线程数量可以自定义)
public class JobStart{public void Start(){QuartzManage.StartJobWithCron<TestJob1>("TestJob1", "0/5 * * * * ?");//5秒钟一次QuartzManage.StartJobWithCron<TestJob2>("TestJob2", "0/8 * * * * ?");//8秒钟一次Console.ReadLine();}}
最后,在Program,TownCrier的Start方法中启动程序。
看一下效果
OK,继续吧。
此处仅示例展示,如果是配置在数据库中的Job,需要动态加载?
我的做法是建一个工厂类JobFactory,來根據不同的任务设定的Type,來返回不同的TestJob类(如果Job类执行的任务大致相同,比如:你的系统中有很多执行计划的任务,其实就是不同的sql或者procedure,那么不需要建很多的TestJob类,只需要一个类即可)。
然後自定义一个接口(IJobMonitor),声明两个方法,StartJob和DeleteJob。
将TestJob类实现该接口IJobMonitor。
也可以单独建立一个类库文件,通过反射创建Job,当然要视情况而定,需不需要用反射?
/// <summary>/// 通過反射加載/// </summary>/// <param name="category"></param>public static void CreatJob(Config_DataVerifyCategory category){//加载程序及路径Assembly ass = Assembly.LoadFrom(@"D:\xx\TestJobAssembly.dll");if (ass != null){Type[] types = ass.GetExportedTypes();foreach (Type t in types){if (typeof(IJob).IsAssignableFrom(t)){IJobDetail jobDetail = JobBuilder.Create(t).WithIdentity(category.Code + "_CaseHandingJob", category.Code + "_CaseHandingJobGroup").Build();jobDetail.JobDataMap.Put("cronExpression", category.CronExpression);jobDetail.JobDataMap.Put("categoryCode", category.Code);//新建一个触发器ITrigger trigger = TriggerBuilder.Create().StartNow().WithCronSchedule(category.CronExpression).Build();//任务和触发器关联放入调度器scheduler.ScheduleJob(jobDetail, trigger);scheduler.Start();break;}}}}
其实,我实际的项目要复杂很多,业务在不断的变化着,代码也改了很多次,深层的讲解我就不做了(主要我研究的也不深入),我想各位应该看的明白,截图看下吧
目前我的项目中,已经扩展到两部分,一部分是监控引擎,一部分是数据监控,所谓数据监控,是很多很多的方案,每个方案对应几个Task,每个task就是sql或者mdx等查询数据的脚本,还好它们可以用一个Job类实现IJob去处理,而监控引擎则比较复杂,每个Job都做着各自不同的事情,所以,我为每个Job创建Job类,让它们职责单一,如果以后扩展,就新建一个Job类,然后改一下工厂类即可,这样也符合开闭原则。
我给程序设置半小时一次的定时器,查询数据库是否有新增或禁用、修改的Job,每次和静态缓存的对象对比,以实现增删改Job之后,动态Update新的Job,该新增就新增,该禁用就禁用,如果大家有更好的思路,请一定要分享给我喔,谢谢!
再展示一下上端代码:
public class MonitorStartEngine{private static List<Config_Frequency> cacheList;public void Start(){LogHelper.Info("----------------監控引擎windows服務定時執行----------------");try{//在此查詢配置項List<Config_Frequency> queryList = MonitorFrequencyDAL.GetAllFrequencyList();if (queryList != null){queryList.ForEach(action =>{Config_Frequency msf = GetConfig_Frequency(action.MonitorCategory);IJobMonitor monitor = MonitorJobFactory.GetMonitor(action.MonitorCategory);if (monitor != null) monitor.StartJob(msf, action);});}if (cacheList != null){//刪除已禁用的Jobvar deletedJobs = ListCompare(cacheList, queryList);deletedJobs.ForEach(action =>{Config_Frequency msf = GetConfig_Frequency(action.MonitorCategory);IJobMonitor monitor = MonitorJobFactory.GetMonitor(action.MonitorCategory);if (monitor != null) monitor.DeleteJob();});}cacheList = queryList;}catch (Exception ex){LogHelper.Error("監控引擎異常信息: " + ex.Message);QuartzManage.ShutDownJob();}}private Config_Frequency GetConfig_Frequency(string monitorCatogory){if (cacheList != null)return cacheList.Where(c => c.MonitorCategory == monitorCatogory).FirstOrDefault();elsereturn null;}//取出存在cacheList中但不存在queryList中的数据,差异数据放入deletedListprivate List<Config_Frequency> ListCompare(List<Config_Frequency> cacheList, List<Config_Frequency> queryList){List<Config_Frequency> deletedList = new List<Config_Frequency>();if (queryList == null)deletedList = cacheList;else{foreach (Config_Frequency cf in cacheList){var mfs = queryList.Where(c => c.MonitorCategory == cf.MonitorCategory).FirstOrDefault();if (mfs == null){deletedList.Add(cf);}}}return deletedList;}}
最后说一下,如何创建Windows Server:
控制台应用程序发布后,使用管理員身份打開Dos命令:
找到程序exe文件所在目录,比如我本机测试发布后的目录在D:\zPublish\Job
安裝:服務.exe install
卸载:服務.exe uninstall
如圖所示: