多线程编程(3、多任务并发处理)

    xiaoxiao2021-03-25  140

    例:假设我们有个方法要执行100次,得到100次返回的结果总和。

    串行执行

    static void Main(string[] args) { SerialExcute(); Console.ReadLine(); } /// <summary> /// 串行执行,要阻塞主线程,不推荐 /// </summary> private static void SerialExcute() { int sum=0; Stopwatch watch = new Stopwatch(); watch.Start(); for (int i = 0; i < 100; i++) { sum += Method(i); } watch.Stop(); Console.WriteLine("100次执行总共开销{0}毫秒,结果总和为:{1}。", watch.ElapsedMilliseconds,sum); Console.WriteLine("这是主线程."); Console.WriteLine("100次执行结束."); } private static int Method(int i) { Console.WriteLine("这是第{0}次执行Method", i); //睡眠100毫秒秒 Thread.Sleep(100); return i; }

    100次执行总共开销10001毫秒,结果总和为:4950。

    线程池并发执行

    class Program { static void Main(string[] args) { TaskExcute(); } private static object lockobj = new object(); /// <summary> /// 线程池并行运行,如果出异常捕获不了,将导致整个程序崩溃。(不推荐使用) /// </summary> private static void TaskExcute() { int sum=0; Stopwatch watch = new Stopwatch(); watch.Start(); try { //采用计数器来判断线程池里的线程全部执行完毕 CountdownEvent handler = new CountdownEvent(100); //ThreadPool.SetMaxThreads(5, 5); //设置并发数 5 ThreadPool.SetMinThreads(5, 5); for (int i = 0; i < 100; i++) { var j = i;//这里一定要定义一个新的变量 ThreadPool.QueueUserWorkItem((state) => { int result = Method(j); //加锁 lock (lockobj) { sum += result; } handler.Signal(); }); } ///主线程等待 handler.Wait(); } catch (AggregateException ex) { Console.WriteLine(ex.Message); //.NET4 Task的统一异常处理机制 foreach (Exception inner in ex.InnerExceptions) { Console.WriteLine("Exception type {0} from {1}", inner.GetType(), inner.Source); } } watch.Stop(); Console.WriteLine("100次执行总共开销{0}毫秒,结果总和为:{1}。", watch.ElapsedMilliseconds, sum); Console.WriteLine("这是主线程."); Console.WriteLine("100次执行结束."); } private static int Method(int i) { Console.WriteLine("这是第{0}次执行Method", i); //睡眠100毫秒 Thread.Sleep(100); // throw new NullReferenceException("测试异常"); return i; } }

    100次执行总共开销1826毫秒,结果总和为:4950。

    Task并行执行

    using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication39 { class Program { static void Main(string[] args) { TaskExcute(); } private static object lockobj = new object(); public static CancellationTokenSource TokenSource = new CancellationTokenSource(); /// <summary> /// Task并发执行,不能自定义并发数(不推荐) /// </summary> private static void TaskExcute() { int sum = 0; Stopwatch watch = new Stopwatch(); watch.Start(); try { List<Task> tasks = new List<Task>(); List<int> results = new List<int>(); for (int i = 0; i < 100; i++) { var j = i; tasks.Add(Task.Factory.StartNew(() => { results.Add(Method(j)); }, TokenSource.Token)); } Task.WaitAll(tasks.ToArray()); sum = results.Sum(); } catch (AggregateException ex) { foreach (var inner in ex.InnerExceptions) { Console.WriteLine(inner.Message); } } watch.Stop(); Console.WriteLine("100次执行总共开销{0}毫秒,结果总和为:{1}。", watch.ElapsedMilliseconds, sum); Console.WriteLine("这是主线程."); Console.WriteLine("100次执行结束."); } private static int Method(int i) { Console.WriteLine("这是第{0}次执行Method", i); //睡眠100毫秒,将此值调大将能开到并发数 Thread.Sleep(100); try { // throw new NullReferenceException("测试异常"); } catch (Exception ex) { TokenSource.Cancel(); throw ex; } return i; } } }

    100次执行总共开销2218毫秒,结果总和为:4950。

    重写任务调度器

    using System; using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication39 { class Program { static void Main(string[] args) { TaskExcute(); } private static object lockobj = new object(); /// <summary> /// 重写任务调度器实现并发执行,可以捕获异常且可以自定义并发数,推荐使用 /// </summary> private static void TaskExcute() { int sum = 0; Stopwatch watch = new Stopwatch(); watch.Start(); try { NTaskScheduler scheduler = new NTaskScheduler(5); List<Task> tasks = new List<Task>(); for (int i = 0; i < 100; i++) { var j = i; tasks.Add(new Task(() => { var result = Method(j, scheduler._tokenSource); lock (lockobj) { sum += result; } }, scheduler._tokenSource.Token)); tasks[i].Start(scheduler); } Task.WaitAll(tasks.ToArray()); } catch (AggregateException ex) { foreach (var item in ex.InnerExceptions) { Console.WriteLine(item.Message); } } watch.Stop(); Console.WriteLine("100次执行总共开销{0}毫秒,结果总和为:{1}。", watch.ElapsedMilliseconds, sum); Console.WriteLine("这是主线程."); Console.WriteLine("100次执行结束."); } private static int Method(int i, CancellationTokenSource TokenSource) { Console.WriteLine("这是第{0}次执行Method", i); //睡眠100毫秒,当前时间放大可以查看并发数 Thread.Sleep(100); try { // throw new NullReferenceException("测试异常"); } catch (Exception ex) { TokenSource.Cancel(); throw ex; } return i; } } }

    NTaskScheduler.cs

    using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication39 { /// <summary> /// 自定义并发任务调度 /// </summary> public class NTaskScheduler : TaskScheduler, IDisposable { private CancellationTokenSource TokenSource =null; public CancellationTokenSource _tokenSource { get { if (TokenSource == null) TokenSource = new CancellationTokenSource(); return TokenSource; } } private List<Thread> _threads = new List<Thread>(); private BlockingCollection<Task> _tasks = new BlockingCollection<Task>(); private int _concurrencylevel; /// <summary> /// 初始化并发调度器 /// </summary> /// <param name="concurrencyLevel">并发数</param> public NTaskScheduler(int concurrencylevel) { this._concurrencylevel = concurrencylevel; for (int i = 0; i < concurrencylevel; i++) { _threads.Add(new Thread(() => { foreach (Task task in _tasks.GetConsumingEnumerable()) this.TryExecuteTask(task); })); _threads[i].Start(); } } public void Dispose() { this._tasks.CompleteAdding();//不接受Task的添加 foreach (Thread t in _threads) { t.Join(); } } /// <summary> /// 仅对于调试器支持,生成当前排队到计划程序中等待执行的 System.Threading.Tasks.Task 实例的枚举 /// </summary> /// <returns>一个允许调试器遍历当前排队到此计划程序中的任务的枚举</returns> protected override IEnumerable<Task> GetScheduledTasks() { return _tasks.ToArray(); } protected override void QueueTask(Task task) { _tasks.Add(task); } protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { if (_threads.Contains(Thread.CurrentThread)) return TryExecuteTask(task); return false; } } }

    100次执行总共开销2021毫秒,结果总和为:4950。

    转载请注明原文地址: https://ju.6miu.com/read-3340.html

    最新回复(0)