例:假设有一个方法需要执行100次,并求这100次返回结果的总和。
串行执行
/// <summary>
/// Entry point of the serial demo: runs the sequential benchmark, then blocks
/// on a console read so the output stays visible.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static void Main(string[] args)
{
    SerialExcute();

    // Wait for the user to press Enter before the process exits.
    Console.ReadLine();
}
/// <summary>
/// Calls <c>Method</c> 100 times, one call after another on the calling thread,
/// accumulates the returned values and prints the elapsed time and the total.
/// </summary>
private static void SerialExcute()
{
    int total = 0;
    Stopwatch timer = Stopwatch.StartNew();

    // Indices 0..99 are passed to Method in ascending order.
    int index = 0;
    while (index < 100)
    {
        total += Method(index);
        index++;
    }

    timer.Stop();

    Console.WriteLine("100次执行总共开销{0}毫秒,结果总和为:{1}。", timer.ElapsedMilliseconds, total);
    Console.WriteLine("这是主线程.");
    Console.WriteLine("100次执行结束.");
}
/// <summary>
/// Simulated unit of work: logs the call, blocks the current thread for
/// 100 milliseconds and hands the argument back unchanged.
/// </summary>
/// <param name="i">Index of the current invocation.</param>
/// <returns>The value of <paramref name="i"/>.</returns>
private static int Method(int i)
{
    const int simulatedWorkMilliseconds = 100;

    Console.WriteLine("这是第{0}次执行Method", i);
    Thread.Sleep(simulatedWorkMilliseconds);

    return i;
}
100次执行总共开销10001毫秒,结果总和为:4950。
线程池并发执行
/// <summary>
/// Thread-pool demo: queues 100 calls to <see cref="Method"/> on the CLR thread
/// pool, waits for all of them with a <see cref="CountdownEvent"/> and prints
/// the elapsed time and the sum of the results.
/// </summary>
class Program
{
    /// <summary>
    /// Guards the shared running total and the list of failures collected from
    /// the pool threads.
    /// </summary>
    private static readonly object lockobj = new object();

    /// <summary>Entry point; runs the thread-pool benchmark.</summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
    {
        TaskExcute();
    }

    /// <summary>
    /// Runs <see cref="Method"/> 100 times on thread-pool threads and reports
    /// the total time and the sum of the returned values.
    /// </summary>
    private static void TaskExcute()
    {
        int sum = 0;
        Stopwatch watch = new Stopwatch();
        watch.Start();
        try
        {
            // Exceptions thrown on pool threads are collected here and rethrown
            // on the calling thread; an unhandled exception in a pool work item
            // would otherwise tear down the process.
            var failures = new System.Collections.Generic.List<Exception>();

            // CountdownEvent is IDisposable, so release it deterministically.
            using (CountdownEvent handler = new CountdownEvent(100))
            {
                ThreadPool.SetMinThreads(5, 5);
                for (int i = 0; i < 100; i++)
                {
                    // Copy the loop variable so every closure sees its own value.
                    var j = i;
                    ThreadPool.QueueUserWorkItem((state) =>
                    {
                        try
                        {
                            int result = Method(j);
                            lock (lockobj)
                            {
                                sum += result;
                            }
                        }
                        catch (Exception ex)
                        {
                            lock (lockobj)
                            {
                                failures.Add(ex);
                            }
                        }
                        finally
                        {
                            // Always signal, even on failure; otherwise the
                            // Wait() below would block forever.
                            handler.Signal();
                        }
                    });
                }
                handler.Wait();
            }

            if (failures.Count > 0)
            {
                throw new AggregateException(failures);
            }
        }
        catch (AggregateException ex)
        {
            Console.WriteLine(ex.Message);
            foreach (Exception inner in ex.InnerExceptions)
            {
                Console.WriteLine("Exception type {0} from {1}", inner.GetType(), inner.Source);
            }
        }
        watch.Stop();
        Console.WriteLine("100次执行总共开销{0}毫秒,结果总和为:{1}。", watch.ElapsedMilliseconds, sum);
        Console.WriteLine("这是主线程.");
        Console.WriteLine("100次执行结束.");
    }

