춤추는 개발자

[Java] Stream API의 기본 메서드 - 2 본문

Developer's_til/그외 개발 공부

[Java] Stream API의 기본 메서드 - 2

Heon_9u 2021. 7. 30. 16:13
728x90
반응형

✅ Stream의 중첩 구조 제거하기

[ FlatMap이란? ]

 간혹, 이중 배열 또는 이중 리스트로 되어 있는 데이터를 1차원으로 처리해야 하는 경우가 발생합니다. 이럴 때 중첩 구조를 한 단계 제거하기 위한 중간 연산으로 flatMap을 사용합니다. flatMap은 Function 함수형 인터페이스를 매개 변수로 받고 있는데, 이 매개 변수는 Stream을 extends하여 구현한 객체여야 합니다.

 예를 들어, 아래와 같이 요소가 문자열 배열(String[])인 Stream이 있다고 가정하겠습니다.

Stream<String[]> strArrstream = Stream.of(
	new String[] {"abc", "def", "ghi"},
	new String[] {"ABC", "DEF", "GHI"}
);

 

 각 요소의 문자열들을 합쳐서 문자열이 요소인 Stream, 즉 Stream<String>으로 만들어 보겠습니다. 먼저, Stream의 요소를 반환하기 위해 map()을 쓰고, 배열을 Stream으로 만들어주는 Arrays.stream(T[])을 쓰겠습니다.

Stream<Stream<String>> strStrStream = strArrStream.map(Arrays::stream);

 

 예상과는 달리, Stream의 Stream이 반환됩니다. 각 요소의 문자열들이 합쳐지지 않고, Stream의 Stream 형태로 되버린 것입니다. 이 때, 간단히 map()을 flatMap()으로 바꾸기만 하면 원하는 결과를 얻을 수 있습니다.

Stream<String> strStream = strArrStream.flatMap(Arrays::stream);

 

[ FlatMap의 동작 방식 ]

 위 처럼 Stream의 중첩 구조를 제거하는 flatMap의 동작 방식을 살펴보겠습니다.

예를 들어, ["Hello", "World"]를 갖는 리스트가 존재할 때, 1개의 알파벳으로 이루어진 String으로 나누고, 중복된 알파벳을 제거하는 List로 변환해보겠습니다. 먼저, Map을 적용한 경우의 동작 방식을 살펴보겠습니다.

 

1. 만약 Map을 적용할 경우

 

  • split("")에 의해 Hello -> ["H", "e", "l", "l", "o"]로 분리되고, World -> ["W", "o", "r", "l", "d"]로 분리됩니다.
  • map에 의해 Stream의 소스가 ["H", "e", "l", "l", "o"], ["W", "o", "r", "l", "d"]로 변환됩니다.
  • distinct()에 의해 중복된 요소가 제거되지만, 해당하는 것은 없습니다.
  • 결과적으로 ["H", "e", "l", "l", "o"], ["W", "o", "r", "l", "d"]가 collect(toList())에 의해 수집됩니다.

4가지 과정을 거쳐서 2개의 String[]을 요소로 갖는 List<String[]>가 생성되었지만, 원하는 결과는 Stream<String>입니다. 다음에는 flatMap을 적용한 경우의 동작 과정을 살펴보겠습니다.

 

2. flatMap을 적용할 경우

 

  • split("")에 의해 Hello -> ["H", "e", "l", "l", "o"]로 분리되고, World -> ["W", "o", "r", "l", "d"]로 분리됩니다.
  • Arrays.stream(T[] array)를 사용하여 ["H", "e", "l", "l", "o"], ["W", "o", "r", "l", "d"]를 각각 Stream<String>으로 만듭니다.
  • flatMap()을 사용해 여러 개의 Stream<String>을 1개의 Stream<String>으로 합치고, Stream의 요소를 ["H", "e", "l", "l", "o", "W", "o", "r", "l", "d"]로 만듭니다.
  • distinct()에 의해 중복된 요소(l, o)를 제거합니다.
  • 결과적으로, ["H", "e", "l", "o", "W", "r", "d"]라는 문자열 리스트가 collect(toList())에 의해 수집됩니다.

