배울 내용
- 병렬 스트림으로 데이터를 병렬 처리
- 병렬 스트림의 성능 분석
- 포크/조인 프레임워크
- Spliterator로 스트림 데이터 쪼개기
배웠던 내용
- 새로운 스트림 인터페이스를 이용해서 데이터 컬렉션을 선언형으로 제어하는 방법에 대해 배움
- 외부 반복 -> 내부 반복으로 바꾸면 네이티브 자바 라이브러리가 스트림 요소의 처리를 제어할 수 있음
- 그래서 자바 개발자들은 컬렉션 데이터 처리 속도를 높이려고 따로 고민할 필요 X
- 컴퓨터의 멀티 코어를 활용해서 파이프라인 연산을 실행할 수 있음
자바 7 에서는 컬렉션을 병렬로 처리하기 굉장히 까다로웠습니다.
간단한 병렬 처리 순서
- 데이터를 서브파트로 나누어서 각각의 스레드로 할당
- 자원에 대한 레이스 컨디션(race condition)이 발생하지 않도록 동기화 처리
- 마지막으로 각 스레드에서 반환된 결과를 합쳐야 함
그래서 자바 7에서도 위의 병렬 처리를 좀 더 쉽게 하기 위해 포크/조인 프레임워크를 제공합니다.
앞으로 배울 내용에 대해서 좀 더 자세하게
이번 장에서는 자바 스트림을 이용해서 얼마나 쉽게 데이터를 병렬 처리 할 수 있는지 살펴보려고 합니다.
그리고 아무리 스트림을 이용해서 병렬 처리가 간편해 졌다고 해서 개발자가 스트림 병렬 처리를 잘못 사용하는 경우가 발생할 수 있습니다. 이런 문제가 발생하지 않도록 내부적으로 병렬 처리가 어떤 식으로 처리되는지도 알아볼 예정입니다.
7.1 절에서는 여러 청크를 병렬로 처리하기 전에 병렬 스트림이 요소를 여러 청크로 분할하는 방법을 알아볼 것입니다.
이 원리를 이해하면 스트림의 병렬 처리 방법에 대해서 이해하게 되고, 스트림을 잘못 사용해서 발생할 수 있는 문제 상황을 피할 수 있게 될것입니다.
7.1 병렬 스트림
스트림이 병렬로 데이터를 처리하는 방법
- 컬렉션의 parallelStream 호출 -> 병렬 스트림 생성!
- 병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림
- 병렬 스트림을 이용해서 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당 가능
1 - N 까지의 수를 합하는 함수를 병렬 스트림으로 처리해보기
스트림 사용 코드 - 함수형 프로그래밍 코드
public long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.reduce(0L, Long::sum);
}
전통적인 자바 코드 - 명령형 프로그래밍 코드
public long iterativeSum(Long n) {
long result = 0;
for (long i = 1L; i <= n; i++) {
result += i;
}
return result;
}
n이 굉장히 커진다면 병렬로 처리하는 것이 좀 더 효율적으로 보입니다.
병렬 스트림을 이용해서 높은 수준의 추상화를 경험해 봅시다.
7.1.1 순차 스트림을 병렬 스트림으로 변환하기
스트림의 parallel() 메서드를 이용해서 병렬 스트림으로 쉽게 변환 가능합니다.
public long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}
이 코드와 다른 코드의 차이점은 이 코드는 스트림이 여러 청크로 분리되어 처리된다는 점입니다.
이걸 사용하는 순간 개발자는 병렬 처리에 대한 투명성을 얻게 되어 쉽게 병렬 처리 가능합니다.
내부적으로는 parallel을 호출하는 순간 연산이 병렬로 수행해야 함을 의미하는 불리언 플래그가 설정됩니다.
그 반대로 sequential로 병렬 스트림을 순차 스트림으로 바꿀 수 있습니다.
parallel, sequential은 파이프라인의 처리 방법을 나타냅니다(병렬 or 순차).
7.1.2 스트림 성능 측정
그런데 과연 병렬 처리가 순차 처리보다 무조건적으로 더 빠를까요?
소프트웨어 공학에서 추측은 위험한 방법이라고 합니다...
성능 최적화 시 중요한 건 추측이 아닌 측정!
이번엔 JMH(Java Microbenchmark Harness)를 상요해서 병렬 vs 순차 처리 성능을 비교해 보려고 합니다.
여기서는 JMH 라이브러리 설정 및 사용법을 자세하게 다루진 않겠습니다.
제가 따로 포스팅 해두었으니 참고해주시면 좋을것 같습니다!
https://agh-dev.tistory.com/22
JMH(Java Microbenchmark Harness) - Gradle을 통해 Benchmark 구현
이번 포스팅에서는 Gradle 빌드 도구를 사용해서 JMH 라이브러리를 다운받고 빌드해서 Benchmark를 구현해보려고 합니다. 환경 IDE: IntelliJ 2023.2 Gradle: gradle-8.2 plugin: id "me.champeau.jmh" version "0.7.1" Gradle
agh-dev.tistory.com
결과를 바로 보겠습니다.
벤치마크 코드
package parallel;
import org.openjdk.jmh.annotations.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime) //벤치마크 대상 함수 실행시 걸리는 시간의 평균 측정
@OutputTimeUnit(TimeUnit.MILLISECONDS) //벤치마크 결과를 밀리초 단위로 출력
@Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"}) //4Gb의 힙 공간을 제공한 환경에서 두번 벤치마크를 수행해 결과의 신뢰성 확보!
public class ParallelStreamBenchmark {
private static final long N = 10_000_000L;
@Benchmark
public long sequentialSum() {
return Stream.iterate(1L, i -> i + 1).limit(N).reduce(0L, Long::sum);
}
@Benchmark
public long iterativeSum() {
long result = 0;
for (long i = 1L; i <= N; i++) {
result += i;
}
return result;
}
@Benchmark
public long parallelSum() {
return Stream.iterate(1L, i -> i + 1).limit(N).parallel().reduce(0L, Long::sum);
}
@TearDown(Level.Invocation) // 벤치마크가 gc 의 영향을 받지 않도록 하기 위해 벤치마크 끝날 때 마다 gc 실행!
public void tearDown() {
System.gc();
}
}
벤치마크 실행 결과

