병렬 스트림
Java 7 이전에는 데이터 컬렉션을 병렬로 처리하기 어려웠다.
- 데이터를 서브 파트로 분할
- 분할된 서브 파트를 각각의 스레드로 할당
- 스레드로 할당 후 의도치 않은 레이스 컨디션이 발행하지 않도록 동기화 추가
- 부분 결과를 다시 결합
하지만, Java 7은 더 쉽게 병렬화를 수행하면서 에러를 최소화할 수 있도록 포크/조인 프레임워크 기능을 제공한다.
컬렉션에 parallelStream을 호출하면 병렬 스트림이 생성된다.
병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다.
따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.
성능 측정
자바 마이크로벤치마크 하니스(Java Microbenchmark Harness - JMH) 라이브러리를 이용해 작은 벤치마크를 구현하여 성능 측정을 한다.
JMH은 어노테이션 기반으로 간단 하고, 안정적으로 벤치마크를 구현할 수 있다.
잘못된 병렬 프로그래밍
// N개의 숫자를 더하는 함수의 성능 측정
// Stream => 97msecs
public static long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.reduce(0L, Long::sum);
}
// For Loop => 2msecs
public static long iterativeSum(ling n) {
long result = 0;
for (long i = 1L; i <= n; i++) {
result += i;
}
}
// Parallel Stream => 164msecs
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i+1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}
위 결과에서 parallelStream이 Stream보다 느린 이유
- iterate 반복 결과로 박싱된 객체를 생성하므로 다시 언박싱 하는 오버헤드 발생
- iterate은 이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 독립적인 청크로 분할하기 어렵다.
- 이와 같은 상황에서는 리듀싱 연산이 수행되지 않는다.
- 리듀싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않았으므로 병렬 처리를 위한 청크로 분할할 수 없다.
- 즉, 병렬 처리를 지시했고 각각의 합계가 다른 스레드에서 수행되었지만 결국 순차처리 방식과 크게 다른 점이 없어 스레드 할당 오버헤드만 증가했다.
해결책
- LongStream.rangeClosed()와 같은 특화된 메서드 사용이 필요하다.
- 특화되지 않은 메서드를 사용하면 오토 박싱, 언박싱 등의 오버헤드가 발생하기 때문이다.
- 그렇기에, 상황에 따라서는 알고리즘을 병렬화하는 것보다 적절한 자료구조를 선택하는 것이 중요하다.
올바른 사용
- 병렬 스트림과 병렬 계산 시 공유된 가변 상태를 피해야 한다.
- 올바른 자료구조를 선택해야 최적의 성능을 발휘할 수 있다.
- limit, findFirst처럼 요소의 순서에 의존하는 연산은 병렬 스트림에서 성능이 떨어진다.
- 소량의 데이터는 병렬 스트림에 도움되지 않는다.
포크 조인 프레임워크
병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브 태스크 각각의 결과를 합쳐서 전체 결과를 만든다.
- 각각의 서브태스크의 크기가 충분히 작아질 때까지 재귀적으로 분할(Fork)한다.
- 모든 서브태스크를 병렬로 처리한다.
- 서브 태스크 수행 결과를 조합(Join)한다.
Recursive Task 활용
스레드 풀(ForkJoinPool)을 활용하려면 RecursiveTask<R>의 서브클래스를 만들어야 한다.
RecursiveTask를 정의하려면 추상 메서드 compute()를 구현해야 한다.
protected abstract R compute()
// compute 메서드 구현은 다음과 같은 의사코드 형식을 유지
if (태스크가 충분히 작거나 더 이상 분할할 수 없으면) {
순차적으로 태스크 계산
} else {
// 처리 대상 데이터의 절반을 분할하여 left 서브태스크 생성
ForkJoinTask leftTask = new ForkJoinTask(처리 대상 데이터의 절반);
// ForkJoinPool의 다른 스레드로 새로 생성한 left 서브태스크를 비동기로 실행
leftTask.fork();
// left 서브태스크 후 남은 데이터로 right 서브태스크 생성
ForkJoinTask rightTask = new ForkJoinTask(나머지 데이터);
// 두 번째 서브태스크 rightTask를 동기 실행 (실제 실행될 때 compute 다시 호출, 재귀호출과 유사. 필요시 재분할)
T rightResult = rightTask.compute();
// 비동기로 실행된 첫 번째 서브태스크 leftTask의 결과를 읽거나 아직 결과가 없으면 기다린다.
T leftResult = leftTask.join();
// 두 서브태스크 leftTask와 rightTask의 결과를 조합한 값이 최종 결과값이다.
return leftResult + rightResilt;
}
작업 훔치기
- 포크 조인 프레임워크는 작업 훔치기 기법을 통해 모든 스레드를 거의 공정하게 분할한다.
- ForkJoinPool 내부 Inbound Queue를 통해 Task가 누적되어 들어온다.
- 쓰레드 A, B는 Inbound Queue에서 Task를 가져가 처리한다.
- 쓰레드 A, B는 각자 Queue를 가지고 있으며, 자신의 Queue에 Task가 없으면 다른 쓰레드의 Queue에서 작업을 훔쳐온다.
- 최대한 노는 쓰레드가 없게 하기 위한 알고리즘이다.
정리
병렬 스트림, 포크 조인 프레임워크가 무엇인지는 어느 정도? 이해했으나, 실무에서 사용하는 경우가 있을까?
아직 실무에서 본 적이 없다.
데이터가 대용량인 경우(배치, 대용량 처리)에 충분한 검토 후 사용이 가능할 것으로도 보이는데, 대용량의 기준도 상황, 환경에 따라 다르니...
'Study > Modern Java8 in Action' 카테고리의 다른 글
[Java in Action] Default Method, Static Method 란? (0) | 2021.01.17 |
---|---|
[Java in Action] Collection API (List & Map & Set) (0) | 2021.01.17 |
[Java in Action] Stream 스트림이란 ? (0) | 2021.01.17 |
[Java in Action] 람다, 함수형 인터페이스, 메서드 레퍼런스 (0) | 2021.01.17 |
[Java in Action] 동작 파라미터화 적용하여 리팩토링 (0) | 2020.03.17 |