注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

民主与科学

独立之人格,自由之思想

 
 
 

日志

 
 

CyclicBarrier  

2010-09-30 15:01:50|  分类: JAVA线程安全 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
java.util.concurrent.CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到所有到达,然后他们才可以跨越屏障点 (common barrier point)。
在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。
因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
CyclicBarrier 支持一个可选的Runnable 对象参数,在一组线程中的最后一个线程到达之后执行它(但在释放所有线程之前),
该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。 
注意1:CyclicBarrier支持一个可选的Runnable,在一组线程中的最后一个线程到达之后(但在释放所有线程之前)执行Runnable
示例用法:下面是一个在并行分解设计中使用 barrier 的例子:
 class Solver {
   
final int N;
   
final float[][] data;
   
final CyclicBarrier barrier;

   
class Worker implements Runnable {
     
int myRow;
     
Worker(int row) { myRow = row; }
     
public void run() {
       
while (!done()) {
         processRow
(myRow);

         
try {
           barrier
.await();
         
} catch (InterruptedException ex) {
           
return;
         
} catch (BrokenBarrierException ex) {
           
return;
         
}
       
}
     
}
   
}

   
public Solver(float[][] matrix) {
     data
= matrix;
     N
= matrix.length;
     barrier
= new CyclicBarrier(N,
                                 
new Runnable() {
                                   
public void run() {
                                     mergeRows
(...);
                                   
}
                                 
});
     
for (int i = 0; i < N; ++i)
       
new Thread(new Worker(i)).start();

     waitUntilDone
();
   
}
 
}
在这个例子中,每个 worker 线程处理矩阵的一行,在处理完所有的行之前,该线程将一直在屏障处等待。
处理完所有的行之后,将执行所提供的 Runnable 屏障操作,并合并这些行。
如果合并者确定已经找到了一个解决方案,那么 done() 将返回 true,所有的 worker 线程都将终止。
注意1:这个例子感觉不怎么好。done()和waitUntilDone()对阅读代码干扰很大。
  如果屏障操作在执行时不依赖于正挂起的线程,则线程组中的任何线程在获得释放时都能执行该操作。
为方便此操作,每次调用 await() 都将返回能到达屏障处的线程的序列号。然后,您可以选择哪个线程应该执行屏障操作,例如:
if (barrier.await() == 0) {
     
// log the completion of this iteration
   
}
 注意barrier.await() == barrier.getParties() - 1 表示是第一个到达的线程,barrier.await() == 0表示这是最后一个到达的线程,当然此时它已经跨过了屏障 至于线程是第几个跨越障碍的线程,这个不清楚,应该和线程的调度有关。
  可以参照实例1
  对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:
如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,
那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。
注意:如果因为中断、失败或者超时等原因,导致线程跨过了屏障点,那么其他线程将过BrokenBarrierException的方式跨过。(如果它们几乎同时被中断,则用 InterruptedException)
Memory consistency effects: Actions in a thread prior to calling await() happen-before actions that are part of the barrier action, which in turn happen-before actions following a successful return from the corresponding await() in other threads.
await()一定比在它之前的代码后执行,await()一定比它之后的代码先被执行;await()之前的代码一定比Runnable barrierAction先被执行,Runnable barrierAction一定比await()之后的代码先被执行
主要方法
public CyclicBarrier(int parties,
                     Runnable barrierAction)
    创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
    参数:
        parties - 在启动 barrier 前必须调用 await() 的线程数
        barrierAction - 在启动 barrier 时执行的命令;如果不执行任何操作,则该参数为 null 
    抛出:
        IllegalArgumentException - 如果 parties 小于 1
public CyclicBarrier(int parties,
                     Runnable barrierAction)
    创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
    参数:
        parties - 在启动 barrier 前必须调用 await() 的线程数
        barrierAction - 在启动 barrier 时执行的命令;如果不执行任何操作,则该参数为 null 
    抛出:
        IllegalArgumentException - 如果 parties 小于 1
public int getParties()
    返回要求启动此 barrier 的参与者数目。
    返回:
        要求启动此 barrier 的参与者数目
public int await()
          throws InterruptedException,
                 BrokenBarrierException
    在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
    如果当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生以下情况之一前,该线程将一直处于休眠状态:
        * 最后一个线程到达;或者
        * 其他某个线程中断当前线程;或者
        * 其他某个线程中断另一个等待线程;或者
        * 其他某个线程在等待 barrier 时超时;或者
        * 其他某个线程在此 barrier 上调用 reset()。 
    如果当前线程:
        * 在进入此方法时已经设置了该线程的中断状态;或者
        * 在等待时被中断 
    则抛出 InterruptedException,并且清除当前线程的已中断状态。
    如果在线程处于等待状态时 barrier 被 reset(),或者在调用 await 时 barrier 被损坏,抑或任意一个线程正处于等待状态,
    则抛出 BrokenBarrierException 异常。
    如果任何线程在等待时被中断,则其他所有等待线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。
    如果当前线程是最后一个将要到达的线程,并且构造方法中提供了一个非空的屏障操作,则在允许其他线程继续运行之前,
    当前线程将运行该操作。如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。
    返回:
        到达的当前线程的序列号,其中,getParties() - 1 指示将到达的第一个线程,零指示最后一个到达的线程 
    抛出:
        InterruptedException - 如果当前线程在等待时被中断 
        BrokenBarrierException - 如果另一个 线程在当前线程等待时被中断或超时,或者重置了 barrier,
        或者在调用 await 时 barrier 被损坏,抑或由于异常而导致屏障操作(如果存在)失败。
