博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[转载]Parallel的使用
阅读量:6090 次
发布时间:2019-06-20

本文共 12504 字,大约阅读时间需要 41 分钟。

对Parallel.Invoke进行控制

  Parallel.Invoke提供了一个重载版本,它可以接受一个ParallelOptions对象作为参数,对Parallel.Invoke的执行进行控制。通过这个对象,我们可以控制并行的最大线程数,各个任务是否取消执行等等。例如,在一个智能化的家中,系统会判断主人是否离开房间,如果主人离开了房间,则自动关闭屋子里的各种电器。利用Parallel.Invoke我们可以实现如下:

public
static
void
PInvokeCancel()
{
//
创建取消对象
CancellationTokenSource cts
=
new
CancellationTokenSource();
//
利用取消对象,创建ParallelOptions
ParallelOptions
pOption
=
new
ParallelOptions() { CancellationToken
=
cts.Token
};
//
设置最大线程数
pOption.MaxDegreeOfParallelism
=
2
;
//
创建一个守护监视进程
Task.Factory.StartNew(()
=>
{
Console.WriteLine(
"
Cancellation in 5
sec.
"
);
Thread.Sleep(
5000
);
//
取消,结束任务的执行
cts.Cancel();
Console.WriteLine(
"
Canceled
requested
"
);
});
try
{
//
以ParallelOptions作为参数,
//
调用Parallel.Invoke
Parallel.Invoke(pOption, ()
=>
ShutdownLights(pOption.CancellationToken),
()
=>
ShutdownComputer(pOption.CancellationToken));
//
输出执行结果
Console.WriteLine(
"
Lights and computer
are tuned off.
"
);
}
catch
(Exception
e)
{
Console.WriteLine(e.Message);
}
}
private
static
void
ShutdownLights(CancellationToken token)
{
while
(
!
token.IsCancellationRequested)
{
Console.WriteLine(
"
Light is on.
"
);
Thread.Sleep(
1000
);
}
}
private
static
void
ShutdownComputer(CancellationToken token)
{
while
(
!
token.IsCancellationRequested)
{
Console.WriteLine(
"
Computer is
on.
"
);
Thread.Sleep(
1000
);
}
}

    

除了这种方式之外,ParallelOptions更多地应用在取消任务队列中还未来得及执行的任务。当我们限制了最大并发线程数的时候,如果需要通过Parallel.Invoke执行的任务较多,则有可能部分任务在队列中排队而得不到及时的执行,如果到了一定的条件这些任务还没有执行,我们可能取消这些任务。一个恰当的现实生活中的例子就是火车站买票。火车站买票的人很多,但是售票的窗口有限,当到了下班时间后,窗口就不再售票了,也就是剩下的售票任务需要取消掉。我们可以用下面的代码来模拟这样一个场景:

public
static
void
PInvokeCancel()
{
//
创建取消对象
CancellationTokenSource cts
=
new
CancellationTokenSource();
//
利用取消对象,创建ParallelOptions
ParallelOptions pOption
=
new
ParallelOptions()
{ CancellationToken
=
cts.Token };
//
设置最大线程数,也就相当于20个售票窗口
pOption.MaxDegreeOfParallelism
=
20
;
//
创建一个守护监视进程
//
当到下班时间后就取消剩下的售票活动
Task.Factory.StartNew(()
=>
{
Console.WriteLine(
"
Cancellation in 5
sec.
"
);
Thread.Sleep(
5000
);
//
取消,结束任务的执行
cts.Cancel();
Console.WriteLine(
"
Canceled
requested
"
);
});
try
{
//
创建售票活动
Action[]
CustomerServices
=
CreateCustomerService(
1000
);
//
以ParallelOptions作为参数,
//
调用Parallel.Invoke
Parallel.Invoke(pOption,
CustomerServices);
}
catch
(Exception
e)
{
//
当任务取消后,抛出一个异常
Console.WriteLine(e.Message);
}
}
//
创建售票的活动
static
Action[]
CreateCustomerService(
int
n)
{
Action[] result
=
new
Action[n];
for
(
int
i
=
0
; i
<
n; i
++
)
{
result[i]
=
()
=>
{
Console.WriteLine(
"
Customer Service
{0}
"
,
Task.CurrentId);
//
模拟售票需要的时间
Thread.Sleep(
2000
);
};
}
return
result;
}

    并行任务之间的同步

  有时候我们在处理并行任务的时候,各个任务之间需要同步,也就是同时执行的并行任务,需要在共同到达某一个状态的后再一共继续执行。我们可以举一个现实生活中的例子。陈良乔,贾玮和单春晖是好朋友,他们相约到电影院看《建国大业》。他们三个住在不同的地方,为了能一起买票进电影院,他们约好先在电影院门口的KFC会合,然后再一起进电影院。这其中就涉及到一个同步的问题:他们需要先在KFC会合。他们是从家里分别到KFC的,但是需要在KFC进行同步,等到三个人都到齐后在完成后后继的动作,进电影院看电影。

  为了完成并行任务之间的同步,.NET

