目次
要約
- Thread/Executorでタスクを並行実行。
ExecutorServiceを基本とし、必要に応じて仮想スレッド(Java 21+)を活用。 - 共有データは可視性(
volatile
)と排他(synchronized
/Lock
)、原子操作(Atomic*
/LongAdder
)を正しく使い分け。 - CompletableFutureでノンブロッキングな合成、同期ユーティリティで調整(
CountDownLatch
/Semaphore
/Phaser
等)。 - デッドロック回避(ロック順序の一貫、
tryLock
)、並列ストリームの乱用禁止(副作用/IO混在/順序要件)。
1. 基本概念(用語整理)
- 並行(concurrent):論理的に同時進行。並列(parallel):物理的同時実行(CPUコア)。
- CPUバウンド:計算中心(並列化で効果)。IOバウンド:待機中心(仮想スレッド有効)。
- タスクは「やること」、スレッドは「実行の器」。
2. Thread/Runnable/Callable の基本
// 直接Thread(学習用):実務はExecutor推奨
Thread t = new Thread(() -> System.out.println("work on " + Thread.currentThread().getName()));
t.start();
t.join(); // 完了待ち(InterruptedExceptionに備える)
// 戻り値ありはCallable + Future(後述のExecutorで使う)
java.util.concurrent.Callable<Integer> task = () -> 40 + 2;
3. ExecutorService(スレッドプール)とFuture
import java.util.concurrent.*;
ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Future<Integer> f = pool.submit(() -> 40 + 2);
int ans = f.get(2, TimeUnit.SECONDS); // タイムアウト付取得
pool.shutdown(); // 正常終了(必要に応じて awaitTermination)
- 代表的プール:
newFixedThreadPool(n)
:CPUバウンド向けnewCachedThreadPool()
:短命タスク・大量(IO寄り)- Java 21+:
Executors.newVirtualThreadPerTaskExecutor()
(仮想スレッド、IO待ちに強い)
// 仮想スレッド(Java 21+)
try (ExecutorService vexec = Executors.newVirtualThreadPerTaskExecutor()) {
var f1 = vexec.submit(() -> httpGet("https://example.com")); // ブロッキングIOでも軽量
}
4. CompletableFuture(非同期合成)
import java.util.concurrent.*;
import static java.util.concurrent.CompletableFuture.*;
CompletableFuture<Integer> a = supplyAsync(() -> 10);
CompletableFuture<Integer> b = supplyAsync(() -> 32);
CompletableFuture<Integer> sum =
a.thenCombine(b, Integer::sum) // 並列→合成
.thenApply(x -> x * 2) // 変換
.exceptionally(e -> 0); // 失敗時のフォールバック
int r = sum.join(); // 42 * 2 = 84
- 合流:
allOf
(全完了待ち)、anyOf
(いずれか)。 - チェーン:
thenApply
(同期変換)、thenCompose
(非同期フラット化)。 - デフォルトは共通ForkJoinPool。専用プールを使う場合は
xxxAsync(..., executor)
。
5. 共有データとメモリ可視性
volatile
:可視性のみ(再順序化の抑制)。複合操作の原子性は保証しない。synchronized
:相互排他+可視性(モニタロック、入退出でメモリ同期)。Lock
(ReentrantLock
):待ち合わせ制御(tryLock
/公平性/条件変数)。- 原子変数:
AtomicInteger
/AtomicReference
、高競合なら**LongAdder
**。
import java.util.concurrent.atomic.*;
// 競合下でも正確:AtomicInteger
AtomicInteger ai = new AtomicInteger(0);
ai.incrementAndGet();
// 高スループットなカウンタ:LongAdder
java.util.concurrent.LongAdder adder = new java.util.concurrent.LongAdder();
adder.increment();
long total = adder.sum();
6. 同期ユーティリティ
import java.util.concurrent.*;
// CountDownLatch:N個の事前作業完了を待つ
CountDownLatch latch = new CountDownLatch(3);
// ...各タスクで latch.countDown();
latch.await(); // 3回カウントダウンされるまで待つ
// Semaphore:同時実行数の制限(例:DB接続)
Semaphore sem = new Semaphore(10);
sem.acquire();
try { /* リソース使用 */ } finally { sem.release(); }
// Phaser / CyclicBarrier:段階同期、バリア同期
7. デッドロック回避とtryLock
import java.util.concurrent.locks.*;
Lock A = new ReentrantLock();
Lock B = new ReentrantLock();
boolean ok = false;
try {
// 同一順序で取得(A→B)を全スレッドで徹底、またはtryLockで回避
if (A.tryLock(100, java.util.concurrent.TimeUnit.MILLISECONDS)) {
try {
if (B.tryLock(100, java.util.concurrent.TimeUnit.MILLISECONDS)) {
try { /* クリティカルセクション */ ok = true; }
finally { B.unlock(); }
}
} finally { A.unlock(); }
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!ok) { /* リトライやフォールバック */ }
- 原則:ロック順序を全コードで一貫、
tryLock
で待ち合わせに上限。
8. 並列ストリームの注意点
var list = java.util.List.of(1,2,3,4,5);
int s = list.parallelStream().mapToInt(Integer::intValue).sum(); // 計算はOK
- 副作用・IO混在・順序依存がある処理は並列ストリーム禁止。
- 並列度は共通
ForkJoinPool
依存。サーバ内で乱用すると別処理に影響。 - 明示制御が必要ならExecutor/CompletableFutureで書く。
9. 典型的な競合バグと対処
// 競合(NG例)
class CounterBad {
private int n = 0;
void inc() { n++; } // 非原子(読み→加算→書き込みが分割)
int get() { return n; }
}
// 対処例1:synchronized
class CounterSync {
private int n = 0;
synchronized void inc(){ n++; }
synchronized int get(){ return n; }
}
// 対処例2:AtomicInteger
class CounterAtomic {
private final java.util.concurrent.atomic.AtomicInteger n = new java.util.concurrent.atomic.AtomicInteger();
void inc(){ n.incrementAndGet(); }
int get(){ return n.get(); }
}
- 「読み→判定→書き」の複合はロック/原子更新が必須。
10. 実用サンプル(1ファイル:プール・合成・同期・原子)
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;
public class ConcurrencyDemo {
// 疑似IO:遅延して値を返す
static String fetch(String id) {
try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return "item-" + id;
}
public static void main(String[] args) throws Exception {
// 1) スレッドプール(Java 21+は仮想スレッド推奨)
ExecutorService pool = Executors.newFixedThreadPool(Math.max(2, Runtime.getRuntime().availableProcessors() - 1));
// ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor(); // Java 21+ ならこちら
// 2) CompletableFutureで並列フェッチ+合成
List<String> ids = List.of("A","B","C","D","E");
List<CompletableFuture<String>> futures = ids.stream()
.map(id -> CompletableFuture.supplyAsync(() -> fetch(id), pool))
.toList();
CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
List<String> results = all.thenApply(v -> futures.stream().map(CompletableFuture::join).toList())
.get(2, TimeUnit.SECONDS);
System.out.println(results); // [item-A, item-B, ...]
// 3) 頻度集計:ConcurrentHashMap + LongAdder
ConcurrentHashMap<String, LongAdder> freq = new ConcurrentHashMap<>();
results.parallelStream().forEach(s ->
freq.computeIfAbsent(s, k -> new LongAdder()).increment()
);
System.out.println(freq); // {item-A=1, ...}
// 4) Latchで段取り同期
CountDownLatch latch = new CountDownLatch(2);
pool.submit(() -> { heavyCalc(); latch.countDown(); return null; });
pool.submit(() -> { heavyCalc(); latch.countDown(); return null; });
latch.await(1, TimeUnit.SECONDS); // 2タスクの完了を待つ(時間上限つき)
// 5) タイムアウト付きFuture
Future<Integer> f = pool.submit(() -> 40 + 2);
System.out.println(f.get(1, TimeUnit.SECONDS)); // 42
pool.shutdown();
pool.awaitTermination(2, TimeUnit.SECONDS);
}
static void heavyCalc() {
// CPUバウンド疑似処理
long x = 0;
for (int i = 0; i < 5_0000; i++) x += i;
}
}
- 要点:Executorで実行、CompletableFutureで合成、ConcurrentHashMap + LongAdderで安全な集計、Latchで同期。
ベストプラクティス要点
- ExecutorService基準で実装し、
Thread
直書きは避ける。Java 21+では仮想スレッドを第一候補に。 - 共有データは不変化・スレッド封じ込め(Thread confinement)・メッセージパッシングで設計。
- 排他は最小スコープ、ロック順序の一貫、
try/finally
で確実に解放。 - 競合激しいカウンタは**
LongAdder
、マップはConcurrentHashMap
**。 - タイムアウトとキャンセル(
Future.cancel
/orTimeout
)を必ず設け、ハングを防止。 - 並列ストリーム乱用禁止。副作用・IO・順序依存があるならCF/Executorで明示制御。