注意1:返回是"到达的当前线程的序列号",但是本质表示在当前线程到达时,还需要await几个线程。
barrier.await() == barrier.getParties() - 1 表示是第一个到达的线程,barrier.await() == 0表示这是最后一个到达的线程,当然此时它已经跨过了屏障 至于线程是第几个跨越障碍的线程,这个不清楚,应该和线程的调度有关。
注意2:"如果当前线程不是将到达的最后一个线程,出于调度目的,将禁用它。"这里的禁用应该是指不分配时间片段给它吧。
 public int await(long timeout,
                 TimeUnit unit)
          throws InterruptedException,
                 BrokenBarrierException,
                 TimeoutException
    在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
    如果当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生以下情况之一前,该线程将一直处于休眠状态:
        * 最后一个线程到达;或者
        * 超出指定的超时时间;或者
        * 其他某个线程中断当前线程;或者
        * 其他某个线程中断另一个等待线程;或者
        * 其他某个线程在等待 barrier 时超时;或者
        * 其他某个线程在此 barrier 上调用 reset()。 
    如果当前线程:
        * 在进入此方法时已经设置了该线程的中断状态;或者
        * 在等待时被中断 
    则抛出 InterruptedException,并且清除当前线程的已中断状态。
    如果超出指定的等待时间,则抛出 TimeoutException 异常。如果该时间小于等于零,则此方法根本不会等待。
    如果在线程处于等待状态时 barrier 被 reset(),或者在调用 await 时 barrier 被损坏,
    抑或任意一个线程正处于等待状态,则抛出 BrokenBarrierException 异常。
    如果任何线程在等待时被中断,则其他所有等待线程都将抛出 BrokenBarrierException,并将屏障置于损坏状态。
    如果当前线程是最后一个将要到达的线程,并且构造方法中提供了一个非空的屏障操作,
    则在允许其他线程继续运行之前,当前线程将运行该操作。如果在执行屏障操作过程中发生异常,
    则该异常将传播到当前线程中,并将 barrier 置于损坏状态。
    参数:
        timeout - 等待 barrier 的时间
        unit - 超时参数的时间单位 
    返回:
        到达的当前线程的索引,其中,索引 getParties() - 1 指示第一个将要到达的线程,零指示最后一个到达的线程 
    抛出:
        InterruptedException - 如果当前线程在等待时被中断 
        TimeoutException - 如果超出了指定的超时时间 
        BrokenBarrierException - 如果另一个 线程在当前线程等待时被中断或超时,或者重置了 barrier,
        或者调用 await 时 barrier 被损坏,抑或由于异常而导致屏障操作(如果存在)失败。
public boolean isBroken()
    查询此屏障是否处于损坏状态。
    返回:
        如果因为构造或最后一次重置而导致中断或超时,从而使一个或多个参与者摆脱此 barrier,
        或者因为异常而导致某个屏障操作失败,则返回 true;否则返回 false。
public void reset()
    将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,
    同时抛出一个 BrokenBarrierException。
    注意,在由于其他原因造成损坏之后,实行重置可能会变得很复杂;此时需要使用其他方式重新同步线程,并选择其中一个线程来执行重置。
    与为后续使用创建一个新 barrier 相比,这种方法可能更好一些。 
public int getNumberWaiting()
    返回当前在屏障处等待的参与者数目。此方法主要用于调试和断言。
    返回:
        当前阻塞在 await() 中的参与者数目。
实例1
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
    
CyclicBarrierDemo()
    {
        int number=5;
        CyclicBarrier barrier=new CyclicBarrier(number);
       
 try{
            for(int i=0;i<number;i++)
            {
                new CyclicBarrierDemoThread("thread"+i,barrier).start();
                Thread.sleep(100);
            }
        }catch(InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}
class CyclicBarrierDemoThread extends Thread
{
    private CyclicBarrier barrier;
    CyclicBarrierDemoThread(String name,CyclicBarrier barrier)
    {
        super(name);
        this.barrier=barrier;
    }
    public void run()
    {
        try{
        System.out.println(getName()+" has arrived");
        int val=barrier.await();
        System.out.println(getName()+" has crossd the barrier "+val);
        }catch(InterruptedException e)
        {
            e.printStackTrace();
        }
        catch(BrokenBarrierException e)
        {
            e.printStackTrace();
        }
    }
}
运行结果
thread0 has arrived
thread1 has arrived
thread2 has arrived
thread3 has arrived
thread4 has arrived
thread0 has crossd the barrier 4
thread1 has crossd the barrier 3
thread4 has crossd the barrier 0
thread3 has crossd the barrier 1


  评论这张
 
阅读(1473)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017