【Java学習|初心者向け】Java Parallel Streams で処理を高速化!でも注意点も忘れずに!

皆さん、こんにちは!Javaエンジニアの〇〇です。
今回は、Java 8から導入された「Parallel Streams(並列ストリーム)」について、初心者の方にも分かりやすく解説していきます。

1. Parallel Streams って何? なぜ重要?

普段、Javaでリストなどのコレクションを処理する際、一つずつ順番に処理していく「逐次処理」が一般的です。
しかし、データ量が多かったり、各処理に時間がかかる場合、この逐次処理だと全体の処理時間が長くなってしまいます。

そこで登場するのが「Parallel Streams」です!

Parallel Streams を使うと、複数のCPUコアを最大限に活用して、コレクションの要素を並列に処理することができます。これにより、処理速度を劇的に向上させることが期待できます。

例えば、大量のデータを集計したり、複雑な計算を各要素に対して行ったりする場合に、その効果を発揮します。

2. Parallel Streams の基礎知識:ストリームって? 並列って?

ストリーム (Stream) とは?

まず、Java 8で導入された「ストリームAPI」について軽く触れておきましょう。
ストリームは、コレクション(List, Set, Mapなど)の要素に対して、宣言的な方法で一連の操作(フィルタリング、マッピング、集計など)を行うための機能です。

従来のfor文などを使った処理と比べて、コードが簡潔で読みやすくなるというメリットがあります。

List names = Arrays.asList(“Alice”, “Bob”, “Charlie”);

// 従来のfor文
for (String name : names) {
System.out.println(name.toUpperCase());
}

// ストリームAPIを使った場合
names.stream()
.map(String::toUpperCase)
.forEach(System.out::println);

並列処理 (Parallel Processing) とは?

並列処理とは、複数のタスクを同時に実行することです。
Parallel Streams では、Javaの「Fork/Joinフレームワーク」という仕組みを利用して、これを実現しています。

簡単に言うと、大きな処理を小さなタスクに分割し、それらを複数のCPUコアで同時に実行し、最後に結果を統合する、というイメージです。

3. Parallel Streams の使い方:実装方法

Parallel Streams を使うのは、実はとても簡単です。
`stream()` の代わりに `parallelStream()` を使うだけです!

例えば、先ほどの `names` リストの例を並列処理にしてみましょう。

import java.util.Arrays;
import java.util.List;

public class ParallelStreamExample {

public static void main(String[] args) {
List names = Arrays.asList(“Alice”, “Bob”, “Charlie”, “David”, “Eve”, “Frank”, “Grace”, “Heidi”);

System.out.println(“— 並列処理 —“);
names.parallelStream()
.map(name -> {
// 処理に時間がかかることをシミュレート
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + “: ” + name.toUpperCase());
return name.toUpperCase();
})
.forEach(System.out::println); // forEachは結果を待たずに終了することがあります
}
}

このコードを実行すると、`map` 処理の中で、どのスレッド(`Thread.currentThread().getName()`)が実行しているかが出力されます。
逐次処理では常に同じスレッド名が表示されますが、並列処理では複数のスレッド名が表示され、処理が並行して行われていることが分かります。

4. サンプルプログラム:実用的な例

ここでは、数値リストの合計値を計算する例を挙げます。
データ量が多いほど、並列処理の効果が分かりやすくなります。

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SumParallelStream {

public static void main(String[] args) {
// 1から1,000,000までの数値リストを作成
List numbers = IntStream.rangeClosed(1, 1_000_000)
.boxed() // intをIntegerに変換
.collect(Collectors.toList());

long startTime, endTime;

// 逐次処理での合計値計算
startTime = System.currentTimeMillis();
long sequentialSum = numbers.stream()
.mapToLong(Integer::longValue) // longに変換してオーバーフローを防ぐ
.sum();
endTime = System.currentTimeMillis();
System.out.println(“逐次処理 合計: ” + sequentialSum + “, 時間: ” + (endTime – startTime) + ” ms”);

// 並列処理での合計値計算
startTime = System.currentTimeMillis();
long parallelSum = numbers.parallelStream()
.mapToLong(Integer::longValue) // longに変換してオーバーフローを防ぐ
.sum();
endTime = System.currentTimeMillis();
System.out.println(“並列処理 合計: ” + parallelSum + “, 時間: ” + (endTime – startTime) + ” ms”);
}
}

このサンプルを実行すると、多くの場合、並列処理の方が短時間で合計値を計算できることが確認できます。

5. 応用・注意点:Parallel Streams の落とし穴

Parallel Streams は強力ですが、万能ではありません。
いくつか注意すべき点があります。

5.1. 全ての処理が速くなるわけではない

  • データ量が少ない場合: 処理を分割・統合するオーバーヘッドの方が大きくなり、かえって遅くなることがあります。
  • 各要素の処理が非常に軽い場合: CPUコアを使い切る前に処理が終わってしまうため、効果が出にくいです。
  • 処理に依存関係がある場合: ある要素の処理結果を別の要素の処理で使う場合など、並列に実行できない処理では逐次処理を使うべきです。

5.2. スレッドセーフティに注意!

Parallel Streams は複数のスレッドで同時に処理を行います。
そのため、共有されるリソース(変数やコレクション)へのアクセスは、スレッドセーフである必要があります。

例えば、以下のようなコードは問題が発生する可能性があります。

// 誤った例:スレッドセーフではない
List numbers = Arrays.asList(1, 2, 3, 4, 5);
List resultList = new ArrayList<>();

numbers.parallelStream().forEach(n -> {
// 複数のスレッドが同時にresultListに追加しようとする!
resultList.add(n 2);
});
// この後、resultListのサイズが期待通りにならないなどの問題が発生する可能性があります。

このような場合は、`ConcurrentHashMap` のようなスレッドセーフなコレクションを使用するか、`collect()` メソッドで安全に結果を収集するようにしましょう。

// 正しい例:ConcurrentHashMapを使用
List numbers = Arrays.asList(1, 2, 3, 4, 5);
ConcurrentHashMap resultMap = new ConcurrentHashMap<>();

numbers.parallelStream().forEach(n -> {
resultMap.put(n, n 2); // ConcurrentHashMapはスレッドセーフ
});

// 必要であれば、後でMapからListに変換など
List resultList = new ArrayList<>(resultMap.values());

5.3. `forEach` の順序は保証されない

`parallelStream()` の `forEach` は、どの要素がどのスレッドで処理されるか、また、処理が終わった順序も保証されません。
そのため、処理の順序が重要な場合は、`forEachOrdered()` を使うか、`collect()` を使って結果を収集する必要があります。

5.4. デフォルトの Fork/Join プール

Parallel Streams は、JVM全体で共有されるデフォルトの Fork/Join プールを使用します。
もし、アプリケーション内で他の処理もこのプールを多用している場合、Parallel Streams のパフォーマンスに影響を与える可能性があります。
場合によっては、独自の Fork/Join プールを作成して利用することも検討しましょう。

まとめ

Parallel Streams は、大量のデータを扱う際に処理速度を向上させる強力なツールです。
しかし、その効果を最大限に引き出し、予期せぬ問題を避けるためには、上記のような注意点を理解しておくことが重要です。

  • データ量や処理内容を考慮して、逐次処理か並列処理かを選択しましょう。
  • スレッドセーフティに常に注意し、安全なコレクションや処理方法を選びましょう。
  • 順序が重要な場合は、`forEachOrdered()` や `collect()` を利用しましょう。

ぜひ、皆さんの開発でも Parallel Streams を活用してみてください!

コメント

タイトルとURLをコピーしました