결과는 전통적인 for문을 사용한 함수가 가장 빠르다는 결과가 나왔습니다.
순차 스트림,병렬 스트림 결과가 더 느린 이유
- 반복 결과로 박싱된 객체가 만들어지는데 숫자를 더하기 위해 언방식도 해야함
- 반복 작업은 병렬로 수행할 수 있는 독립 단위로 나누기 힘듦
특히 두번째 문제는 스트림을 병렬로 처리해야 하는 입장에서는 지나칠 수 없는 문제입니다.
병렬로 사용할 수 있는 스트림을 사용해야 하는 것이 중요해 보입니다.
그런 의미에서 지금 사용한 Stream.iterate는 병렬로 처리하기 애매한 스트림 입니다...
Stream.iterate는 최종연산이 실행될 때 데이터를 생성합니다.
리듀싱 과정을 실행해야 하는데 데이터가 준비되어 있지 않으므로 스트림의 요소를 청크로 나누어 병렬처리 할 수가 없게되는 것 입니다...
위의 과정을 이해하는 것이 parallel 메서드를 호출했을 때 내부적으로 어떤 일이 일어날지 이해하는데 도움이 될거라고 생각합니다!
병렬 스트림에서 사용하는 스레드 풀 설정
더 특화된 메서드 사용
LongStream.rangeClosed라는 메서드를 사용해서 병렬 처리를 진행해 봅시다.
LongStream.rangeClosed의 장점
- LongStream.rangeClosed는 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라짐
- LongStream.rangeClosed는 쉽게 청크로 분할할 수 있는 숫자 범위를 생산. - ex) 1-20이라면 1-5,6-10,11-15,16-20 이런 식으로 범위의 숫자로 분할할 수 있음
우선 병렬 처리를 위한 parallel 메서드 추가 전에 박싱, 언방식 오버헤드를 한번 살펴보겠습니다.
@Benchmark
public long rangedSum() {
return LongStream.rangeClosed(1, N)
.reduce(0L, Long::sum);
}
결과

