Java並行処理 Phaserのかんたんな使い方 [修正版]

masakiが2015/09/30 18:33:33に投稿

Phaserのかんたんな使い方 [修正版]

はじめに

3つ以上のスレッドの待ち合わせをする必要があったので、Javaではどうやるのが正しいのか調べた所、

  • 同期化バリアを使う。
  • 最近ではPhaserがおすすめ。

とあったのでPhaserを使うことにした。
ドキュメントを読んでなんとなく使い方がわかった(正確に詳細な部分までは調べていない)のでここにまとめる。

Phaserとは

Java 7で追加された同期化バリアである。 ( java.util.concurrent.Phaser )
Javaで用意されている同期化バリアは他に CyclicBarrier および CountDownLatch がある。(それらとの対比はここではしない。そのうちいつかする。たぶん。)

複数のスレッドの動作を待ち合わせる(バリアする)ことを目的とする。
複数のスレッドの実行状態を「カウント」で管理する。

「phase」、「バリア」の単語の意味を辞書で確認しておく。

  • phase : 段階; 時期
  • バリア(barrier) : 視界や進入を防ぐことによって分離を維持するのに役立つ何か

余談だがWin32ではWaitForMultipleObjects()を使うのが一般的である。
(カウントでの管理は行わない。オブジェクトを直に登録し、登録したすべてのオブジェクトがシグナル状態になると待機終了となる。)

主なメソッド

Phaser#Phaser()
Phaserインスタンスを作成する。
インスタンスを複数の「パーティ(party)」を持つことができる。
引数が空の場合はパーティの数は0。

1以上の数値を指定した場合はその数だけのパーティを持つ。

パーティは「未到着」か「到着」かの状態を持つ。
初期状態では「未到着」の状態を持つ。

Phaser#register()
該当インスタンスへパーティを登録する。
複数回呼び出すことによって複数登録することができる。
該当インスタンスのパーティ登録個数が増える。

Phaser#arrive()
該当インスタンスのうち一つのパーティが「到着」したことを伝える。
これにより該当インスタンスのうち一つのパーティが到着状態となる。

Phaser#arriveAndDeregister()
該当インスタンスのうち一つのパーティが「到着」したことを伝えて、該当パーティを登録解除する。
これにより該当インスタンスのパーティ登録個数が減る。

パーティをあとでもう一度再利用したい場合はarrive()を使う。

Phaser#awaitAdvance()
該当インスタンスの未到着パーティすべての到着を待つ。

Phaser#arriveAndAwaitAdvance()
該当インスタンスのうち一つのパーティが「到着」したことを伝えて、他の未到着パーティすべての「到着」を待つ。

実際に使ってみる

サンプルソース

package jp.co.linkode.phaser_sample;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class App 
{
    public static void main( String[] args )
    {
    	ExecutorService exec = Executors.newSingleThreadExecutor();
    	
    	System.out.println("phaser作成");
    	Phaser phaser = new Phaser();printCounter(phaser);
    	
    	System.out.println("3回計算を行うのでパーティを3つ増やす");
    	phaser.register(); printCounter(phaser); // 計算A用
    	phaser.register(); printCounter(phaser); // 計算B用
    	phaser.register(); printCounter(phaser); // 計算C用
    	System.out.println("計算待機用にもう1つパーティを増やす");
    	phaser.register(); printCounter(phaser); // すべての計算の終了待機用
    	
    	// 計算開始
    	exec.submit(new Calc("計算A", phaser));
    	exec.submit(new Calc("計算B", phaser));
    	exec.submit(new Calc("計算C", phaser));
    	
    	// 計算が終わるまで待つ
    	System.out.println("4つのパーティのうち1つが到着状態となる。ただし、登録解除されずに残ったまま。");
    	int phaseNo = phaser.arrive();
    	System.out.println("他の3つのパーティすべてが到着状態となるのを待つ。 ");
    	printCounter(phaser);
    	phaser.awaitAdvance(phaseNo);
    	
        System.out.println("すべての計算終了");
        printCounter(phaser); // すべてのパーティが到着状態になると、すべてのパーティが未到着状態になる。
        
        exec.shutdown();
    }
    
    public static void printCounter(Phaser phaser) {
    	System.out.println(
    			"RegisteredParties:"+phaser.getRegisteredParties() + 
    			" ArrivedParties:"+phaser.getArrivedParties() + 
    			" UnarrivedParties:"+phaser.getUnarrivedParties());
    }
    
    public static class Calc implements Callable<Integer> {
    	private Phaser phaser = null;
    	private String name = "";
    	private boolean PhaserIsDeregistered = true;
    	
    	public Calc(String name, Phaser phaser) {
    		this.phaser = phaser;
    		this.name = name;
    	}
		public Integer call() throws Exception {
			System.out.println(name + " 計算開始.");
			
			// 時間のかかる計算
			Thread.sleep(new Random().nextInt(5) * 2000);
			int result = 1 + 1;
			
			// 計算が終わったので1つ減らし終了を知らせる
			if (PhaserIsDeregistered) { // 
				phaser.arriveAndDeregister();
				System.out.println(name + " 計算終了 4つのパーティのうち1つが到着状態となる。その後、登録解除される。");
			} else {
				phaser.arrive();
				System.out.println(name + " 計算終了  4つのパーティのうち1つが到着状態となる。ただし、登録解除されずに残ったまま。");
			}
			App.printCounter(phaser);
			return result;
		}
    }
}