Framework中提供了一个类Barrier。顾名思义,Barrier就像一个关卡或者是剪票口一样,通过Barrier类,我们可以管理并行任务的执行,完成他们之间的同步。Barrier类的使用非常简单,我们只需要在主线程中声明一个Barrier对象,同时指明需要同步的任务数。然后,在需要进行同步的地方调用Barrier类的SignalAndWait函数就可以了。
当一个并行任务到达SignalAndWait后,它会暂停执行,等待所有并行任务都到达同步点之后再继续往下执行。下面我们以一个实际的例子,来看看如何利用Barrier类完成看电影的同步问题。

using System;
using
System.Collections.Generic;
using System.Linq;
using
System.Text;
using System.Threading;
using
System.Threading.Tasks;
namespace
ParallelBarrier
{
class
Program
{
//
用于同步的Barrier对象
static
Barrier
sync;
static
void
Main(string[] args)
{
//
创建Barrier对象,这里我们需要同步
//
任务有三个
sync
=
new
Barrier(
3
);
//
开始执行并行任务
var steps
=
new
Action[] { ()
=>
gotothecinema(
"
陈良乔
"
, TimeSpan.FromSeconds(
5
) ),
()
=>
gotothecinema(
"
贾玮
"
, TimeSpan.FromSeconds(
2
) ),
()
=>
gotothecinema(
"
单春晖
"
, TimeSpan.FromSeconds(
4
)
)};
Parallel.Invoke(steps);
Console.ReadKey();
}
//
任务
static
void
gotothecinema(string strName, TimeSpan timeToKFC
)
{
Console.WriteLine(
"
[{0}]
从家里出发。
"
,
strName);
//
从家里到KFC
Thread.Sleep(timeToKFC);
Console.WriteLine(
"
[{0}]
到达KFC。
"
,
strName);
//
等待其他人到达
sync.SignalAndWait();
//
同步后,进行后继动作
Console.WriteLine(
"
[{0}]
买票进电影院。
"
,
strName);
}
}
}
IT168 专稿】书接上回。在前一篇“”文章中,我们介绍了如何利用Parallel.For和Parallel.ForEach函数来并行化for循环和foreach循环。实际上,Parallel.For和Parallel.ForEach函数主要是针对“并行数据”的并行化操作,所谓并行数据,就是整个数据集中数据单元是相互独立的,可以同时进行处理。在实际开发中,我们遇到的可以并行处理的不仅包括“并行数据”,还包括可以同时进行的“并行逻辑”。所谓“并行逻辑”,就是相互独立,可以同时执行的多个任务。比如,程序员陈良乔每天早上要做两件事情:烧水洗脸和锻炼身体。这两件事情就是相互独立可以并行的,也就是说他在烧水的时候可以同时锻炼身体。在以前的单核时代,在同一时间只能完成一件事情,那么陈良乔只能先烧水后锻炼,或者是先锻炼后烧水,这导致他上班总是迟到。进入多核时代,CPU可以在同一时间完成多件事情了,借助.Net
Framework
4.0中的Parallel类,我们可以方便地处理“并行逻辑”。现在,程序员陈良乔可以一边锻炼一边烧水,再也没有迟到过了。他逢人便说:“Parallel真是个好东西!自从用了它,我腰也不酸了,背也不疼了,编程更有劲儿了。。。”

  使用Parallel.Invoke处理并行逻辑

  跟Parallel.For函数相似,Parallel.Invoke也是Parallel类的一个静态函数,它可以接受一个Action[]类型的对象作为参数,这个对象,就是我们要执行的任务。系统会根据代码运行的硬件环境,主要是CPU运算核心的个数,自动地进行线程的创建和分配。这有些类似于我们所熟悉的多线程开发,通过为每个线程指定一个线程函数而让多个任务同时进行,只是Parallel.Invoke函数简化了线程的创建和分配等繁琐的动作,我们只需要提供核心的线程函数就可以了。下面我们来看一个实际的例子。在上文中,我们介绍了程序员陈良乔起床的例子,在以前的单核时代,他起床大约是这个样子的:

       
