並列処理・マルチスレッド

要約

  • Thread/Executorでタスクを並行実行。
    ExecutorServiceを基本とし、必要に応じて仮想スレッド(Java 21+)を活用。
  • 共有データは可視性(volatile)と排他(synchronized/Lock原子操作(Atomic*/LongAdderを正しく使い分け。
  • CompletableFutureでノンブロッキングな合成、同期ユーティリティで調整(CountDownLatch/Semaphore/Phaser 等)。
  • デッドロック回避(ロック順序の一貫、tryLock)、並列ストリームの乱用禁止(副作用/IO混在/順序要件)。

1. 基本概念(用語整理)

  • 並行(concurrent):論理的に同時進行。並列(parallel):物理的同時実行(CPUコア)。
  • CPUバウンド:計算中心(並列化で効果)。IOバウンド:待機中心(仮想スレッド有効)。
  • タスクは「やること」、スレッドは「実行の器」。

2. Thread/Runnable/Callable の基本

// 直接Thread(学習用):実務はExecutor推奨
Thread t = new Thread(() -> System.out.println("work on " + Thread.currentThread().getName()));
t.start();
t.join(); // 完了待ち(InterruptedExceptionに備える)
// 戻り値ありはCallable + Future(後述のExecutorで使う)
java.util.concurrent.Callable<Integer> task = () -> 40 + 2;

3. ExecutorService(スレッドプール)とFuture

import java.util.concurrent.*;
ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

Future<Integer> f = pool.submit(() -> 40 + 2);
int ans = f.get(2, TimeUnit.SECONDS);   // タイムアウト付取得
pool.shutdown();                         // 正常終了(必要に応じて awaitTermination)
  • 代表的プール:
    • newFixedThreadPool(n):CPUバウンド向け
    • newCachedThreadPool():短命タスク・大量(IO寄り)
    • Java 21+Executors.newVirtualThreadPerTaskExecutor()(仮想スレッド、IO待ちに強い)
// 仮想スレッド(Java 21+)
try (ExecutorService vexec = Executors.newVirtualThreadPerTaskExecutor()) {
    var f1 = vexec.submit(() -> httpGet("https://example.com")); // ブロッキングIOでも軽量
}

4. CompletableFuture(非同期合成)

import java.util.concurrent.*;
import static java.util.concurrent.CompletableFuture.*;

CompletableFuture<Integer> a = supplyAsync(() -> 10);
CompletableFuture<Integer> b = supplyAsync(() -> 32);

CompletableFuture<Integer> sum =
    a.thenCombine(b, Integer::sum)           // 並列→合成
     .thenApply(x -> x * 2)                  // 変換
     .exceptionally(e -> 0);                 // 失敗時のフォールバック

int r = sum.join(); // 42 * 2 = 84
  • 合流allOf(全完了待ち)、anyOf(いずれか)。
  • チェーンthenApply(同期変換)、thenCompose(非同期フラット化)。
  • デフォルトは共通ForkJoinPool。専用プールを使う場合はxxxAsync(..., executor)

5. 共有データとメモリ可視性

  • volatile:可視性のみ(再順序化の抑制)。複合操作の原子性は保証しない
  • synchronized:相互排他+可視性(モニタロック、入退出でメモリ同期)。
  • LockReentrantLock:待ち合わせ制御(tryLock/公平性/条件変数)。
  • 原子変数AtomicInteger/AtomicReference、高競合なら**LongAdder**。
import java.util.concurrent.atomic.*;

// 競合下でも正確:AtomicInteger
AtomicInteger ai = new AtomicInteger(0);
ai.incrementAndGet();

// 高スループットなカウンタ:LongAdder
java.util.concurrent.LongAdder adder = new java.util.concurrent.LongAdder();
adder.increment();
long total = adder.sum();

6. 同期ユーティリティ

import java.util.concurrent.*;

// CountDownLatch:N個の事前作業完了を待つ
CountDownLatch latch = new CountDownLatch(3);
// ...各タスクで latch.countDown();
latch.await(); // 3回カウントダウンされるまで待つ

// Semaphore:同時実行数の制限(例:DB接続)
Semaphore sem = new Semaphore(10);
sem.acquire();
try { /* リソース使用 */ } finally { sem.release(); }

// Phaser / CyclicBarrier:段階同期、バリア同期

7. デッドロック回避とtryLock

import java.util.concurrent.locks.*;
Lock A = new ReentrantLock();
Lock B = new ReentrantLock();

boolean ok = false;
try {
    // 同一順序で取得(A→B)を全スレッドで徹底、またはtryLockで回避
    if (A.tryLock(100, java.util.concurrent.TimeUnit.MILLISECONDS)) {
        try {
            if (B.tryLock(100, java.util.concurrent.TimeUnit.MILLISECONDS)) {
                try { /* クリティカルセクション */ ok = true; }
                finally { B.unlock(); }
            }
        } finally { A.unlock(); }
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
if (!ok) { /* リトライやフォールバック */ }
  • 原則:ロック順序を全コードで一貫tryLock待ち合わせに上限

8. 並列ストリームの注意点

var list = java.util.List.of(1,2,3,4,5);
int s = list.parallelStream().mapToInt(Integer::intValue).sum(); // 計算はOK
  • 副作用・IO混在・順序依存がある処理は並列ストリーム禁止
  • 並列度は共通ForkJoinPool依存。サーバ内で乱用すると別処理に影響。
  • 明示制御が必要ならExecutor/CompletableFutureで書く。

9. 典型的な競合バグと対処

// 競合(NG例)
class CounterBad {
    private int n = 0;
    void inc() { n++; }    // 非原子(読み→加算→書き込みが分割)
    int get() { return n; }
}

// 対処例1:synchronized
class CounterSync {
    private int n = 0;
    synchronized void inc(){ n++; }
    synchronized int get(){ return n; }
}

// 対処例2:AtomicInteger
class CounterAtomic {
    private final java.util.concurrent.atomic.AtomicInteger n = new java.util.concurrent.atomic.AtomicInteger();
    void inc(){ n.incrementAndGet(); }
    int get(){ return n.get(); }
}
  • 読み→判定→書き」の複合はロック/原子更新が必須。

10. 実用サンプル(1ファイル:プール・合成・同期・原子)

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;

public class ConcurrencyDemo {

    // 疑似IO:遅延して値を返す
    static String fetch(String id) {
        try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return "item-" + id;
    }

    public static void main(String[] args) throws Exception {
        // 1) スレッドプール(Java 21+は仮想スレッド推奨)
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(2, Runtime.getRuntime().availableProcessors() - 1));
        // ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor(); // Java 21+ ならこちら

        // 2) CompletableFutureで並列フェッチ+合成
        List<String> ids = List.of("A","B","C","D","E");
        List<CompletableFuture<String>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> fetch(id), pool))
                .toList();

        CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        List<String> results = all.thenApply(v -> futures.stream().map(CompletableFuture::join).toList())
                                  .get(2, TimeUnit.SECONDS);
        System.out.println(results); // [item-A, item-B, ...]

        // 3) 頻度集計:ConcurrentHashMap + LongAdder
        ConcurrentHashMap<String, LongAdder> freq = new ConcurrentHashMap<>();
        results.parallelStream().forEach(s ->
            freq.computeIfAbsent(s, k -> new LongAdder()).increment()
        );
        System.out.println(freq); // {item-A=1, ...}

        // 4) Latchで段取り同期
        CountDownLatch latch = new CountDownLatch(2);
        pool.submit(() -> { heavyCalc(); latch.countDown(); return null; });
        pool.submit(() -> { heavyCalc(); latch.countDown(); return null; });
        latch.await(1, TimeUnit.SECONDS); // 2タスクの完了を待つ(時間上限つき)

        // 5) タイムアウト付きFuture
        Future<Integer> f = pool.submit(() -> 40 + 2);
        System.out.println(f.get(1, TimeUnit.SECONDS)); // 42

        pool.shutdown();
        pool.awaitTermination(2, TimeUnit.SECONDS);
    }

    static void heavyCalc() {
        // CPUバウンド疑似処理
        long x = 0;
        for (int i = 0; i < 5_0000; i++) x += i;
    }
}
  • 要点:Executorで実行、CompletableFutureで合成、ConcurrentHashMap + LongAdderで安全な集計、Latchで同期。

ベストプラクティス要点

  • ExecutorService基準で実装し、Thread直書きは避ける。Java 21+では仮想スレッドを第一候補に。
  • 共有データは不変化スレッド封じ込め(Thread confinement)メッセージパッシングで設計。
  • 排他は最小スコープロック順序の一貫try/finally確実に解放
  • 競合激しいカウンタは**LongAdder、マップはConcurrentHashMap**。
  • タイムアウトとキャンセルFuture.cancel/orTimeout)を必ず設け、ハングを防止。
  • 並列ストリーム乱用禁止。副作用・IO・順序依存があるならCF/Executorで明示制御。