計算終了時にパーティを登録解除しない場合

phaser作成
RegisteredParties:0 ArrivedParties:0 UnarrivedParties:0
3回計算を行うのでパーティを3つ増やす
RegisteredParties:1 ArrivedParties:0 UnarrivedParties:1
RegisteredParties:2 ArrivedParties:0 UnarrivedParties:2
RegisteredParties:3 ArrivedParties:0 UnarrivedParties:3
計算待機用にもう1つパーティを増やす
RegisteredParties:4 ArrivedParties:0 UnarrivedParties:4
4つのパーティのうち1つが到着状態となる。ただし、登録解除されずに残ったまま。
他の3つのパーティすべてが到着状態となるのを待つ。 
RegisteredParties:4 ArrivedParties:1 UnarrivedParties:3
計算A 計算開始.
計算A 計算終了  4つのパーティのうち1つが到着状態となる。ただし、登録解除されずに残ったまま。
RegisteredParties:4 ArrivedParties:2 UnarrivedParties:2
計算B 計算開始.
計算B 計算終了  4つのパーティのうち1つが到着状態となる。ただし、登録解除されずに残ったまま。
RegisteredParties:4 ArrivedParties:3 UnarrivedParties:1
計算C 計算開始.
計算C 計算終了  4つのパーティのうち1つが到着状態となる。ただし、登録解除されずに残ったまま。
すべての計算終了
RegisteredParties:4 ArrivedParties:0 UnarrivedParties:4
RegisteredParties:4 ArrivedParties:0 UnarrivedParties:4

計算終了時にパーティを登録解除する場合

phaser作成
RegisteredParties:0 ArrivedParties:0 UnarrivedParties:0
3回計算を行うのでパーティを3つ増やす
RegisteredParties:1 ArrivedParties:0 UnarrivedParties:1
RegisteredParties:2 ArrivedParties:0 UnarrivedParties:2
RegisteredParties:3 ArrivedParties:0 UnarrivedParties:3
計算待機用にもう1つパーティを増やす
RegisteredParties:4 ArrivedParties:0 UnarrivedParties:4
4つのパーティのうち1つが到着状態となる。ただし、登録解除されずに残ったまま。
他の3つのパーティすべてが到着状態となるのを待つ。 
RegisteredParties:4 ArrivedParties:1 UnarrivedParties:3
計算A 計算開始.
計算A 計算終了 4つのパーティのうち1つが到着状態となる。その後、登録解除される。
RegisteredParties:3 ArrivedParties:1 UnarrivedParties:2
計算B 計算開始.
計算B 計算終了 4つのパーティのうち1つが到着状態となる。その後、登録解除される。
RegisteredParties:2 ArrivedParties:1 UnarrivedParties:1
計算C 計算開始.
計算C 計算終了 4つのパーティのうち1つが到着状態となる。その後、登録解除される。
すべての計算終了
RegisteredParties:1 ArrivedParties:0 UnarrivedParties:1
RegisteredParties:1 ArrivedParties:0 UnarrivedParties:1