C# 扩展TaskScheduler实现独立线程池,支持多任务批量处理,互不干扰,无缝兼容Task

By | 2020年6月6日

    为什么编写TaskSchedulerEx类?

    因为.NET默认线程池只有一个线程池,如果某个批量任务一直占着大量线程,甚至耗尽默认线程池,则会严重影响应用程序域中其它任务或批量任务的性能。

     特点:

    1、使用独立线程池,线程池中线程分为核心线程和辅助线程,辅助线程会动态增加和释放,且总线程数不大于参数_maxThreadCount

    2、无缝兼容Task,使用上和Task一样,可以用它来实现异步,参见:C# async await 异步执行方法封装 替代 BackgroundWorker

    3、队列中尚未执行的任务可以取消

    4、通过扩展类TaskHelper实现任务分组

    5、和SmartThreadPool对比,优点是无缝兼容Task类,和Task类使用没有区别,因为它本身就是对Task、TaskScheduler的扩展,所以Task类的ContinueWith、WaitAll等方法它都支持,以及兼容async、await异步编程

    6、代码量相当精简,TaskSchedulerEx类只有260多行代码

    7、池中的线程数量会根据负载自动增减,支持,但没有SmartThreadPool智能,为了性能,使用了比较笨的方式实现,不知道大家有没有既智能,性能又高的方案,我有一个思路,在定时器中计算每个任务执行平均耗时,然后使用公式(线程数 = CPU核心数 * ( 本地计算时间 + 等待时间 ) / 本地计算时间)来计算最佳线程数,然后按最佳线程数来动态创建线程,但这个计算过程可能会牺牲性能

     对比SmartThreadPool:

    TaskSchedulerEx类代码:


using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// TaskScheduler扩展
    /// 每个实例都是独立线程池
    /// </summary>
    public class TaskSchedulerEx : TaskScheduler, IDisposable
    {
        #region 外部方法
        [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
        public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize);
        #endregion

        #region 变量属性事件
        private ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
        List<Thread> _coreThreadList = new List<Thread>();
        private int _coreThreadCount = 0;
        private int _maxThreadCount = 0;
        private int _auxiliaryThreadTimeOut = 20000; //辅助线程释放时间
        private int _activeThreadCount = 0;
        private System.Timers.Timer _timer;
        private object _lockCreateTimer = new object();
        private bool _run = true;

        /// <summary>
        /// 活跃线程数
        /// </summary>
        public int ActiveThreadCount
        {
            get { return _activeThreadCount; }
        }

        /// <summary>
        /// 核心线程数
        /// </summary>
        public int CoreThreadCount
        {
            get { return _coreThreadCount; }
        }

        /// <summary>
        /// 最大线程数
        /// </summary>
        public int MaxThreadCount
        {
            get { return _maxThreadCount; }
        }
        #endregion

        #region 构造函数
        /// <summary>
        /// TaskScheduler扩展
        /// 每个实例都是独立线程池
        /// </summary>
        /// <param name="coreThreadCount">核心线程数(大于或等于0,不宜过大)(如果是一次性使用,则设置为0比较合适)</param>
        /// <param name="maxThreadCount">最大线程数</param>
        public TaskSchedulerEx(int coreThreadCount = 10, int maxThreadCount = 20)
        {
            _maxThreadCount = maxThreadCount;
            CreateCoreThreads(coreThreadCount);
        }
        #endregion

        #region override GetScheduledTasks
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks;
        }
        #endregion

        #region override TryExecuteTaskInline
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }
        #endregion

        #region override QueueTask
        protected override void QueueTask(Task task)
        {
            CreateTimer();
            _tasks.Enqueue(task);
        }
        #endregion

        #region 资源释放
        /// <summary>
        /// 资源释放
        /// 如果尚有任务在执行,则会在调用此方法的线程上引发System.Threading.ThreadAbortException,请使用Task.WaitAll等待任务执行完毕后,再调用该方法
        /// </summary>
        public void Dispose()
        {
            _run = false;

            if (_timer != null)
            {
                _timer.Stop();
                _timer.Dispose();
                _timer = null;
            }

            foreach (Thread item in _coreThreadList)
            {
                item.Abort();
                Interlocked.Decrement(ref _activeThreadCount);
            }
            _coreThreadList.Clear();

            GC.Collect();
            GC.WaitForPendingFinalizers();
            if (Environment.OSVersion.Platform == PlatformID.Win32NT)
            {
                SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle, -1, -1);
            }
        }
        #endregion

        #region 创建核心线程池
        /// <summary>
        /// 创建核心线程池
        /// </summary>
        private void CreateCoreThreads(int? coreThreadCount = null)
        {
            if (coreThreadCount != null) _coreThreadCount = coreThreadCount.Value;

            for (int i = 0; i < _coreThreadCount; i++)
            {
                Interlocked.Increment(ref _activeThreadCount);
                Thread thread = new Thread(new ThreadStart(() =>
                {
                    Task task;
                    while (_run)
                    {
                        if (_tasks.TryDequeue(out task))
                        {
                            TryExecuteTask(task);
                        }
                        else
                        {
                            Thread.Sleep(10);
                        }
                    }
                }));
                thread.IsBackground = true;
                thread.Start();
                _coreThreadList.Add(thread);
            }
        }
        #endregion

        #region 创建辅助线程
        /// <summary>
        /// 创建辅助线程
        /// </summary>
        private void CreateThread()
        {
            Interlocked.Increment(ref _activeThreadCount);
            Thread thread = null;
            thread = new Thread(new ThreadStart(() =>
            {
                Task task;
                DateTime dt = DateTime.Now;
                while (_run && DateTime.Now.Subtract(dt).TotalMilliseconds < _auxiliaryThreadTimeOut)
                {
                    if (_tasks.TryDequeue(out task))
                    {
                        TryExecuteTask(task);
                        dt = DateTime.Now;
                    }
                    else
                    {
                        Thread.Sleep(100);
                    }
                }
                Interlocked.Decrement(ref _activeThreadCount);
                if (_activeThreadCount == _coreThreadCount)
                {
                    GC.Collect();
                    GC.WaitForPendingFinalizers();
                    if (Environment.OSVersion.Platform == PlatformID.Win32NT)
                    {
                        SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle, -1, -1);
                    }
                }
                if (thread != null)
                {
                    thread.Abort();
                    thread = null;
                }
            }));
            thread.IsBackground = true;
            thread.Start();
        }
        #endregion

        #region 创建定时器
        private void CreateTimer()
        {
            if (_timer == null) //_timer不为空时,跳过,不走lock,提升性能
            {
                lock (_lockCreateTimer)
                {
                    if (_timer == null)
                    {
                        if (_activeThreadCount >= _coreThreadCount && _activeThreadCount < _maxThreadCount)
                        {
                            _timer = new System.Timers.Timer();
                            _timer.Interval = _coreThreadCount == 0 ? 1 : 500;
                            _timer.Elapsed += (s, e) =>
                            {
                                if (_activeThreadCount >= _coreThreadCount && _activeThreadCount < _maxThreadCount)
                                {
                                    if (_tasks.Count > 0)
                                    {
                                        if (_timer.Interval != 20) _timer.Interval = 20;
                                        CreateThread();
                                    }
                                    else
                                    {
                                        if (_timer.Interval != 500) _timer.Interval = 500;
                                    }
                                }
                                else
                                {
                                    if (_timer != null)
                                    {
                                        _timer.Stop();
                                        _timer.Dispose();
                                        _timer = null;
                                    }
                                }
                            };
                            _timer.Start();
                        }
                    }
                }
            }
        }
        #endregion

        #region 全部取消
        /// <summary>
        /// 全部取消
        /// 当前正在执行的任务无法取消,取消的只是后续任务,相当于AbortAll
        /// </summary>
        public void CancelAll()
        {
            Task tempTask;
            while (_tasks.TryDequeue(out tempTask)) { }
        }
        #endregion

    }
}