iterate 팩토리 메서드를 사용해서 박싱, 언방식 과정을 거치는 경우에 비해 훨씬 더 좋아진 수행결과를 확인할 수 있습니다.
그래서 상황에 따라서는 알맞은 알고리즘과 자료구조를 적절히 선택할 수 있는 능력도 좋은 프로그래머가 되기 위해 필요하다고 생각됩니다.
이제 병렬 처리도 한번 살펴보겠습니다.
@Benchmark
public long parallelRangedSum() {
return LongStream.rangeClosed(1, N)
.parallel()
.reduce(0L, Long::sum);
}
결과

최종

꼭! 알고 넘어가야 하는 부분
이제 순차 실행보다 더 빠른 결과를 얻어낼 수 있는 병렬 리듀싱을 만들었습니다.
앞서 제가 올바른 알고리즘, 자료구조 선택이 중요하다고 했었는데, 상황에 알맞아 보이는 LongStream.rangeClosed를 사용해서 올바른 자료구조 선택에 성공했고, 결과도 성공적이었습니다!
하지만 병렬 처리가 모든 문제의 해결책은 아닐겁니다.
병렬 처리를 한다는 것은 스트림을 재귀적으로 분할하고, 분할된 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나로 합쳐야 한다는 과정을 거친다는 것을 꼭 기억합시다.
저 과정에서 발생하는 오버헤드도 무시할 수 없으니까요...
항상 저런 과정이 있다는 것을 머리속에 넣어두고, 내가 짠 병렬 스트림 코드가 올바르게 사용되었는지 돌아볼 필요가 있습니다.
7.1.3 병렬 스트림의 올바른 사용법
병렬 실행중 정말 큰 문제는 스레드간에 공유되는 데이터를 변경하는 것입니다.
다음 코드는 n까지의 자연수를 더하기 위해 병렬 처리를 구현할 코드입니다.
public static long sideEffectParallelSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
return accumulator.total;
}
public static class Accumulator {
private long total = 0;
public void add(long value) {
total += value;
}
}
병렬처리 관점에서 코드의 문제점
- total이라는 여러 스레드가 공유하는 변수가 지속적으로 상태가 변함
- 특히 total에 여러 스레드가 접근할 때 데이터 레이스 문제 발생
- 해당 코드는 본질적으로 순차 처리에 더 적합한 코드라는 것
- total += value 는 아토믹 하지 않은 연산
코드 실행 및 결과
public static <T, R> long measurePerf(Function<T, R> f, T input) {
long fastest = Long.MAX_VALUE;
for (int i = 0; i < 10; i++) {
long start = System.nanoTime();
R result = f.apply(input);
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Result: " + result);
if (duration < fastest) {
fastest = duration;
}
}
return fastest;
}
public static void main(String[] args) {
System.out.println("SideEffect parallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum,10_000_000L) + "msecs");
}

제대로된 결과조차 나오고 있지 않습니다.
이 문제의 근본적인 문제는 total(누적자)이라는 변수는 여러 스레드가 접근할 수 있음에도 불구하고 상태가 지속적으로 변화한다는 것입니다.
-> 상태 공유에 따른 부작용!
이번에는 병렬 처리에서는 공유된 가변 상태를 피해야함을 코드 실행을 통해 직접 확인해 보았습니다.
병렬 스트림이 올바르게 작동하기 위해서는 공유된 가변 상태를 피해야 함을 꼭 기억합시다!
이제부턴 어떤 상황에서 병렬 스트림을 사용해야 적절한 성능을 개선할 수 있는지 한번 알아봅시다.
7.1.4 병렬 스트림 효과적으로 사용하기
- 확신이 서지 않으면 측정! - 사실 스트림을 병렬처리하는 과정이 투명하지 않을 때가 많음. 앞서 코드 결과를 보면 스트림을 병렬로 처리하는 것의 목적은 더 빠른 데이터 처리인데 상황에 따라 그렇지 않은 경우가 있음. 그렇기 때문에 확신이 서지 않는다면 항상 벤치마킹을 통해 성능을 측정하는 것이 바람직!
- 박싱 주의 - 자동 박싱/언박싱은 성능을 크게 저하시킬 수 있는 요소. - Java는 이 문제를 위해 기본형 특화 스트림을 제공(IntStream,LongStream,DoubleStream) - 되도록이면 기본형 특화 스트림을 사용하자!
- 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 존재 - 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하는 것은 비용이 비쌈 - findFirst, limit 같은 것들 특히 조심 - findAny와 같이 요소의 순서에 상관없는 연산이 병렬 스트림 처리에서 성능이 좋음 - 정렬된 스트림에 unordered를 호출하면 비정렬된 스트림을 얻을 수 있어 순서에 의존하지 않도록 만들 수 있음
- 스트림에서 수행되는 파이프라인 연산 비용 고려 - 스트림 전체 요소 수를 N, 스트림 요소 하나를 처리하는 데 드는 비용을 Q라고 하면, 스트림 파이프라인 처리 비용은 N * Q라고 할 수 있음. - Q가 높아진다는 것은 병렬 스트림으로 성능 개선이 가능하다는 것
- 소량의 데이터에서는 병렬 스트림 피하기 - 소량의 데이터를 처리하기 위해 병렬 스트림을 사용하는 것은 부가 비용을 상쇄할 만큼의 이득을 보기 힘듬
- 스트림을 구성하는 자료구조가 적절한지 확인 - LinkedList보다 ArrayList가 좀 더 적절한 자료구조가 될 수 있음. - 분할할 때 LinkedList는 자료구조 특성상 모든 요소를 확인해야 하기 때문에 그렇지 않은 ArrayList가 더 적절 - range 팩토리 메서드로 기본형 스트림을 만드는 것이 iterate를 사용하는 것보다 더 적절한 것도 하나의 예시가 될 수 있음
- 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 성능이 좌우될 수 있음 - SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적인 병렬 스트림 가능 - filter가 있는 스트림의 경우 최종연산 시에도 정확한 스트림의 길이를 예측할 수 없어 효과적인 병렬 스트림이 가능한지 알 수 없음
- 최종 연산의 병합과정(Collector의 combiner 메서드 같은 것들) 비용 살펴보기 - 병합 과정이 비싸다면 병렬 스트림으로 얻은 성능이 서브스트림의 부분 결과를 합치는 과정에서 상쇄될 수 있음
| 소스 | 분해성 |
| ArrayList | 훌륭함 |
| LinkedList | 나쁨 |
| IntStream.range | 훌륭함 |
| Stream.iterate | 나쁨 |
| HashSet | 좋음 |
| TreeSet | 좋음 |
이제 병렬 스트림이 내부적으로 어떻게 처리되는지 알아봅시다.
7.2 포크/조인 프레임워크
포크/조인 프레임워크는 하나의 작업을 재귀적으로 작은 작업으로 분할하고 서브태스크 각각의 결과를 합쳐서 전체의 결과를 만들도록 설계되었습니다.
해당 프레임워크는 서브태스크를 스레드 풀의 작업자 스레드에 분산 할당하는 ExcutorService 인터페이스를 구현합니다.
7.2.1 RecursiveTask 활용
스레드 풀을 이용하려면 RecursiveTask<R>의 서브클래스를 만들어야 합니다.
여기서 R은 병렬화된 태스크가 반환하는 결과 혹은 없는 경우에는 RecursiveAction형식 입니다.
서브클래스를 정의하기 위해서는 추상메서드인 compute 메서드를 구현해야 합니다.
compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없는 경우 개별 서브태스크의 결과를 생산할 알고리즘을 정의합니다.
서브클래스를 만들어 compute 메서드를 구현해봅시다.
n까지의 자연수 덧셈 작업을 병렬로 처리하기 위해 작성한 ForkJoinSumCalculator 클래스
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
public static final long THRESHOLD = 10_000; //이 값 이하로 서브태스크 분할 X
public ForkJoinSumCalculator(long[] numbers) { //메인 태스크 생성할 public 생성자
this(numbers,0,numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) { //메인 태스크의 서브태스크를 재귀적으로 만들 때 사용할 private 생성자
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); //왼쪽 절반 부분을 수행할 작업 생성
leftTask.fork(); //ForkJoinPool 의 다른 스레드로 새로 생성한 태스크를 비동기 실행
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end); //나머지 절반 부분을 수행할 작업 생성
Long rightResult = rightTask.compute(); //이 태스크는 동기 실행
Long leftResult = leftTask.join(); //첫 번째 서브태스크 결과를 읽거나 결과가 없으면 기다림
return leftResult + rightResult; //두 서브태스크의 결과를 조합한 값이 해당 태스크의 결과
}
//더 분해할 수 없는 경우 서브태스크의 결과를 계산할 알고리즘
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1,n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
public static void main(String[] args) {
long result = forkJoinSum(10_000_000);
System.out.println(result);
}
}
- forkJoinSum
- LongStream으로 n까지의 자연수를 포함하는 배열 생성
- 배열을 ForkJoinSumCalculator에 전달하고 ForkJoinTask 생성
- 해당 태스크를 ForkJoinPool의 invoke 메서드로 전달하고 반환값은 ForkJoinSumCalculator에서 정의한 태스크의 결과가 됨
결과