//
串行式起床
       
private
static
void
GetUp()
        {
            Start(
"
GetUp
"
);
           
//
先烧水
            boil();
           
//
后锻炼
            exercise();
            End(
"
GetUp
"
);
        }
       
       
//
锻炼
       
private
static
void
exercise()
        {
            Console.WriteLine(
"
Exercise
"
);
            Thread.Sleep(
2000
);
            Console.WriteLine(
"
Finish
Exercise
"
);
        }
       
//
烧水
       
private
static
void
boil()
        {
            Console.WriteLine(
"
Boil
"
);
            Thread.Sleep(
3000
);
            Console.WriteLine(
"
Finish
Boil
"
);
        }
在单核时代,CPU在同一时间只能做一件事情,所以他只能先烧水,后锻炼,这样显然会耽误时间。一天,他又因为这事而迟到了,老板骂道,“你是猪啊,你不会用Parallel.Invoke一边烧水一边锻炼啊?”于是,有了下面的并行式起床:
       
//
并行式起床
       
private
static
void
ParallelGetUp()
        {
            Start(
"
ParallelGetUp
"
);
           
//
在烧水的同时,锻炼身体
            var steps
=
new
Action[] { ()
=>
boil(), ()
=>
exercise()
};
            Parallel.Invoke(steps);
            End(
"
ParallelGetUp
"
);
        }

    通过Parallel.Invoke函数,我们将一些相互独立的任务同时执行,实现了“并行逻辑”,也大大地提高了应用程序的性能和效率。从下面的截图中,我们可以明显地看出两种方式的差别。串行方式所耗费的时间,是两个步骤的时间总和,而并行方式所耗费的时间,大约是单个任务的耗时最长的哪一个。

  图1 串行和并行的执行情况

  对Parallel.Invoke进行控制

  Parallel.Invoke提供了一个重载版本,它可以接受一个ParallelOptions对象作为参数,对Parallel.Invoke的执行进行控制。通过这个对象,我们可以控制并行的最大线程数,各个任务是否取消执行等等。例如,在一个智能化的家中,系统会判断主人是否离开房间,如果主人离开了房间,则自动关闭屋子里的各种电器。利用Parallel.Invoke我们可以实现如下:

