C#学习日记 并发集合

    xiaoxiao2021-03-25  92

    书上例子。

    using System;
    using System.Collections.Concurrent;
    using System.IO;
    using System.Linq;
    using System.Threading.Tasks;

    namespace Wrox.ProCSharp.Collections
    {
        /// <summary>
        /// Console entry point: runs a six-stage pipeline that counts the words found in
        /// the *.cs files under a directory and prints them colored by frequency.
        /// </summary>
        public class MainEntryPont
        {
            /// <summary>Directory scanned when no path is passed on the command line.</summary>
            private const string DefaultSourcePath = @"E:\netstudy\test";

            /// <summary>
            /// Runs the pipeline, reports a failure if any stage throws, then waits for a key press.
            /// </summary>
            /// <param name="args">Optional: <c>args[0]</c> overrides the directory to scan.</param>
            /// <returns>0 when every stage completed, 1 when the pipeline failed.</returns>
            static int Main(string[] args)
            {
                string path = (args != null && args.Length > 0) ? args[0] : DefaultSourcePath;
                int exitCode = 0;

                try
                {
                    // Wait for the returned Task so that a failing stage is observed here
                    // instead of being lost (the previous "async void" made that impossible).
                    StartPipeline(path).GetAwaiter().GetResult();
                }
                catch (Exception ex)
                {
                    ConsoleHelper.WriteLine(string.Format("pipeline failed: {0}", ex.Message));
                    exitCode = 1;
                }

                Console.ReadKey();
                return exitCode;
            }

            /// <summary>
            /// Wires the stages together. Stages 1-3 run concurrently and must all finish before
            /// stage 4 starts, because stage 4 reads the word dictionary that stage 3 fills.
            /// </summary>
            /// <param name="path">Root directory searched for *.cs files.</param>
            /// <returns>A task that completes when all six stages have finished.</returns>
            private static async Task StartPipeline(string path)
            {
                var filenames = new BlockingCollection<string>();
                var lines = new BlockingCollection<string>();
                var words = new ConcurrentDictionary<string, int>();
                var items = new BlockingCollection<Info>();
                var coloredItems = new BlockingCollection<Info>();

                Task t1 = PipelineStages.ReadFilenamesAsync(path, filenames);
                ConsoleHelper.WriteLine("started stage 1");
                Task t2 = PipelineStages.LoadContentAsync(filenames, lines);
                ConsoleHelper.WriteLine("started stage 2");
                Task t3 = PipelineStages.ProcessContentAsync(lines, words);
                ConsoleHelper.WriteLine("started stage 3");
                await Task.WhenAll(t1, t2, t3);
                ConsoleHelper.WriteLine("stages 1 2 3 completed");

                Task t4 = PipelineStages.TransferContentAsync(words, items);
                ConsoleHelper.WriteLine("started stage 4");
                Task t5 = PipelineStages.AddColorAsync(items, coloredItems);
                ConsoleHelper.WriteLine("started stage 5");
                Task t6 = PipelineStages.ShowContentAsync(coloredItems);
                ConsoleHelper.WriteLine("started stage 6");
                await Task.WhenAll(t4, t5, t6);
                ConsoleHelper.WriteLine("all stages finish");
            }
        }

        /// <summary>A word, how often it occurred, and the console color assigned to it.</summary>
        public class Info
        {
            public string Word { get; set; }
            public int Count { get; set; }
            public string Color { get; set; }

            /// <summary>
            /// Text used when stages 5 and 6 format an <see cref="Info"/> with "{0}";
            /// without this override they would print only the type name.
            /// </summary>
            public override string ToString()
            {
                return string.Format("{0} times: {1}", Count, Word);
            }
        }

        /// <summary>The individual pipeline stages.</summary>
        public static class PipelineStages
        {
            // Characters that split a source line into words; built once instead of per line.
            private static readonly char[] WordSeparators =
            {
                ' ', ';', '\t', '{', '}', '(', ')', ':', ',', '"', '=', '-', '<', '>', '?', '*'
            };

            /// <summary>
            /// Stage 1: adds the path of every *.cs file under <paramref name="path"/>
            /// (searched recursively) to <paramref name="output"/>.
            /// </summary>
            /// <param name="path">Root directory to search.</param>
            /// <param name="output">Receives the file names; marked complete when the stage ends.</param>
            public static Task ReadFilenamesAsync(string path, BlockingCollection<string> output)
            {
                return Task.Run(() =>
                {
                    try
                    {
                        foreach (string filename in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories))
                        {
                            output.Add(filename);
                            ConsoleHelper.WriteLine(string.Format("stage 1: added {0}", filename));
                        }
                    }
                    finally
                    {
                        // Always complete, even on failure, so the consuming stage cannot block forever.
                        output.CompleteAdding();
                    }
                });
            }

            /// <summary>
            /// Stage 2: reads every file named in <paramref name="input"/> line by line and adds
            /// each line to <paramref name="output"/>.
            /// </summary>
            /// <param name="input">File names produced by stage 1.</param>
            /// <param name="output">Receives the lines; marked complete when the stage ends.</param>
            public static async Task LoadContentAsync(BlockingCollection<string> input, BlockingCollection<string> output)
            {
                try
                {
                    foreach (var filename in input.GetConsumingEnumerable())
                    {
                        using (FileStream stream = File.OpenRead(filename))
                        using (var reader = new StreamReader(stream))
                        {
                            string line;
                            while ((line = await reader.ReadLineAsync()) != null)
                            {
                                output.Add(line);
                                ConsoleHelper.WriteLine(string.Format("stage 2: added {0}", line));
                            }
                        }
                    }
                }
                finally
                {
                    // Always complete, even on failure, so the consuming stage cannot block forever.
                    output.CompleteAdding();
                }
            }

            /// <summary>
            /// Stage 3: splits every line from <paramref name="input"/> into words and counts
            /// each non-empty word in <paramref name="output"/>.
            /// </summary>
            /// <param name="input">Lines produced by stage 2.</param>
            /// <param name="output">Word-to-occurrence-count dictionary that is updated.</param>
            public static Task ProcessContentAsync(BlockingCollection<string> input, ConcurrentDictionary<string, int> output)
            {
                return Task.Run(() =>
                {
                    foreach (var line in input.GetConsumingEnumerable())
                    {
                        string[] words = line.Split(WordSeparators);
                        foreach (var word in words.Where(w => !string.IsNullOrEmpty(w)))
                        {
                            output.AddOrIncrementValue(word);
                            ConsoleHelper.WriteLine(string.Format("stage 3: added {0}", word));
                        }
                    }
                });
            }

            /// <summary>
            /// Stage 4: turns every dictionary entry into an <see cref="Info"/> item.
            /// </summary>
            /// <param name="input">Word counts produced by stage 3.</param>
            /// <param name="output">Receives the items; marked complete when the stage ends.</param>
            public static Task TransferContentAsync(ConcurrentDictionary<string, int> input, BlockingCollection<Info> output)
            {
                return Task.Run(() =>
                {
                    try
                    {
                        // Enumerate the pairs directly rather than walking Keys and looking each one up again.
                        foreach (var pair in input)
                        {
                            var info = new Info { Word = pair.Key, Count = pair.Value };
                            output.Add(info);
                            ConsoleHelper.WriteLine(string.Format("stage 4:added {0}", info.Word));
                        }
                    }
                    finally
                    {
                        output.CompleteAdding();
                    }
                });
            }

            /// <summary>
            /// Stage 5: assigns a color by count ("Red" above 10, "Yellow" above 5, otherwise
            /// "Green") and forwards the item.
            /// </summary>
            /// <param name="input">Items produced by stage 4.</param>
            /// <param name="output">Receives the colored items; marked complete when the stage ends.</param>
            public static Task AddColorAsync(BlockingCollection<Info> input, BlockingCollection<Info> output)
            {
                return Task.Run(() =>
                {
                    try
                    {
                        foreach (var item in input.GetConsumingEnumerable())
                        {
                            if (item.Count > 10)
                            {
                                item.Color = "Red";
                            }
                            else if (item.Count > 5)
                            {
                                item.Color = "Yellow";
                            }
                            else
                            {
                                item.Color = "Green";
                            }

                            output.Add(item);
                            ConsoleHelper.WriteLine(string.Format("stage 5:added color {1} to {0}", item, item.Color));
                        }
                    }
                    finally
                    {
                        output.CompleteAdding();
                    }
                });
            }

            /// <summary>Stage 6: prints every item in the color assigned to it.</summary>
            /// <param name="input">Colored items produced by stage 5.</param>
            public static Task ShowContentAsync(BlockingCollection<Info> input)
            {
                return Task.Run(() =>
                {
                    foreach (var item in input.GetConsumingEnumerable())
                    {
                        ConsoleHelper.WriteLine(string.Format("stage 6:{0}", item), item.Color);
                    }
                });
            }
        }

        /// <summary>Console output serialized through one lock so lines from different stages do not interleave.</summary>
        public class ConsoleHelper
        {
            private static readonly object syncOutput = new Object();

            /// <summary>Writes <paramref name="message"/> as one line.</summary>
            public static void WriteLine(string message)
            {
                lock (syncOutput)
                {
                    Console.WriteLine(message);
                }
            }

            /// <summary>Writes <paramref name="message"/> as one line in the given foreground color.</summary>
            /// <param name="message">Text to write.</param>
            /// <param name="color">Name of a <see cref="ConsoleColor"/> member, e.g. "Red".</param>
            public static void WriteLine(string message, string color)
            {
                lock (syncOutput)
                {
                    Console.ForegroundColor = (ConsoleColor)Enum.Parse(typeof(ConsoleColor), color);
                    try
                    {
                        Console.WriteLine(message);
                    }
                    finally
                    {
                        // Restore the default color even if the write throws.
                        Console.ResetColor();
                    }
                }
            }
        }

        /// <summary>Counting helper for <see cref="ConcurrentDictionary{TKey,TValue}"/>.</summary>
        public static class ConcurrentDictionaryExtension
        {
            /// <summary>
            /// Adds <paramref name="key"/> with the value 1, or increments its current value by 1.
            /// </summary>
            /// <param name="dict">Dictionary to update.</param>
            /// <param name="key">Key whose counter is added or incremented.</param>
            public static void AddOrIncrementValue(this ConcurrentDictionary<string, int> dict, string key)
            {
                // AddOrUpdate does the add-or-update retry internally, replacing the hand-written loop.
                dict.AddOrUpdate(key, 1, (existingKey, count) => count + 1);
            }
        }
    }

    转载请注明原文地址: https://ju.6miu.com/read-17992.html

    最新回复(0)