병렬 스트림 처리보다 좀 느린 결과가 나왔습니다.
이는 ForkJoinSumCalculator 태스크에서 사용할 수 있도록 전체 스트림을 long[] 으로 변환했기 때문입니다.
7.2.2 포크/조인 프레임워크를 제대로 사용하는 방법
포크/조인 프레임워크 제대로 사용하는 법
- join 메서드는 모든 서브태스크를 시작한 후 호출 - join 메서드를 호출하는 순간 그 태스크가 결과를 반환하기까지 기다리게 됨 - 이렇게 되면 거의 순차적으로 처리하는 형태와 다를바가 없고 그렇게 되면 더 느리고 더 복잡한 프로그램이 되어버림
- RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드 사용 X - 그 대신 순차 코드(forkJoinSum) 에서는 invoke 사용 가능 - compute, fork 메서드는 사용 가능
- 서브태스크에서 fork 메서드를 호출해서 ForkJoinPool의 일정 조절 가능 - 왼쪽 작업은 fork를 호출하고 오른쪽 작업은 compute(fork 말고)를 호출하는 것이 더 효율적 - 그렇게 되면 두 서브태스크의 한 태스크에는 같은 스레드를 재사용할 수 있어 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있음
- 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅이 어려움 - 보통 IDE를 사용하면 stack trace로 문제가 일어난 과정을 살펴볼 수가 있는데, 포크/조인 프레임워크는 fork라 불리는 다른 스레드에서 compute를 호출하므로 stack trace가 의미가 없다.
- 포크/조인 프레임워크가 무조건적으로 투명성을 보장한다는 생각을 버려야 함 - 앞서 결과를 확인했듯이 순차 스트림보다 포크/조인 프레임워크를 사용한 것이 더 낮은 성능을 보여줬음 - 병렬 처리로 성능을 개선하기 위해서는 태스크를 여러 독립적인 서브태스크로 분할 가능해야 함 - 이 과정에서 생각보다 많은 오버헤드 발생 그렇기에 각 서브태스크의 실행 시간이 이런 오버헤드가 발생하는 시간보다는 더 길어야 병렬 처리가 의미가 있음 - JIT 컴파일러에 의해 최적화 되기 위해 warm-up이 필요하다는 것을 기억하자. - 그리고 컴파일러 최적화는 사실 병렬 처리보다는 순차 처리에 좀 더 잘 최적화 되어 있음을 알아두자.
이제 다음 절에서는 포크/조인 분할 전략에 대해 알아보겠습니다.
7.2.3 작업 훔치기
ForkJoinSumCalculator 예제에서는 덧셈을 수행할 숫자가 만 개 이하면 서브태스크 생성을 중단했습니다.
만약 천만개의 숫자를 넣어주면 서브태스크는 1000개 이상이 될 것입니다.
그런데 CPU 코어가 4개 뿐이라면, 16,32개 이더라도 천 개 이상의 서브태스크는 자원 낭비라고 생각이 들 수 있습니다.
만약 CPU 코어만큼 서브태스크를 만들고, 각각의 서브태스크가 같은 시간에 종료가 된다라고 한다면 그럴 수도 있을 겁니다.
하지만 실제로는 코어 개수와 상관없이 적절한 크기로 분할된 많은 태스크를 포킹하는 것이 바람직합니다.
사실 이론적으로 코어 개수만큼 병렬화된 태스크로 작업부하를 분할하면 모든 CPU 코어에서 태스크를 실행할 것이고, 크기가 같은 각각의 태스크는 같은 시간에 종료될 것이라고 생각할 수 있습니다.
그렇지만 위와 같은 자연수 덧셈 예시와는 달리 좀 더 복잡한 상황에서는 각각의 서브태스크의 작업 완료 시간이 크게 달라질 수 있습니다.
포크/조인 프레임워크는 작업 훔치기(work stealing)이라는 기법으로 해당 문제를 해결합니다.
포크/조인 프레임워크 작업 훔치기(work stealing)
- Job Queue
각 스레드에는 스레드가 현재 실행을 담당하는 하위 작업을 저장하는 자체 작업 대기열이 있습니다.
이 자체 작업 대기열은 이중 연결 리스트로 구성되어 있습니다.
- Work Execution
스레드는 자체 작업 대기열에서 작업을 실행합니다.
스레드가 작업 대기열을 완료하면 다른 스레드의 작업 대기열에서 작업을 찾습니다.
- Work Stealing
스레드의 작업 대기열이 비어 있으면 다른 스레드의 작업 대기열에서 작업을 훔칩니다.
작업 훔치기 스레드는 일반적으로 작업이 가장 많은 다른 스레드의 작업 대기열을 대상으로 하고 대기열 끝에서 하위 작업을 훔칩니다.
이 프로세스는 자체 큐가 비어 있는 경우에도 스레드가 작업 처리에 지속적으로 참여하도록 합니다.
모든 태스크가 작업을 끝낼 때까지, 모든 큐가 빌 때까지 work stealing을 반복합니다.
그렇기 때문에 작업의 크기를 작게 균등하게 나누면 스레드 간의 작업부하를 비슷한 수준으로 유지할 수 있는 것입니다.
- Load Balancing
work stealing은 작업을 균등하게 분배하여 스레드 간에 부하 분산을 달성하는 데 도움이 됩니다.
빈 대기열로 인해 유휴 상태인 스레드는 작업을 도용하여 바쁜 스레드를 도울 수 있으며 일부 스레드가 압도당하고 다른
스레드는 유휴 상태로 남아 있는 상황을 피할 수 있습니다.
포크/조인 프레임워크에서 사용하는 작업 훔치기 과정