public
static
void
PInvokeCancel()
{
//
创建取消对象
CancellationTokenSource cts
=
new
CancellationTokenSource();
//
利用取消对象,创建ParallelOptions
ParallelOptions
pOption
=
new
ParallelOptions() { CancellationToken
=
cts.Token
};
//
设置最大线程数
pOption.MaxDegreeOfParallelism
=
2
;
//
创建一个守护监视进程
Task.Factory.StartNew(()
=>
{
Console.WriteLine(
"
Cancellation in 5
sec.
"
);
Thread.Sleep(
5000
);
//
取消,结束任务的执行
cts.Cancel();
Console.WriteLine(
"
Canceled
requested
"
);
});
try
{
//
以ParallelOptions作为参数,
//
调用Parallel.Invoke
Parallel.Invoke(pOption, ()
=>
ShutdownLights(pOption.CancellationToken),
()
=>
ShutdownComputer(pOption.CancellationToken));
//
输出执行结果
Console.WriteLine(
"
Lights and computer
are tuned off.
"
);
}
catch
(Exception
e)
{
Console.WriteLine(e.Message);
}
}
private
static
void
ShutdownLights(CancellationToken token)
{
while
(
!
token.IsCancellationRequested)
{
Console.WriteLine(
"
Light is on.
"
);
Thread.Sleep(
1000
);
}
}
private
static
void
ShutdownComputer(CancellationToken token)
{
while
(
!
token.IsCancellationRequested)
{
Console.WriteLine(
"
Computer is
on.
"
);
Thread.Sleep(
1000
);
}
}

    

除了这种方式之外,ParallelOptions更多地应用在取消任务队列中还未来得及执行的任务。当我们限制了最大并发线程数的时候,如果需要通过Parallel.Invoke执行的任务较多,则有可能部分任务在队列中排队而得不到及时的执行,如果到了一定的条件这些任务还没有执行,我们可能取消这些任务。一个恰当的现实生活中的例子就是火车站买票。火车站买票的人很多,但是售票的窗口有限,当到了下班时间后,窗口就不再售票了,也就是剩下的售票任务需要取消掉。我们可以用下面的代码来模拟这样一个场景:

public
static
void
PInvokeCancel()
  {
  
//
创建取消对象
  CancellationTokenSource cts
=
new
CancellationTokenSource();
  
//
利用取消对象,创建ParallelOptions
  ParallelOptions pOption
=
new
ParallelOptions()
{ CancellationToken
=
cts.Token };
  
//
设置最大线程数,也就相当于20个售票窗口
  pOption.MaxDegreeOfParallelism
=
20
;
  
//
创建一个守护监视进程
  
//
当到下班时间后就取消剩下的售票活动
  Task.Factory.StartNew(()
=>
  {
  Console.WriteLine(
"
Cancellation in 5
sec.
"
);
  Thread.Sleep(
5000
);
  
//
取消,结束任务的执行
  cts.Cancel();
  Console.WriteLine(
"
Canceled
requested
"
);
  });
  
try
  {
  
//
创建售票活动
  Action[]
CustomerServices
=
CreateCustomerService(
1000
);
  
//
以ParallelOptions作为参数,
  
//
调用Parallel.Invoke
  Parallel.Invoke(pOption,
CustomerServices);
  }
  
catch
(Exception
e)
  {
  
//
当任务取消后,抛出一个异常
  Console.WriteLine(e.Message);
  }
  }
  
//
创建售票的活动
  
static
Action[]
CreateCustomerService(
int
n)
  {
  Action[] result
=
new
Action[n];
  
for
(
int
i
=
0
; i
<
n; i
++
)
  {
  result[i]
=
()
=>
  {
  Console.WriteLine(
"
Customer Service
{0}
"
,
Task.CurrentId);
  
//
模拟售票需要的时间
  Thread.Sleep(
2000
);
  };
  }
  
return
result;
  }

    并行任务之间的同步

  有时候我们在处理并行任务的时候,各个任务之间需要同步,也就是同时执行的并行任务,需要在共同到达某一个状态的后再一共继续执行。我们可以举一个现实生活中的例子。陈良乔,贾玮和单春晖是好朋友,他们相约到电影院看《建国大业》。他们三个住在不同的地方,为了能一起买票进电影院,他们约好先在电影院门口的KFC会合,然后再一起进电影院。这其中就涉及到一个同步的问题:他们需要先在KFC会合。他们是从家里分别到KFC的,但是需要在KFC进行同步,等到三个人都到齐后在完成后后继的动作,进电影院看电影。

  为了完成并行任务之间的同步,.NET

