[Java in Action] 병렬 데이터 처리(Parallel Stream)와 성능

728x90
반응형
SMALL

병렬 스트림

Java 7 이전에는 데이터 컬렉션을 병렬로 처리하기 어려웠다.

  • 데이터를 서브 파트로 분할
  • 분할된 서브 파트를 각각의 스레드로 할당
  • 스레드로 할당 후 의도치 않은 레이스 컨디션이 발행하지 않도록 동기화 추가
  • 부분 결과를 다시 결합

하지만, Java 7은 더 쉽게 병렬화를 수행하면서 에러를 최소화할 수 있도록 포크/조인 프레임워크 기능을 제공한다.

 

컬렉션에 parallelStream을 호출하면 병렬 스트림이 생성된다.

 

병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다.

 

따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.

성능 측정

자바 마이크로벤치마크 하니스(Java Microbenchmark Harness - JMH) 라이브러리를 이용해 작은 벤치마크를 구현하여 성능 측정을 한다.

JMH은 어노테이션 기반으로 간단 하고, 안정적으로 벤치마크를 구현할 수 있다.

 

잘못된 병렬 프로그래밍

 
// N개의 숫자를 더하는 함수의 성능 측정
// Stream => 97msecs
public static long sequentialSum(long n) {
    return Stream.iterate(1L, i -> i + 1)
    	.limit(n)
		.reduce(0L, Long::sum);
}
 
 
// For Loop => 2msecs
public static long iterativeSum(ling n) {
    long result = 0;
    for (long i = 1L; i <= n; i++) {
        result += i;
    }
}
 
 
// Parallel Stream => 164msecs
public static long parallelSum(long n) {
    return Stream.iterate(1L, i -> i+1)
    	.limit(n)
        .parallel()
        .reduce(0L, Long::sum);
}

위 결과에서 parallelStream이 Stream보다 느린 이유

  • iterate 반복 결과로 박싱된 객체를 생성하므로 다시 언박싱 하는 오버헤드 발생
  • iterate은 이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 독립적인 청크로 분할하기 어렵다.
    • 이와 같은 상황에서는 리듀싱 연산이 수행되지 않는다.
    • 리듀싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않았으므로 병렬 처리를 위한 청크로 분할할 수 없다.
    • 즉, 병렬 처리를 지시했고 각각의 합계가 다른 스레드에서 수행되었지만 결국 순차처리 방식과 크게 다른 점이 없어 스레드 할당 오버헤드만 증가했다.

해결책

  • LongStream.rangeClosed()와 같은 특화된 메서드 사용이 필요하다.
  • 특화되지 않은 메서드를 사용하면 오토 박싱, 언박싱 등의 오버헤드가 발생하기 때문이다.
  • 그렇기에, 상황에 따라서는 알고리즘을 병렬화하는 것보다 적절한 자료구조를 선택하는 것이 중요하다.

 

올바른 사용

  • 병렬 스트림과 병렬 계산 시 공유된 가변 상태를 피해야 한다.
  • 올바른 자료구조를 선택해야 최적의 성능을 발휘할 수 있다.
  • limit, findFirst처럼 요소의 순서에 의존하는 연산은 병렬 스트림에서 성능이 떨어진다.
  • 소량의 데이터는 병렬 스트림에 도움되지 않는다.

포크 조인 프레임워크

병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브 태스크 각각의 결과를 합쳐서 전체 결과를 만든다.

  • 각각의 서브태스크의 크기가 충분히 작아질 때까지 재귀적으로 분할(Fork)한다.
  • 모든 서브태스크를 병렬로 처리한다.
  • 서브 태스크 수행 결과를 조합(Join)한다.

For Join Framework

 

Recursive Task 활용

스레드 풀(ForkJoinPool)을 활용하려면 RecursiveTask<R>의 서브클래스를 만들어야 한다.

 

RecursiveTask를 정의하려면 추상 메서드 compute()를 구현해야 한다.

 
protected abstract R compute()
 
 
// compute 메서드 구현은 다음과 같은 의사코드 형식을 유지
if (태스크가 충분히 작거나 더 이상 분할할 수 없으면) {
    순차적으로 태스크 계산
} else {
    // 처리 대상 데이터의 절반을 분할하여 left 서브태스크 생성
    ForkJoinTask leftTask = new ForkJoinTask(처리 대상 데이터의 절반);
 
 
    // ForkJoinPool의 다른 스레드로 새로 생성한 left 서브태스크를 비동기로 실행
    leftTask.fork();
 
 
    // left 서브태스크 후 남은 데이터로 right 서브태스크 생성
    ForkJoinTask rightTask = new ForkJoinTask(나머지 데이터);
 
 
    // 두 번째 서브태스크 rightTask를 동기 실행 (실제 실행될 때 compute 다시 호출, 재귀호출과 유사. 필요시 재분할)
    T rightResult = rightTask.compute();
 
 
    // 비동기로 실행된 첫 번째 서브태스크 leftTask의 결과를 읽거나 아직 결과가 없으면 기다린다.
    T leftResult = leftTask.join();
 
 
    // 두 서브태스크 leftTask와 rightTask의 결과를 조합한 값이 최종 결과값이다.
    return leftResult + rightResilt;
}

Fork Join 알고리즘

작업 훔치기

  • 포크 조인 프레임워크는 작업 훔치기 기법을 통해 모든 스레드를 거의 공정하게 분할한다.
  • ForkJoinPool 내부 Inbound Queue를 통해 Task가 누적되어 들어온다.
  • 쓰레드 A, B는 Inbound Queue에서 Task를 가져가 처리한다.
  • 쓰레드 A, B는 각자 Queue를 가지고 있으며, 자신의 Queue에 Task가 없으면 다른 쓰레드의 Queue에서 작업을 훔쳐온다.
  • 최대한 노는 쓰레드가 없게 하기 위한 알고리즘이다.

 

정리

병렬 스트림, 포크 조인 프레임워크가 무엇인지는 어느 정도? 이해했으나, 실무에서 사용하는 경우가 있을까?

 

아직 실무에서 본 적이 없다.

 

데이터가 대용량인 경우(배치, 대용량 처리)에 충분한 검토 후 사용이 가능할 것으로도 보이는데, 대용량의 기준도 상황, 환경에 따라 다르니...

 

 

728x90
반응형
LIST