앞에서 병렬 스트림(parallel 메서드)을 사용할 때 분할 로직을 직접 짜지 않아도 됐었습니다.
그건 Spliterator라는 인터페이스 때문인데 이제 이 인터페이스에 대해 공부해 보겠습니다.
7.3 Spliterator 인터페이스
자바 8은 Spliterator라는 새로운 인터페이스를 제공합니다.Spliterator는 병렬 상황에서 요소를 탐색하는 기능에 특화되어 있습니다. 자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공합니다.
java.util의 Collection 인터페이스의 spliterator()

Iterable의 default 메서드인 spliterator


Spliterator
public interface Spliterator<T> { //T는 Spliterator에서 탐색하는 요소의 형식
boolean tryAdvance(Consumer<? super T> action); //Spliterator의 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 참 반환
Spliterator<T> trySplit(); //Spliterator의 일부 요소(자신이 반환한 요소)를 분할해서 두 번째 Spliterator를 생성
long estimateSize(); //탐색해야 할 요소 수 정보를 제공
int characteristics();
}
7.3.1 분할 과정
분할 과정
1. 첫 번째 Spliterator에 trySplit을 호출하면 두 번째 Spliterator가 생성
2. 두 개의 Spliterator에 trySplit를 다시 호출하면 네 개의 Spliterator가 생성
3. null이 반환되면 위 해당 Spliterator는 분할이 중단됨
4. Spliterator가 실행한 모든 trySplit에서 null을 반환하면 모든 재귀 분할 과정 중단
해당 분할 과정은 characteristics 메서드로 정의하는 Spliterator의 특성에 영향을 받습니다.
Spliterator 특성
ArrayList에 정의된 Spliterator의 characteristics 구현 메서드

