对Parallel.Invoke进行控制 Parallel.Invoke提供了一个重载版本,它可以接受一个ParallelOptions对象作为参数,对Parallel.Invoke的执行进行控制。通过这个对象,我们可以控制并行的最大线程数,各个任务是否取消执行等等。例如,在一个智能化的家中,系统会判断主人是否离开房间,如果主人离开了房间,则自动关闭屋子里的各种电器。利用Parallel.Invoke我们可以实现如下:
public static void PInvokeCancel() { // 创建取消对象 CancellationTokenSource cts = new CancellationTokenSource(); // 利用取消对象,创建ParallelOptions ParallelOptions pOption = new ParallelOptions() { CancellationToken = cts.Token }; // 设置最大线程数 pOption.MaxDegreeOfParallelism = 2 ; // 创建一个守护监视进程 Task.Factory.StartNew(() => { Console.WriteLine( " Cancellation in 5 sec. " ); Thread.Sleep( 5000 ); // 取消,结束任务的执行 cts.Cancel(); Console.WriteLine( " Canceled requested " ); }); try { // 以ParallelOptions作为参数, // 调用Parallel.Invoke Parallel.Invoke(pOption, () => ShutdownLights(pOption.CancellationToken), () => ShutdownComputer(pOption.CancellationToken)); // 输出执行结果 Console.WriteLine( " Lights and computer are tuned off. " ); } catch (Exception e) { Console.WriteLine(e.Message); } } private static void ShutdownLights(CancellationToken token) { while ( ! token.IsCancellationRequested) { Console.WriteLine( " Light is on. " ); Thread.Sleep( 1000 ); } } private static void ShutdownComputer(CancellationToken token) { while ( ! token.IsCancellationRequested) { Console.WriteLine( " Computer is on. " ); Thread.Sleep( 1000 ); } }
除了这种方式之外,ParallelOptions更多地应用在取消任务队列中还未来得及执行的任务。当我们限制了最大并发线程数的时候,如果需要通过Parallel.Invoke执行的任务较多,则有可能部分任务在队列中排队而得不到及时的执行,如果到了一定的条件这些任务还没有执行,我们可能取消这些任务。一个恰当的现实生活中的例子就是火车站买票。火车站买票的人很多,但是售票的窗口有限,当到了下班时间后,窗口就不再售票了,也就是剩下的售票任务需要取消掉。我们可以用下面的代码来模拟这样一个场景:
public static void PInvokeCancel() { // 创建取消对象 CancellationTokenSource cts = new CancellationTokenSource(); // 利用取消对象,创建ParallelOptions ParallelOptions pOption = new ParallelOptions() { CancellationToken = cts.Token }; // 设置最大线程数,也就相当于20个售票窗口 pOption.MaxDegreeOfParallelism = 20 ; // 创建一个守护监视进程 // 当到下班时间后就取消剩下的售票活动 Task.Factory.StartNew(() => { Console.WriteLine( " Cancellation in 5 sec. " ); Thread.Sleep( 5000 ); // 取消,结束任务的执行 cts.Cancel(); Console.WriteLine( " Canceled requested " ); }); try { // 创建售票活动 Action[] CustomerServices = CreateCustomerService( 1000 ); // 以ParallelOptions作为参数, // 调用Parallel.Invoke Parallel.Invoke(pOption, CustomerServices); } catch (Exception e) { // 当任务取消后,抛出一个异常 Console.WriteLine(e.Message); } } // 创建售票的活动 static Action[] CreateCustomerService( int n) { Action[] result = new Action[n]; for ( int i = 0 ; i < n; i ++ ) { result[i] = () => { Console.WriteLine( " Customer Service {0} " , Task.CurrentId); // 模拟售票需要的时间 Thread.Sleep( 2000 ); }; } return result; }
并行任务之间的同步
有时候我们在处理并行任务的时候,各个任务之间需要同步,也就是同时执行的并行任务,需要在共同到达某一个状态的后再一共继续执行。我们可以举一个现实生活中的例子。陈良乔,贾玮和单春晖是好朋友,他们相约到电影院看《建国大业》。他们三个住在不同的地方,为了能一起买票进电影院,他们约好先在电影院门口的KFC会合,然后再一起进电影院。这其中就涉及到一个同步的问题:他们需要先在KFC会合。他们是从家里分别到KFC的,但是需要在KFC进行同步,等到三个人都到齐后在完成后后继的动作,进电影院看电影。
为了完成并行任务之间的同步,.NET
Framework中提供了一个类Barrier。顾名思义,Barrier就像一个关卡或者是剪票口一样,通过Barrier类,我们可以管理并行任务的执行,完成他们之间的同步。Barrier类的使用非常简单,我们只需要在主线程中声明一个Barrier对象,同时指明需要同步的任务数。然后,在需要进行同步的地方调用Barrier类的SignalAndWait函数就可以了。
当一个并行任务到达SignalAndWait后,它会暂停执行,等待所有并行任务都到达同步点之后再继续往下执行。下面我们以一个实际的例子,来看看如何利用Barrier类完成看电影的同步问题。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ParallelBarrier { class Program { // 用于同步的Barrier对象 static Barrier sync; static void Main(string[] args) { // 创建Barrier对象,这里我们需要同步 // 任务有三个 sync = new Barrier( 3 ); // 开始执行并行任务 var steps = new Action[] { () => gotothecinema( " 陈良乔 " , TimeSpan.FromSeconds( 5 ) ), () => gotothecinema( " 贾玮 " , TimeSpan.FromSeconds( 2 ) ), () => gotothecinema( " 单春晖 " , TimeSpan.FromSeconds( 4 ) )}; Parallel.Invoke(steps); Console.ReadKey(); } // 任务 static void gotothecinema(string strName, TimeSpan timeToKFC ) { Console.WriteLine( " [{0}] 从家里出发。 " , strName); // 从家里到KFC Thread.Sleep(timeToKFC); Console.WriteLine( " [{0}] 到达KFC。 " , strName); // 等待其他人到达 sync.SignalAndWait(); // 同步后,进行后继动作 Console.WriteLine( " [{0}] 买票进电影院。 " , strName); } } } 【IT168 专稿】书接上回。在前一篇“”文章中,我们介绍了如何利用Parallel.For和Parallel.ForEach函数来并行化for循环和foreach循环。实际上,Parallel.For和Parallel.ForEach函数主要是针对“并行数据”的并行化操作,所谓并行数据,就是整个数据集中数据单元是相互独立的,可以同时进行处理。在实际开发中,我们遇到的可以并行处理的不仅包括“并行数据”,还包括可以同时进行的“并行逻辑”。所谓“并行逻辑”,就是相互独立,可以同时执行的多个任务。比如,程序员陈良乔每天早上要做两件事情:烧水洗脸和锻炼身体。这两件事情就是相互独立可以并行的,也就是说他在烧水的时候可以同时锻炼身体。在以前的单核时代,在同一时间只能完成一件事情,那么陈良乔只能先烧水后锻炼,或者是先锻炼后烧水,这导致他上班总是迟到。进入多核时代,CPU可以在同一时间完成多件事情了,借助.Net Framework 4.0中的Parallel类,我们可以方便地处理“并行逻辑”。现在,程序员陈良乔可以一边锻炼一边烧水,再也没有迟到过了。他逢人便说:“Parallel真是个好东西!自从用了它,我腰也不酸了,背也不疼了,编程更有劲儿了。。。” 使用Parallel.Invoke处理并行逻辑
跟Parallel.For函数相似,Parallel.Invoke也是Parallel类的一个静态函数,它可以接受一个Action[]类型的对象作为参数,这个对象,就是我们要执行的任务。系统会根据代码运行的硬件环境,主要是CPU运算核心的个数,自动地进行线程的创建和分配。这有些类似于我们所熟悉的多线程开发,通过为每个线程指定一个线程函数而让多个任务同时进行,只是Parallel.Invoke函数简化了线程的创建和分配等繁琐的动作,我们只需要提供核心的线程函数就可以了。下面我们来看一个实际的例子。在上文中,我们介绍了程序员陈良乔起床的例子,在以前的单核时代,他起床大约是这个样子的:
// 串行式起床 private static void GetUp() { Start( " GetUp " ); // 先烧水 boil(); // 后锻炼 exercise(); End( " GetUp " ); } // 锻炼 private static void exercise() { Console.WriteLine( " Exercise " ); Thread.Sleep( 2000 ); Console.WriteLine( " Finish Exercise " ); } // 烧水 private static void boil() { Console.WriteLine( " Boil " ); Thread.Sleep( 3000 ); Console.WriteLine( " Finish Boil " ); }在单核时代,CPU在同一时间只能做一件事情,所以他只能先烧水,后锻炼,这样显然会耽误时间。一天,他又因为这事而迟到了,老板骂道,“你是猪啊,你不会用Parallel.Invoke一边烧水一边锻炼啊?”于是,有了下面的并行式起床: // 并行式起床 private static void ParallelGetUp() { Start( " ParallelGetUp " ); // 在烧水的同时,锻炼身体 var steps = new Action[] { () => boil(), () => exercise() }; Parallel.Invoke(steps); End( " ParallelGetUp " ); }
通过Parallel.Invoke函数,我们将一些相互独立的任务同时执行,实现了“并行逻辑”,也大大地提高了应用程序的性能和效率。从下面的截图中,我们可以明显地看出两种方式的差别。串行方式所耗费的时间,是两个步骤的时间总和,而并行方式所耗费的时间,大约是单个任务的耗时最长的哪一个。
图1 串行和并行的执行情况
对Parallel.Invoke进行控制
Parallel.Invoke提供了一个重载版本,它可以接受一个ParallelOptions对象作为参数,对Parallel.Invoke的执行进行控制。通过这个对象,我们可以控制并行的最大线程数,各个任务是否取消执行等等。例如,在一个智能化的家中,系统会判断主人是否离开房间,如果主人离开了房间,则自动关闭屋子里的各种电器。利用Parallel.Invoke我们可以实现如下:
public static void PInvokeCancel(){ // 创建取消对象 CancellationTokenSource cts = new CancellationTokenSource(); // 利用取消对象,创建ParallelOptions ParallelOptions pOption = new ParallelOptions() { CancellationToken = cts.Token }; // 设置最大线程数 pOption.MaxDegreeOfParallelism = 2 ; // 创建一个守护监视进程 Task.Factory.StartNew(() => { Console.WriteLine( " Cancellation in 5 sec. " );Thread.Sleep( 5000 ); // 取消,结束任务的执行 cts.Cancel();Console.WriteLine( " Canceled requested " );}); try { // 以ParallelOptions作为参数, // 调用Parallel.Invoke Parallel.Invoke(pOption, () => ShutdownLights(pOption.CancellationToken),() => ShutdownComputer(pOption.CancellationToken)); // 输出执行结果 Console.WriteLine( " Lights and computer are tuned off. " );} catch (Exception e){ Console.WriteLine(e.Message);}} private static void ShutdownLights(CancellationToken token){ while ( ! token.IsCancellationRequested){ Console.WriteLine( " Light is on. " );Thread.Sleep( 1000 );}} private static void ShutdownComputer(CancellationToken token){ while ( ! token.IsCancellationRequested){ Console.WriteLine( " Computer is on. " );Thread.Sleep( 1000 );}}
除了这种方式之外,ParallelOptions更多地应用在取消任务队列中还未来得及执行的任务。当我们限制了最大并发线程数的时候,如果需要通过Parallel.Invoke执行的任务较多,则有可能部分任务在队列中排队而得不到及时的执行,如果到了一定的条件这些任务还没有执行,我们可能取消这些任务。一个恰当的现实生活中的例子就是火车站买票。火车站买票的人很多,但是售票的窗口有限,当到了下班时间后,窗口就不再售票了,也就是剩下的售票任务需要取消掉。我们可以用下面的代码来模拟这样一个场景:
public static void PInvokeCancel() { // 创建取消对象 CancellationTokenSource cts = new CancellationTokenSource(); // 利用取消对象,创建ParallelOptions ParallelOptions pOption = new ParallelOptions() { CancellationToken = cts.Token }; // 设置最大线程数,也就相当于20个售票窗口 pOption.MaxDegreeOfParallelism = 20 ; // 创建一个守护监视进程 // 当到下班时间后就取消剩下的售票活动 Task.Factory.StartNew(() => { Console.WriteLine( " Cancellation in 5 sec. " ); Thread.Sleep( 5000 ); // 取消,结束任务的执行 cts.Cancel(); Console.WriteLine( " Canceled requested " ); }); try { // 创建售票活动 Action[] CustomerServices = CreateCustomerService( 1000 ); // 以ParallelOptions作为参数, // 调用Parallel.Invoke Parallel.Invoke(pOption, CustomerServices); } catch (Exception e) { // 当任务取消后,抛出一个异常 Console.WriteLine(e.Message); } } // 创建售票的活动 static Action[] CreateCustomerService( int n) { Action[] result = new Action[n]; for ( int i = 0 ; i < n; i ++ ) { result[i] = () => { Console.WriteLine( " Customer Service {0} " , Task.CurrentId); // 模拟售票需要的时间 Thread.Sleep( 2000 ); }; } return result; }
并行任务之间的同步
有时候我们在处理并行任务的时候,各个任务之间需要同步,也就是同时执行的并行任务,需要在共同到达某一个状态的后再一共继续执行。我们可以举一个现实生活中的例子。陈良乔,贾玮和单春晖是好朋友,他们相约到电影院看《建国大业》。他们三个住在不同的地方,为了能一起买票进电影院,他们约好先在电影院门口的KFC会合,然后再一起进电影院。这其中就涉及到一个同步的问题:他们需要先在KFC会合。他们是从家里分别到KFC的,但是需要在KFC进行同步,等到三个人都到齐后在完成后后继的动作,进电影院看电影。
为了完成并行任务之间的同步,.NET
Framework中提供了一个类Barrier。顾名思义,Barrier就像一个关卡或者是剪票口一样,通过Barrier类,我们可以管理并行任务的执行,完成他们之间的同步。Barrier类的使用非常简单,我们只需要在主线程中声明一个Barrier对象,同时指明需要同步的任务数。然后,在需要进行同步的地方调用Barrier类的SignalAndWait函数就可以了。
当一个并行任务到达SignalAndWait后,它会暂停执行,等待所有并行任务都到达同步点之后再继续往下执行。下面我们以一个实际的例子,来看看如何利用Barrier类完成看电影的同步问题。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ParallelBarrier { class Program { // 用于同步的Barrier对象 static Barrier sync; static void Main(string[] args) { // 创建Barrier对象,这里我们需要同步 // 任务有三个 sync = new Barrier( 3 ); // 开始执行并行任务 var steps = new Action[] { () => gotothecinema( " 陈良乔 " , TimeSpan.FromSeconds( 5 ) ), () => gotothecinema( " 贾玮 " , TimeSpan.FromSeconds( 2 ) ), () => gotothecinema( " 单春晖 " , TimeSpan.FromSeconds( 4 ) )}; Parallel.Invoke(steps); Console.ReadKey(); } // 任务 static void gotothecinema(string strName, TimeSpan timeToKFC ) { Console.WriteLine( " [{0}] 从家里出发。 " , strName); // 从家里到KFC Thread.Sleep(timeToKFC); Console.WriteLine( " [{0}] 到达KFC。 " , strName); // 等待其他人到达 sync.SignalAndWait(); // 同步后,进行后继动作 Console.WriteLine( " [{0}] 买票进电影院。 " , strName); } } }
在这段代码中,我们首先创建了Barrier对象,因为在这里需要同步的任务有三个,所以创建Barrier对象时是的参数是3。然后就是使用Parallel.Invoke执行并行任务。我们在并行任务gotothecinema中设置了一个同步点,在这里我们调用Barrier对象的SignalAndWait函数,它表示当前任务已经到达同步点并同时等待其他任务到达同步点。当所有任务都到达同步点之后,再继续往下执行。运行上面的程序,我们可以获得这样的输出:
图2 使用Barrier进行同步
更复杂的任务之间的同步
我们在使用Barrier进行并行任务之间的同步时,有这样一个缺陷,我们需要预先知道所有需要同步的并行任务的数目,如果这个数目是随机的,就无法使用Barrier进行任务之间的同步了。并行任务数目不定这种情况很常见。我们还是来看上文中看电影的例子,每场进电影院看电影的观众数目是不固定的,那么退场的观众也是不固定的,甚至还有中途退场的。当所有观众都退场后,我们需要打扫电影院的卫生。这里需要的同步的就是所有观众都退场。针对这种数目不定的多个并行任务,.NET
Framework提供了CountdownEvent这个类来进行任务之间的同步。
就像它的名字一样,CountdownEvent基于这样一个简单的规则:当有新的需要同步的任务产生时,就调用AddCount增加它的计数,当有任务到达同步点是,就调用Signal函数减小它的计数,当CountdownEvent的计数为零时,就表示所有需要同步的任务已经完成,可以开始下一步任务了。下面我们利用CountdownEvent来模拟一下观众进场立场的情景。
1 using System; 2 3 using System.Collections.Generic; 4 5 using System.Linq; 6 7 using System.Text; 8 9 using System.Threading; 10 11 using System.Threading.Tasks; 12 13 namespace CountdownEventDemo 14 15 { 16 17 // 观众类,用来表示一位观众 18 19 class Customer 20 21 { 22 23 public Customer( int nID) 24 25 { 26 27 m_nID = nID; 28 29 } 30 31 // 观众的ID 32 33 public int m_nID; 34 35 } 36 37 class Program 38 39 { 40 41 static void Main(string[] args) 42 43 { 44 45 // 创建CountdownEvent同步对象 46 47 using (var countdown = new CountdownEvent( 1 )) 48 49 { 50 51 // 产生一个随机数,表示观众的数目 52 53 Random countRandom = new Random(DateTime.Now.Millisecond); 54 55 int nCount = countRandom.Next( 10 ); 56 57 // 构造每一位观众看电影的任务 58 59 Action[] seeafilm = new Action[ nCount ]; 60 61 for ( int i = 0 ; i < nCount; i ++ ) 62 63 { 64 65 // 构造Customer对象,表示观众 66 67 Customer currentCustomer = new Customer( i + 1 ); 68 69 seeafilm[i] = () => 70 71 { 72 73 // 观众进场 74 75 countdown.AddCount(); 76 77 Console.WriteLine( " 观众 {0} 进场。 " , currentCustomer.m_nID); 78 79 // 模拟看电影的时间 80 81 Thread.Sleep(countRandom.Next( 3000 , 6000 )); 82 83 // 观众退场 84 85 countdown.Signal(); 86 87 Console.WriteLine( " 观众 {0} 退场。 " , currentCustomer.m_nID); 88 89 }; 90 91 } 92 93 // 并行执行任务 94 95 Parallel.Invoke( seeafilm ); 96 97 // 在此同步,最后CountdownEvent的计数变为零 98 99 countdown.Signal(); 100 101 countdown.Wait(); 102 103 } 104 105 Console.WriteLine( " 所有观众退场,开始打扫卫生。 " ); 106 107 Console.ReadKey(); 108 109 } 110 111 在这段代码中,我们使用CountdownEvent进行随机个数任务之间的同步。最后,我们可以得到这样的输出。
图3 使用CountdownEvent进行同步
通过Parallel.Invoke函数,我们可以轻松地将相互独立的任务并行执行,同时通过Barrier和CountdownEvent类进行任务之间的同步。这种并行计算的开发方式,比以前那种基于线程的并行计算开发方式简便很多,解放了程序员的脑袋,让他们可以把更多的脑力放到业务逻辑问题的解决之上。
使用Parallel类,多快好省地开发并行计算应用程序。