Framework中提供了一个类Barrier。顾名思义,Barrier就像一个关卡或者是剪票口一样,通过Barrier类,我们可以管理并行任务的执行,完成他们之间的同步。Barrier类的使用非常简单,我们只需要在主线程中声明一个Barrier对象,同时指明需要同步的任务数。然后,在需要进行同步的地方调用Barrier类的SignalAndWait函数就可以了。
当一个并行任务到达SignalAndWait后,它会暂停执行,等待所有并行任务都到达同步点之后再继续往下执行。下面我们以一个实际的例子,来看看如何利用Barrier类完成看电影的同步问题。

using System;
  using
System.Collections.Generic;
  using System.Linq;
  using
System.Text;
  using System.Threading;
  using
System.Threading.Tasks;
  namespace
ParallelBarrier
  {
  
class
Program
  {
  
//
用于同步的Barrier对象
  
static
Barrier
sync;
  
static
void
Main(string[] args)
  {
  
//
创建Barrier对象,这里我们需要同步
  
//
任务有三个
  sync
=
new
Barrier(
3
);
  
//
开始执行并行任务
  var steps
=
new
Action[] { ()
=>
gotothecinema(
"
陈良乔
"
, TimeSpan.FromSeconds(
5
) ),
  ()
=>
gotothecinema(
"
贾玮
"
, TimeSpan.FromSeconds(
2
) ),
  ()
=>
gotothecinema(
"
单春晖
"
, TimeSpan.FromSeconds(
4
)
)};
  Parallel.Invoke(steps);
  Console.ReadKey();
  }
  
//
任务
  
static
void
gotothecinema(string strName, TimeSpan timeToKFC
)
  {
  Console.WriteLine(
"
[{0}]
从家里出发。
"
,
strName);
  
//
从家里到KFC
  Thread.Sleep(timeToKFC);
  Console.WriteLine(
"
[{0}]
到达KFC。
"
,
strName);
  
//
等待其他人到达
  sync.SignalAndWait();
  
//
同步后,进行后继动作
  Console.WriteLine(
"
[{0}]
买票进电影院。
"
,
strName);
  }
  }
  }

  在这段代码中,我们首先创建了Barrier对象,因为在这里需要同步的任务有三个,所以创建Barrier对象时是的参数是3。然后就是使用Parallel.Invoke执行并行任务。我们在并行任务gotothecinema中设置了一个同步点,在这里我们调用Barrier对象的SignalAndWait函数,它表示当前任务已经到达同步点并同时等待其他任务到达同步点。当所有任务都到达同步点之后,再继续往下执行。运行上面的程序,我们可以获得这样的输出:

  图2 使用Barrier进行同步

  更复杂的任务之间的同步

  我们在使用Barrier进行并行任务之间的同步时,有这样一个缺陷,我们需要预先知道所有需要同步的并行任务的数目,如果这个数目是随机的,就无法使用Barrier进行任务之间的同步了。并行任务数目不定这种情况很常见。我们还是来看上文中看电影的例子,每场进电影院看电影的观众数目是不固定的,那么退场的观众也是不固定的,甚至还有中途退场的。当所有观众都退场后,我们需要打扫电影院的卫生。这里需要的同步的就是所有观众都退场。针对这种数目不定的多个并行任务,.NET

Framework提供了CountdownEvent这个类来进行任务之间的同步。

  就像它的名字一样,CountdownEvent基于这样一个简单的规则:当有新的需要同步的任务产生时,就调用AddCount增加它的计数,当有任务到达同步点是,就调用Signal函数减小它的计数,当CountdownEvent的计数为零时,就表示所有需要同步的任务已经完成,可以开始下一步任务了。下面我们利用CountdownEvent来模拟一下观众进场立场的情景。

  1
using
System;
  2
  3
  using System.Collections.Generic;
  4
  5
  using
System.Linq;
  6
  7
  using System.Text;
  8
  9
  using
System.Threading;
10
11
  using System.Threading.Tasks;
12
13
  namespace