Spliterator 자체의 특성 집합을 포함하는 int 반환
Spliterator의 특성 정보
| 특성 | 의미 |
| DISTINCT | x,y 두 요소를 방문했을 때 x.equals(y)는 항상 false를 반환 |
| ORDERED | 리스트처럼 요소에 정해진 순서가 있으므로 Spliterator는 요소를 탐색하고 분할할 때 이 순서에 유의해야 함 |
| SORTED | 탐색된 요소는 미리 정의된 정렬 순서를 따름 |
| SIZED | 크기가 알려진 소스(예를 들면 Set)로 Spliterator를 생성했으므로 estimatedSize()는 정확한 값을 반환 |
| NON-NULL | 탐색하는 모든 요소는 null이 아님 |
| IMMUTABLE | 이 Spliterator의 소스는 불변. 즉, 요소를 탐색하는 동안 요소를 추가하거나, 삭제하거나, 고칠 수 없음 |
| CONCURRENT | 동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있음 |
| SUBSIZED | 이 Spliterator 그리고 분할되는 모든 Spliterator는 SIZED 특성을 가짐 |
7.3.2 커스텀 Spliterator 구현하기
문장에서 단어의 수를 세는 코드를 Spliterator를 구현해서 병렬 스트림 처리 하는 방식으로 짜보려고 합니다.
WordCounter 클래스
public class WordCount {
public static final String SENTENCE =
" Nel mezzo del cammin di nostra vita "
+ "mi ritrovai in una selva oscura"
+ " che la dritta via era smarrita ";
private final int counter;
private final boolean lastSpace;
public WordCount(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
public WordCount accumulate(Character c) {
if(Character.isWhitespace(c)){
return lastSpace ? this : new WordCount(counter, true);
}else {
return lastSpace ?
new WordCount(counter + 1, false) :
this;
}
}
public WordCount combine(WordCount wordCount) {
return new WordCount(counter + wordCount.counter, wordCount.lastSpace);
}
}
이렇게 해두면 스트림으로 바로 처리할 수 있습니다.
스트림 처리 코드
public static int countWords(Stream<Character> stream) {
WordCount wordCounter = stream.reduce(new WordCount(0, true),
WordCount::accumulate,
WordCount::combine);
return wordCounter.getCounter();
}
public static void main(String[] args) {
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
System.out.println("Found " + countWords(stream) + " words");
}
결과

이제 parallel 메서드를 이용해서 병렬 처리를 시도해 보겠습니다.
public static void main(String[] args) {
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
System.out.println("Found " + countWords(stream.parallel()) + " words");
}
결과

결과가 제대로 나오지 않습니다
이유
병렬 스트림 요소는 SENTENCE 입니다. 이 문자열을 임의로 나누다 보니 그 과정에서 원래 하나의 단어를 둘로 계산하는 상황이 발생할 수 있는 것입니다.
즉, 순차 스트림을 병렬 스트림으로 바꾸기 위해 스트림 요소를 분할할 때 그 위치에 따라 잘못된 결과가 나올 수 있는 것입니다.
그래서 지금 필요한 것은 분할 기준을 정할 Spliterator를 만들어 줘야 하는 것입니다!
단어의 끝을 알려주기 위한 Spliterator를 만들어 줍시다.
private static class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string) {
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
action.accept(string.charAt(currentChar++)); //현재 문자를 소비
return currentChar < string.length(); //소비할 문자가 남아 있으면 true 반환
}
@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentChar;
if(currentSize < 10){
return null; //파싱할 문자열을 순차 처리할 수 있을 만큼 충분히 작아졌음을 알리는 null 반환
}
for (int splitPos = currentSize / 2 + currentChar;
splitPos < string.length(); //파싱할 문자열의 중간을 분할 위치로 설정
splitPos++) {
if(Character.isWhitespace(string.charAt(splitPos))){ //다음 공백이 나올 때까지 분할 위치를 뒤로 이동
Spliterator<Character> spliterator = //처음부터 분할 위치까지 문자열을 파싱할 새로운 WordCounterSpliterator 를 생성
new WordCounterSpliterator(string.substring(currentChar,splitPos));
currentChar = splitPos;
return spliterator; //공백을 찾았고 문자열을 분리했으므로 루프를 종료
}
}
return null;
}
@Override
public long estimateSize() {
return string.length() - currentChar;
}
@Override
public int characteristics() {
return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
}
}
- tryAdvance - 문자열에서 현재 인덱스에 해당하는 문자를 Consumer에 제공한 다음에 인덱스 증가 - 인수로 전달된 Consumer는 스트림을 탐색하면서 적용해야 하는 함수 집합이 작업을 처리할 수 있도록 소비한 문자를 전달하는 자바 내부 클래스 - WordCounter의 accumulate 메서드가 작용되는 것! - 해당 메서드는 새로운 커서 위치가 문자열의 전체 길이보다 작은 경우를 true로 반환
- trySplit - 분할하기 위한 알고리즘이 정의된 메서드 - 분할되는 태스크가 너무 많지 않도록 분할 기준 설정 - 분할 과정에서 남은 문자 수가 한계값 이하면 null을 반환 - 분할은 파싱해야 할 문자열 청크의 중간 위치를 기준으로 분할 - 단어 중간을 분할하지 않도록 빈 문자가 나올때까지 분할 위치 이동 - 분할할 위치를 찾으면 Spliterator를 만듦
- estimatedSize - estimatedSize는 탐색해야 할 요소의 개수이고 Spliterator는 파싱할 문자열 전체 길와 현재 반복 중인 위치의(currentChar)의 차다.
- characteristics - ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE - ORDERED(문자열의 문자 등장 순서가 유의미함), SIZED(estimatedSize 메서드의 반환값이 정확) - SUBSIZED(trySplit으로 생성된 Spliterator도 정확한 크기를 가짐), NONNULL(문자열에는 null 문자가 존재하지 않음) - IMMUTABLE(문자열 자체가 불변 클래스이므로 문자열을 파싱하면서 속성이 추가되지 않음) - 이렇듯 characteristics는 프레임워크에게 해당 Spliterator가 위의 특성을 가진다는 것을 알려주는 기능을 하는 메서드
결과
public static void main(String[] args) {
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
Stream<Character> stream = StreamSupport.stream(spliterator, true);
System.out.println("Found " + countWords(stream) + " words");
}