[ FlatMap의 또 다른 예제 ]

 만약, 아래와 같은 Student 클래스가 있다고 가정하겠습니다. 객체들이 저장된 students 리스트에서 각 학생들의 모든 과목에 대한 평균 점수를 구해야할 때, flatMap을 이용하면 쉽게 해결할 수 있습니다.

 이 예제에서 students 리스트는 Student보다 한 차원 높게 있으므로, 모든 점수들이 동일한 레벨에 있는 Stream을 생성해야 합니다. 해당 예제에서는 모든 점수들의 자료형이 int이므로 flatMapToInt를 활용할 수 있습니다.

 

class Student {
    private int kor, eng, math;

    public Student(int kor, int eng, int math) {
        this.kor = kor;
        this.eng = eng;
        this.math = math;
    }
}

List<Student> students = Arrays.asList(
    new Student(80, 90, 75),
    new Student(70, 100, 75),
    new Student(85, 90, 85),
    new Student(80, 100, 90)
);

students.stream()
    .flatMapToInt(student -> IntStream.of(Student.getKor(), student.getEng(), student.getMath()))
    .average()
    .ifPresent(avg -> System.out.println(Math.round(avg*10 / 10.0));

 

✅ Null-Safe한 Stream 생성하기

 Java로 개발하다보면 NPE(NullPointerException)를 방지하기 위해 null 여부를 검사하는 코드를 작성하지만, 상당히 가독성이 떨어지는 코드가 작성됩니다.

 이러한 문제를 해결하기 위해 Optional이라는 Wrapper 클래스를 제공하여 Null관련 코드를 가독성있게 처리하고, Stream API 역시 Optional을 통해 Null-safe한 Stream을 생성할 수 있게 되었습니다.

public<T> Stream<T> collectionToStream(Collection<T> collection) {
    return Optional
        .ofNullable(collection)
        .map(Collection::Stream)
        .orElseGet(Stream::empty);
}

 

 collectionToStream 함수는 매개변수로 받은 Collection객체를 Optional로 변형해서 이를 Stream으로 생성한 후, 반환하고 있습니다. 만약 매개변수로 받은 Collection이 null이라면 비어있는 Stream을 반환하기 때문에 NPE가 발생하지 않습니다.

 Optional은 코드의 가독성을 높여주지만 단지, Wrapper 클래스를 사용하는 것입니다. Stream을 생성해야 하는 대상이 Null이 발생할 확률이 높을 경우, 위와 같은 코드를 적용해주는 것이 의미있는 코드입니다.

 

✅ 실행 순서에 대한 고려

 Stream API를 실행 순서를 고려하며 정확히 사용한다면 처리 속도를 개선할 수 있습니다. 특히, Stream의 각 요소들에 대해 연산이 수직적 구조로 선회한다는 것을 명심해야 합니다. 예를 통해, 연산 순서를 확인해보겠습니다.

Stream.of("a", "b", "c", "d", "e")
    .filter(s -> {
        System.out.println("filter: "+s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: "+s));
    
/*
filter: a
forEach: a
filter: b
forEach: b
.
.
.
*/

 

 예상으로는 모든 Stream요소들이 filter를 거친 후에, forEach를 수행한다고 생각합니다. 하지만, 각 요소마다 filter와 forEach를 수행한 후, 다음 요소로 넘어간다는 것을 위 코드를 통해 확인할 수 있습니다.

 

 이런식으로 Stream API의 실행 순서를 고려했을 때, 처리 속도를 개선할 수 있는 방법은 다음 예제를 통해 확인할 수 있습니다. 먼저, 실행 순서를 고려하지 않은 코드입니다.

Stream.of("a", "b", "c", "d")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));

/*
map: a
filter: A
forEach: A
map: b
filter: B
map: c
filter: C
map: d
filter: D
*/

 

  map과 filter는 총 4번, forEach는 1번 수행되어 총 9번 수행됐습니다. 하지만, 해당 코드에서 map보다 filter를 먼저 쓴다면 어떻게 될까요?

Stream.of("a", "b", "c", "d")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

/*
filter: a
map: a
forEach: A
filter: b
filter: c
filter: d
*/

 

 위 코드는 filter가 4번, map과 forEach가 1번씩 수행되어 총 5번 수행되었습니다. 동일한 입력/결과임에도 불구하고, Stream API의 샐행 순서에 의해 시간을 단축시켰습니다. 만약, 처리할 데이터가 무수히 많아진다면 성능적인 차이를 야기할 것입니다. 그래서 Stream API를 사용할 때에는 반드시 실행 순서를 고려하며 코드를 작성해야 합니다.

 

✅ 병렬 스트림(Parallel Stream)이란?

 간혹, Stream으로 아주 많은 양의 데이터를 처리해야 하는 경우가 발생합니다. 이럴 때를 대비해서 런타임 성능을 높이기 위해 병렬로 실행할 수 있는 기능인 병렬 스트림을 제공하고 있습니다. Parallel Stream은 내부적으로 fork & join을 사용하며, ForkJoinPool.commonPool()을 통해 사용가능한 공통의 ForkJoinPool의 갯수를 확인할 수 있습니다. 참고로 ThreadPool의 최대 갯수는 5개이며, 사용가능한 물리적인 CPU 코어의 수에 따라 다르게 설정되어 있습니다.

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());

// JVM의 매개변수를 통한 별도 설정
-Djava.util.concurrent.ForkJoinPool.common.parallelism = 5

 

 Collection은 원소들의 Parallel Stream을 생성하기 위한 연산으로 parallelStream() 메서드를 제공하고 있습니다. 또한, 순차 Stream으로 진행하는 중간에 일부 연산만 병렬로 처리하기 위해 중간 연산 메서드로 parallel()을 제공합니다. 다음 예제에서는 Parallel Stream의 실행 동작을 이해하기 위해 특정 로직을 처리한 쓰레드의 정보를 출력하고 있습니다.

Arrays.asList("a", "b", "c", "d")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s[%s]\n", s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s[%s]\n", s, Thread.currentThread().getName()));
    
    
/* 
filter: c [main] 
filter: a [ForkJoinPool.commonPool-worker-2] 
map: a [ForkJoinPool.commonPool-worker-2] 
filter: b [ForkJoinPool.commonPool-worker-1] 
forEach: A [ForkJoinPool.commonPool-worker-2]
map: c [main]
filter: d [ForkJoinPool.commonPool-worker-2]
map: d [ForkJoinPool.commonPool-worker-2] 
map: b [ForkJoinPool.commonPool-worker-1] 
forEach: D [ForkJoinPool.commonPool-worker-2]
forEach: C [main] 
forEach: B [ForkJoinPool.commonPool-worker-1] 
*/

 

 출력 결과를 보면 Stream의 요소별 연산마다 어떤 쓰레드가 수행하였는지 알 수 있습니다. 또한, ForkJoinPool에서 사용가능한 모든 쓰레드를 활용하고 있기 때문에 출력 결과의 순서는 실행할 때마다 바뀔 것입니다.

 

 반면에, Parallel Stream에서 sorted 메서드는 메인 스레드에서 순차적으로 실행됩니다. 이는 내부적으로 새로운 메서드인 Arrays.parallelSort()를 사용하기 때문입니다.

 Parallel Stream에서 정렬 메서드를 할 경우, 특정한 임계값(Threshold)을 계산하여 배열의 길이가 임계값보다 크면 parallelSort()가 사용됩니다. 반대로 작은 경우에는 순차적인 정렬 방식인 Arrays.sort()를 사용하게 됩니다.

 

✅ References

Java의 정석

https://mangkyu.tistory.com/115

 

728x90
반응형