CountdownEventDemo
14
15
  {
16
17
  
//
观众类,用来表示一位观众
18
19
  
class
Customer
20
21
  {
22
23
  
public
Customer(
int
nID)
24
25
  {
26
27
  m_nID
=
nID;
28
29
  }
30
31
  
//
观众的ID
32
33
  
public
int
m_nID;
34
35
  }
36
37
  
class
Program
38
39
  {
40
41
  
static
void
Main(string[]
args)
42
43
  {
44
45
  
//
创建CountdownEvent同步对象
46
47
  using (var
countdown
=
new
CountdownEvent(
1
))
48
49
  {
50
51
  
//
产生一个随机数,表示观众的数目
52
53
  Random
countRandom
=
new
Random(DateTime.Now.Millisecond);
54
55
  
int
nCount
=
countRandom.Next(
10
);
56
57
  
//
构造每一位观众看电影的任务
58
59
  Action[] seeafilm
=
new
Action[ nCount
];
60
61
  
for
(
int
i
=
0
; i
<
nCount; i
++
)
62
63
  {
64
65
  
//
构造Customer对象,表示观众
66
67
  Customer
currentCustomer
=
new
Customer( i
+
1
);
68
69
  seeafilm[i]
=
()
=>
70
71
  {
72
73
  
//
观众进场
74
75
  countdown.AddCount();
76
77
  Console.WriteLine(
"
观众 {0}
进场。
"
,
currentCustomer.m_nID);
78
79
  
//
模拟看电影的时间
80
81
  Thread.Sleep(countRandom.Next(
3000
,
6000
));
82
83
  
//
观众退场
84
85
  countdown.Signal();
86
87
  Console.WriteLine(
"
观众 {0}
退场。
"
,
currentCustomer.m_nID);
88
89
  };
90
91
  }
92
93
  
//
并行执行任务
94
95
  Parallel.Invoke( seeafilm );
96
97
  
//
在此同步,最后CountdownEvent的计数变为零
98
99
  countdown.Signal();
100
101
  countdown.Wait();
102
103
  }
104
105
  Console.WriteLine(
"
所有观众退场,开始打扫卫生。
"
);
106
107
  Console.ReadKey();
108
109
  }
110
111

    在这段代码中,我们使用CountdownEvent进行随机个数任务之间的同步。最后,我们可以得到这样的输出。

  图3 使用CountdownEvent进行同步

  通过Parallel.Invoke函数,我们可以轻松地将相互独立的任务并行执行,同时通过Barrier和CountdownEvent类进行任务之间的同步。这种并行计算的开发方式,比以前那种基于线程的并行计算开发方式简便很多,解放了程序员的脑袋,让他们可以把更多的脑力放到业务逻辑问题的解决之上。

  使用Parallel类,多快好省地开发并行计算应用程序。

转载地址:http://bflwa.baihongyu.com/

你可能感兴趣的文章
买家与卖家也能战略合作
查看>>
shell删除每行开始的数字
查看>>
前端--CSS
查看>>
DoD模型与OSI模型的关系及其协议对应关系
查看>>
网卡报错:Failed to start LSB: Bring up/down networking
查看>>
MySQL的root密码忘记后重置方法
查看>>
boost read_some函数历程
查看>>
lvm逻辑卷管理
查看>>
CentOS7开机提示:"initial setup of centos linux 7 (core)"
查看>>
加密类型以及相关算法
查看>>
Suse init.d 服务启动脚本写法
查看>>
KVM虚拟化实战精讲[第一章 基础环境]
查看>>
将数据库表转为POJO
查看>>
计算机网络(二)——传输层
查看>>
java:泛型|RandomList
查看>>
iptables 开放所有端口, 对特殊端口只开放给指定IP
查看>>
Xtradb+Haproxy高可用数据库集群(三)sysbench性能测试篇
查看>>
彻底理解Cisco NAT内部的一些事
查看>>
Android官方开发文档Training系列课程中文版:管理Activity的生命周期之Activity的重建...
查看>>
DNS子域授权,acl以及日志系统
查看>>