    /// <summary>
    /// Simulated unit of work: logs the call, sleeps for 100 milliseconds and
    /// returns its argument.
    /// </summary>
    /// <param name="i">Index of the current invocation.</param>
    /// <returns>The value of <paramref name="i"/>.</returns>
    private static int Method(int i)
    {
        Console.WriteLine("这是第{0}次执行Method", i);
        Thread.Sleep(100);
        return i;
    }
}
100次执行总共开销1826毫秒,结果总和为:4950。
Task并行执行
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication39
{
    /// <summary>
    /// Task demo: starts 100 tasks that each call <see cref="Method"/>, waits
    /// for all of them and prints the elapsed time and the sum of the results.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Cancellation source shared by every task started by this demo;
        /// <see cref="Method"/> cancels it when a call fails.
        /// </summary>
        public static CancellationTokenSource TokenSource = new CancellationTokenSource();

        /// <summary>Entry point; runs the task benchmark.</summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            TaskExcute();
        }

        /// <summary>
        /// Runs <see cref="Method"/> 100 times as tasks and reports the total
        /// time and the sum of the returned values.
        /// </summary>
        private static void TaskExcute()
        {
            int sum = 0;
            Stopwatch watch = new Stopwatch();
            watch.Start();
            try
            {
                // Each task carries its own result. List<T> is not safe for
                // concurrent Add calls, so the tasks must not write into a
                // shared list.
                List<Task<int>> tasks = new List<Task<int>>(100);
                for (int i = 0; i < 100; i++)
                {
                    // Copy the loop variable so every closure sees its own value.
                    var j = i;
                    tasks.Add(Task.Factory.StartNew<int>(() => Method(j), TokenSource.Token));
                }

                // Throws AggregateException if any task faulted or was cancelled.
                Task.WaitAll(tasks.ToArray());

                // Only reached when every task ran to completion.
                sum = tasks.Sum(t => t.Result);
            }
            catch (AggregateException ex)
            {
                foreach (var inner in ex.InnerExceptions)
                {
                    Console.WriteLine(inner.Message);
                }
            }
            watch.Stop();
            Console.WriteLine("100次执行总共开销{0}毫秒,结果总和为:{1}。", watch.ElapsedMilliseconds, sum);
            Console.WriteLine("这是主线程.");
            Console.WriteLine("100次执行结束.");
        }

        /// <summary>
        /// Simulated unit of work: logs the call, sleeps for 100 milliseconds
        /// and returns its argument. If the work throws, the shared
        /// <see cref="TokenSource"/> is cancelled and the exception is rethrown.
        /// </summary>
        /// <param name="i">Index of the current invocation.</param>
        /// <returns>The value of <paramref name="i"/>.</returns>
        private static int Method(int i)
        {
            try
            {
                Console.WriteLine("这是第{0}次执行Method", i);
                Thread.Sleep(100);
                return i;
            }
            catch (Exception)
            {
                TokenSource.Cancel();
                // Rethrow with "throw;" so the original stack trace is kept.
                throw;
            }
        }
    }
}
100次执行总共开销2218毫秒,结果总和为:4950。
重写任务调度器
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication39
{
    /// <summary>
    /// Custom-scheduler demo: runs 100 calls to <see cref="Method"/> as tasks on
    /// an <c>NTaskScheduler</c> created with a concurrency level of 5, then
    /// prints the elapsed time and the sum of the results.
    /// </summary>
    class Program
    {
        /// <summary>Guards the shared running total updated by the tasks.</summary>
        private static readonly object lockobj = new object();

        /// <summary>Entry point; runs the custom-scheduler benchmark.</summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            TaskExcute();
        }

        /// <summary>
        /// Runs <see cref="Method"/> 100 times on the custom scheduler and
        /// reports the total time and the sum of the returned values.
        /// </summary>
        private static void TaskExcute()
        {
            int sum = 0;
            Stopwatch watch = new Stopwatch();
            watch.Start();
            try
            {
                // The scheduler owns dedicated worker threads. Disposing it lets
                // them finish; without this they block on the empty queue
                // forever and keep the process alive.
                using (NTaskScheduler scheduler = new NTaskScheduler(5))
                {
                    CancellationTokenSource tokenSource = scheduler._tokenSource;
                    List<Task> tasks = new List<Task>(100);
                    for (int i = 0; i < 100; i++)
                    {
                        // Copy the loop variable so every closure sees its own value.
                        var j = i;
                        Task task = new Task(() =>
                        {
                            var result = Method(j, tokenSource);
                            lock (lockobj)
                            {
                                sum += result;
                            }
                        }, tokenSource.Token);
                        tasks.Add(task);
                        task.Start(scheduler);
                    }

                    // Throws AggregateException if any task faulted or was cancelled.
                    Task.WaitAll(tasks.ToArray());
                }
            }
            catch (AggregateException ex)
            {
                foreach (var item in ex.InnerExceptions)
                {
                    Console.WriteLine(item.Message);
                }
            }
            watch.Stop();
            Console.WriteLine("100次执行总共开销{0}毫秒,结果总和为:{1}。", watch.ElapsedMilliseconds, sum);
            Console.WriteLine("这是主线程.");
            Console.WriteLine("100次执行结束.");
        }

        /// <summary>
        /// Simulated unit of work: logs the call, sleeps for 100 milliseconds
        /// and returns its argument. If the work throws, the supplied
        /// cancellation source is cancelled and the exception is rethrown.
        /// </summary>
        /// <param name="i">Index of the current invocation.</param>
        /// <param name="tokenSource">Source to cancel when this call fails.</param>
        /// <returns>The value of <paramref name="i"/>.</returns>
        private static int Method(int i, CancellationTokenSource tokenSource)
        {
            try
            {
                Console.WriteLine("这是第{0}次执行Method", i);
                Thread.Sleep(100);
                return i;
            }
            catch (Exception)
            {
                tokenSource.Cancel();
                // Rethrow with "throw;" so the original stack trace is kept.
                throw;
            }
        }
    }
}
NTaskScheduler.cs
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication39
{
    /// <summary>
    /// A <see cref="TaskScheduler"/> that executes queued tasks on a fixed number
    /// of dedicated threads, which bounds how many tasks run at the same time.
    /// </summary>
    /// <remarks>
    /// The worker threads are background threads, so a scheduler that was never
    /// disposed does not keep the process alive. Call <see cref="Dispose"/> to
    /// stop accepting work and wait for the queued tasks to drain.
    /// </remarks>
    public class NTaskScheduler : TaskScheduler, IDisposable
    {
        // Created eagerly: a lazily initialised field could be raced by two
        // threads and hand out two different sources.
        private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();

        private readonly List<Thread> _threads = new List<Thread>();
        private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
        private readonly int _concurrencylevel;

        /// <summary>
        /// Cancellation source associated with this scheduler. Callers pass its
        /// token to the tasks they create and may cancel it to stop pending work.
        /// </summary>
        public CancellationTokenSource _tokenSource
        {
            get { return _cancellation; }
        }

        /// <summary>
        /// Creates the scheduler and starts its worker threads.
        /// </summary>
        /// <param name="concurrencylevel">
        /// Number of worker threads, i.e. the maximum number of tasks executed
        /// concurrently. Must be at least 1.
        /// </param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="concurrencylevel"/> is less than 1; such a scheduler
        /// would accept tasks but never run them.
        /// </exception>
        public NTaskScheduler(int concurrencylevel)
        {
            if (concurrencylevel < 1)
            {
                throw new ArgumentOutOfRangeException("concurrencylevel", "The concurrency level must be at least 1.");
            }

            _concurrencylevel = concurrencylevel;

            // Build the complete thread list before starting any worker, so
            // TryExecuteTaskInline never observes a half-filled list.
            for (int i = 0; i < concurrencylevel; i++)
            {
                Thread worker = new Thread(WorkerLoop);
                worker.IsBackground = true;
                _threads.Add(worker);
            }

            foreach (Thread worker in _threads)
            {
                worker.Start();
            }
        }

        /// <summary>Number of worker threads this scheduler was created with.</summary>
        public override int MaximumConcurrencyLevel
        {
            get { return _concurrencylevel; }
        }

        /// <summary>
        /// Stops accepting new tasks and waits until the worker threads have
        /// drained the queue and exited. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            // After this, GetConsumingEnumerable ends once the queue is empty.
            _tasks.CompleteAdding();

            foreach (Thread t in _threads)
            {
                // A worker thread that disposes the scheduler must not join
                // itself; that would block forever.
                if (t != Thread.CurrentThread)
                {
                    t.Join();
                }
            }
        }

        /// <summary>Returns a snapshot of the tasks still waiting in the queue.</summary>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks.ToArray();
        }

        /// <summary>Adds a task to the queue consumed by the worker threads.</summary>
        /// <param name="task">The task to schedule.</param>
        protected override void QueueTask(Task task)
        {
            _tasks.Add(task);
        }

        /// <summary>
        /// Runs the task on the calling thread only when that thread is one of
        /// this scheduler's workers, so the concurrency limit is never exceeded.
        /// </summary>
        /// <param name="task">The task to execute.</param>
        /// <param name="taskWasPreviouslyQueued">
        /// Whether the task is already in the queue. It is left there; when a
        /// worker dequeues it later, TryExecuteTask declines to run it again.
        /// </param>
        /// <returns><c>true</c> if the task was executed inline; otherwise <c>false</c>.</returns>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            if (!_threads.Contains(Thread.CurrentThread))
            {
                return false;
            }

            return TryExecuteTask(task);
        }

        /// <summary>
        /// Body of every worker thread: executes queued tasks until the queue is
        /// marked complete and empty.
        /// </summary>
        private void WorkerLoop()
        {
            foreach (Task task in _tasks.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        }
    }
}
100次执行总共开销2021毫秒,结果总和为:4950。