결과가 이제 제대로 잘 나왔습니다.
정리
- 내부 반복을 이용하면 명시적으로 다른 스레드를 사용하지 않고도 스트림을 병렬로 처리 가능
- 간단하게 스트림을 병렬로 처리할 수 있지만 항상 병렬 처리가 빠른 것은 아님
- 병렬 소프트웨어 동작 방법과 성능은 직관적이지 않을 때가 많아 항상 성능 측정을 해봐야 함
- 병렬 스트림으로 데이터 집합을 병렬 실행할 때 특히 처리해야 할 데이터가 아주 많거나 각 요소를 처리하는 데 오랜 시간이 걸릴 때 성능을 높일 수 있음
- 가능하면 기본형 특화 스트림을 사용하는 등 올바른 자료구조 선택이 어떤 연산을 병렬 처리하는 것보다 성능적으로 더 큰 영향을 미칠 수 있음
- 포크/조인 프레임워크에서는 병렬화할 수 있는 태스크를 작은 태스크로 분할한 다음에 분할된 태스크를 각각의 스레드로 실행하며 서브태스크 각각의 결과를 합쳐서 최종 결과를 생상
- Spliterator는 탐색하려는 데이터를 포함하는 스트림을 어떻게 병렬화(분할 기준 설정) 할 것인지 정의
'도서 > 모던 자바 인 액션' 카테고리의 다른 글
| 모던 자바 인 액션 - 10장 람다를 이용한 도메인 전용 언어 (0) | 2023.08.27 |
|---|---|
| 모던 자바 인 액션 - 6장 - 스트림으로 데이터 수집 (0) | 2023.08.05 |
| 모던 자바 인 액션 - 5장 다양한 스트림 활용 (0) | 2023.08.02 |
| 모던 자바 인 액션 - 4장 스트림 활용 (0) | 2023.07.31 |
| 모던 자바 인 액션 - 2장 동작의 파라미터화 (1) | 2023.07.28 |