View Code

    RunHelper类代码:


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// 线程工具类
    /// </summary>
    public static class RunHelper
    {
        #region 变量属性事件

        #endregion

        #region 线程中执行
        /// <summary>
        /// 线程中执行
        /// </summary>
        public static Task Run(this TaskScheduler scheduler, Action<object> doWork, object arg = null, Action<Exception> errorAction = null)
        {
            return Task.Factory.StartNew((obj) =>
            {
                try
                {
                    doWork(obj);
                }
                catch (Exception ex)
                {
                    if (errorAction != null) errorAction(ex);
                    LogUtil.Error(ex, "ThreadUtil.Run错误");
                }
            }, arg, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }
        #endregion

        #region 线程中执行
        /// <summary>
        /// 线程中执行
        /// </summary>
        public static Task Run(this TaskScheduler scheduler, Action doWork, Action<Exception> errorAction = null)
        {
            return Task.Factory.StartNew(() =>
            {
                try
                {
                    doWork();
                }
                catch (Exception ex)
                {
                    if (errorAction != null) errorAction(ex);
                    LogUtil.Error(ex, "ThreadUtil.Run错误");
                }
            }, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }
        #endregion

        #region 线程中执行
        /// <summary>
        /// 线程中执行
        /// </summary>
        public static Task<T> Run<T>(this TaskScheduler scheduler, Func<object, T> doWork, object arg = null, Action<Exception> errorAction = null)
        {
            return Task.Factory.StartNew<T>((obj) =>
            {
                try
                {
                    return doWork(obj);
                }
                catch (Exception ex)
                {
                    if (errorAction != null) errorAction(ex);
                    LogUtil.Error(ex, "ThreadUtil.Run错误");
                    return default(T);
                }
            }, arg, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }
        #endregion

        #region 线程中执行
        /// <summary>
        /// 线程中执行
        /// </summary>
        public static Task<T> Run<T>(this TaskScheduler scheduler, Func<T> doWork, Action<Exception> errorAction = null)
        {
            return Task.Factory.StartNew<T>(() =>
            {
                try
                {
                    return doWork();
                }
                catch (Exception ex)
                {
                    if (errorAction != null) errorAction(ex);
                    LogUtil.Error(ex, "ThreadUtil.Run错误");
                    return default(T);
                }
            }, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }
        #endregion

        #region 线程中执行
        /// <summary>
        /// 线程中执行
        /// </summary>
        public static async Task<T> RunAsync<T>(this TaskScheduler scheduler, Func<object, T> doWork, object arg = null, Action<Exception> errorAction = null)
        {
            return await Task.Factory.StartNew<T>((obj) =>
            {
                try
                {
                    return doWork(obj);
                }
                catch (Exception ex)
                {
                    if (errorAction != null) errorAction(ex);
                    LogUtil.Error(ex, "ThreadUtil.Run错误");
                    return default(T);
                }
            }, arg, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }
        #endregion

        #region 线程中执行
        /// <summary>
        /// 线程中执行
        /// </summary>
        public static async Task<T> RunAsync<T>(this TaskScheduler scheduler, Func<T> doWork, Action<Exception> errorAction = null)
        {
            return await Task.Factory.StartNew<T>(() =>
            {
                try
                {
                    return doWork();
                }
                catch (Exception ex)
                {
                    if (errorAction != null) errorAction(ex);
                    LogUtil.Error(ex, "ThreadUtil.Run错误");
                    return default(T);
                }
            }, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }
        #endregion

    }
}

View Code

    TaskHelper扩展类(代码中LimitedTaskScheduler改为TaskSchedulerEx即可)(这个任务分类有点多,每个任务分类的核心线程一般是不释放的,一直占着线程,算不算滥用):


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// Task帮助类基类
    /// </summary>
    public class TaskHelper
    {
        #region UI任务
        private static LimitedTaskScheduler _UITask;
        /// <summary>
        /// UI任务(4个线程)
        /// </summary>
        public static LimitedTaskScheduler UITask
        {
            get
            {
                if (_UITask == null) _UITask = new LimitedTaskScheduler(4);
                return _UITask;
            }
        }
        #endregion

        #region 菜单任务
        private static LimitedTaskScheduler _MenuTask;
        /// <summary>
        /// 菜单任务
        /// </summary>
        public static LimitedTaskScheduler MenuTask
        {
            get
            {
                if (_MenuTask == null) _MenuTask = new LimitedTaskScheduler(2);
                return _MenuTask;
            }
        }
        #endregion

        #region 计算任务
        private static LimitedTaskScheduler _CalcTask;
        /// <summary>
        /// 计算任务(8个线程)
        /// </summary>
        public static LimitedTaskScheduler CalcTask
        {
            get
            {
                if (_CalcTask == null) _CalcTask = new LimitedTaskScheduler(8);
                return _CalcTask;
            }
        }
        #endregion

        #region 网络请求
        private static LimitedTaskScheduler _RequestTask;
        /// <summary>
        /// 网络请求(32个线程)
        /// </summary>
        public static LimitedTaskScheduler RequestTask
        {
            get
            {
                if (_RequestTask == null) _RequestTask = new LimitedTaskScheduler(32);
                return _RequestTask;
            }
        }
        #endregion

        #region 数据库任务
        private static LimitedTaskScheduler _DBTask;
        /// <summary>
        /// 数据库任务(32个线程)
        /// </summary>
        public static LimitedTaskScheduler DBTask
        {
            get
            {
                if (_DBTask == null) _DBTask = new LimitedTaskScheduler(32);
                return _DBTask;
            }
        }
        #endregion

        #region IO任务
        private static LimitedTaskScheduler _IOTask;
        /// <summary>
        /// IO任务(8个线程)
        /// </summary>
        public static LimitedTaskScheduler IOTask
        {
            get
            {
                if (_IOTask == null) _IOTask = new LimitedTaskScheduler(8);
                return _IOTask;
            }
        }
        #endregion

        #region 首页任务
        private static LimitedTaskScheduler _MainPageTask;
        /// <summary>
        /// 首页任务(16个线程)
        /// </summary>
        public static LimitedTaskScheduler MainPageTask
        {
            get
            {
                if (_MainPageTask == null) _MainPageTask = new LimitedTaskScheduler(16);
                return _MainPageTask;
            }
        }
        #endregion

        #region 图片加载任务
        private static LimitedTaskScheduler _LoadImageTask;
        /// <summary>
        /// 图片加载任务(32个线程)
        /// </summary>
        public static LimitedTaskScheduler LoadImageTask
        {
            get
            {
                if (_LoadImageTask == null) _LoadImageTask = new LimitedTaskScheduler(32);
                return _LoadImageTask;
            }
        }
        #endregion

        #region 浏览器任务
        private static LimitedTaskScheduler _BrowserTask;
        /// <summary>
        /// 浏览器任务
        /// </summary>
        public static LimitedTaskScheduler BrowserTask
        {
            get
            {
                if (_BrowserTask == null) _BrowserTask = new LimitedTaskScheduler(2);
                return _BrowserTask;
            }
        }
        #endregion

    }
}

View Code

     Form1.cs测试代码:


using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Management;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using Utils;

namespace test
{
    public partial class Form1 : Form
    {
        private TaskSchedulerEx _taskSchedulerEx = null;
        private TaskSchedulerEx _taskSchedulerExSmall = null;
        private TaskSchedulerEx _task = null;

        public Form1()
        {
            InitializeComponent();
            _taskSchedulerEx = new TaskSchedulerEx(50, 500);
            _taskSchedulerExSmall = new TaskSchedulerEx(5, 50);
            _task = new TaskSchedulerEx(2, 10);
        }

        private void Form1_Load(object sender, EventArgs e)
        {

        }

        /// <summary>
        /// 模拟大量网络请求任务
        /// </summary>
        private void button1_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerEx, 200000, 1000, 20);
        }

        /// <summary>
        /// 模拟CPU密集型任务
        /// </summary>
        private void button2_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerEx, 100000, 2000, 1);
        }

        /// <summary>
        /// 模拟大量网络请求任务
        /// </summary>
        private void button3_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerExSmall, 2000, 100, 20);
        }

        /// <summary>
        /// 模拟CPU密集型任务
        /// </summary>
        private void button4_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerExSmall, 2000, 100, 1);
        }

        /// <summary>
        /// 模拟任务
        /// </summary>
        /// <param name="scheduler">scheduler</param>
        /// <param name="taskCount">任务数量</param>
        /// <param name="logCount">每隔多少条数据打一个日志</param>
        /// <param name="delay">模拟延迟或耗时(毫秒)</param>
        private void DoTask(TaskSchedulerEx scheduler, int taskCount, int logCount, int delay)
        {
            _task.Run(() =>
            {
                Log("开始");
                DateTime dt = DateTime.Now;
                List<Task> taskList = new List<Task>();
                for (int i = 1; i <= taskCount; i++)
                {
                    Task task = scheduler.Run((obj) =>
                    {
                        var k = (int)obj;
                        Thread.Sleep(delay); //模拟延迟或耗时
                        if (k % logCount == 0)
                        {
                            Log("最大线程数:" + scheduler.MaxThreadCount + " 核心线程数:" + scheduler.CoreThreadCount + " 活跃线程数:" + scheduler.ActiveThreadCount.ToString().PadLeft(4, ' ') + " 处理数/总数:" + k + " / " + taskCount);
                        }
                    }, i, (ex) =>
                    {
                        Log(ex.Message);
                    });
                    taskList.Add(task);
                }
                Task.WaitAll(taskList.ToArray());
                double d = DateTime.Now.Subtract(dt).TotalSeconds;
                Log("完成,耗时:" + d + "");
            });
        }

        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            if (_taskSchedulerEx != null)
            {
                _taskSchedulerEx.Dispose(); //释放资源
                _taskSchedulerEx = null;
            }
        }
    }
}

View Code

     测试截图:

 

请关注公众号获取更多资料

发表评论