Java8 in Action 정리
4. 스트림 소개
스트림 API
- 데이터 컬렉션 반복을 선언형으로 처리하고 조립하는 기능
- 멀티 스레드 코드를 구현하지 않고도 데이터 처리과정을 병렬화하여 스레드와 락 걱정없이 처리 가능
- 스트림 API는 매우 비싼 연산이다.
스트림
- 데이터 처리 연산을 지원하도록 소스에서 추출된 연속된 요소
스트림과 컬렉션 차이
- 데이터를 언제 계산하는가
- 스트림 : 요청할 때만 요소를 계산하는 고정된 자료구조 (스트림에 요소를 추가/제거 불가)
- 컬렉션 : 현재 자료구조가 포함하는 모든 값을 메모리에 저장
- 딱 한번만 탐색할 수 있다.
- Awways.asList(“1”,”2”).stream().forEach(System.out::println).forEach(System.out::println)
- 첫번째 forEach에서 데이터가 모두 소비되어 다음 forEach로 전달된 스트림 요소가 존재하지 않음. (파이프라이닝으로 다음 forEach로 전달된 요소가 없음)
- 반복과 병렬성
- 외부반복
- 사용자가 직접 요소를 반복하는 방식 (컬렉션)
- 병렬성을 사용자가 직접 관리해야 함 (syncronized)
- 내부반복
- 반복을 알아서 처리하고 결과 스트림값을 어딘가에 저장
- 데이터 표현과 하드웨어를 활용한 병렬성 구현을 자동으로 선택
데이터 소스
- 컬렉션, 배열, I/O 자원 등의 데이터 제공 소스
- 스트림을 생성하면 자료구조의 순서와 같은 순서의 스트림이 생성된다.
중간연산
- filter, sorted와 같이 다른 연산과 연결될 수 있는 연산
- 중간연산을 이용해서 파이프라인을 구성할 수 있으며 어떤 결과도 생성할 수 있음.
- 게으른 연산 : 중간 연산을 합친 다음에 합쳐진 중간 연산을 최종으로 한번에 처리
- List<String> names =
menu.stream()
.filter(d -> {System.out.println(“filtering” + d.getName()); return d.getCalories() > 300; })
.map(d -> {System.out.println(“mapping” + d.getName()); return d.getname(); })
.limit(3).collect(toList());
System.out.println(names); - filtering pork
mapping pork
filtering beef
mapping beef
filtering chicken
mapping chicken
[pork, beef, chicken] - 쇼트서킷 : limit을 통해 필터링 된 여러 요소 중 3개만 선택됨
- 루프 퓨전 : filter, map 등 다른 연산이지만 한 과정으로 병합되어 처리
- List<String> names =
연산
| 형식
| 사용된 함수 디스크립터
| 설명
|
filter
| Stream<T> filter(Predicate<? super T> predicate)
| T -> boolean
| Predicate(boolean을 반환하는 함수)를 인수로 받아
일치(true)하는 모든 요소를 포함하는 스트림 반환 |
distinct
| Stream<T> distinct()
| 고유 요소 필터링 (상태 있는 언바운드)
hashCode, equals 메서드를 통해 고유여부 판별
|
|
skip
| Stream<T> skip(long n)
| 요소 건너뛰기 (상태 있는 바운드)
처음 n개 요소를 제외한 스트림을 반환
|
|
limit
| Stream<T> limit(long maxSize)
| 스트림 축소 (상태 있는 바운드)
n개 이하의 크기를 갖는 새로운 스트림을 반환
|
|
map
| <R> Stream<R> map(Function<? super T, ? extends R> mapper)
| T -> R
| mapToInt, mapToDouble, mapToLong 메서드를 통해 기본형 특화 스트림(IntStream, DoubleStream, LongStream)으로 변환할 수 있다.
객체스트림으로 복원하려면 숫자 스트림의 boxed 메서드를 호출하면 된다.
|
flatMap
| <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) | T -> Stream<R>
| 각 배열을 스트림이 아니라 스트림의 콘텐츠로 매핑한다.
1개 이상의 스트림의 각 요소를 하나의 스트림으로 변환
|
sorted
| Stream<T> sorted()
Stream<T> sorted(Comparator<? super T> comparator)
| (T, T) -> int
| 상태 있는 언바운드
|
peek
| Stream<T> peek(Consumer<? super T> action)
| T -> T
| 자신이 확인한 요소를 파이프라인의 다음연산으로 그대로 전달
|
최종연산
- 스트림 파이프라인을 처리해서 스트림이 아닌 결과를 반환하는 연산
- 보통 최종 연산에 의해 List, Integer, void 등 스트림 이외의 결과가 반환
- 딱 한번만 탐색할 수 있다.
연산
| 형식
| 사용된 함수 디스크립터
| 설명
|
anyMatch
| boolean anyMatch(Predicate<? super T> predicate)
| T -> boolean
| 프레디케이트가 적어도 한 요소와 일치하는지 확인
boolean 반환
|
noneMatch
| boolean noneMatch(Predicate<? super T> predicate)
| T -> boolean
| 프레디케이트가 모든 요소와 일치하는 요소가 없는지 확인
boolean 반환
|
allMatch
| boolean allMatch(Predicate<? super T> predicate)
| T -> boolean
| 프레디케이트가 모든 요소와 일치하는지 확인
boolean 반환
|
findAny
| Optional<T> findAny()
| 스트림에서 임의의 요소를 반환
반환값이 없을경우 NPE를 피하기위해 Optional로 반환
|
|
findFirst
| Optional<T> findFirst()
| ||
forEach
| void forEach(Consumer<? super T> action)
| T -> void
| 스트림의 각 요소를 소비하면서 람다를 적용.
void를 반환한다.
|
collect
| <R, A> R collect(Collector<? super T, A, R> collector)
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner)
| 스트림을 리듀스해서 리스트(toList), 맵(toMap), 정수 형식의 컬렉션을 만든다.
|
|
reduce
| Optional<T> reduce(BinaryOperator<T> accumulator)
T reduce(T identity, BinaryOperator<T> accumulator)
<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner)
| (T, T) -> T
| 상태 있는 바운드(내부 상태의 크기가 한정)
|
count
| long count()
| 스트림의 요소 개수를 반환한다. long을 반환한다
|
5. 스트림 활용
flatMap
“HelloWorld".stream().map( w -> w.split(“”) ).flatMap(Arrays::stream).distinct().collect(toList())
- map( w -> w.split(“”)) : “HelloWorld” => String[] {“H”,”e”,”l”,”l”,”o”,”W”,”o”,”r”,”l”,”d”}
- disticnt() : String[] {“H”,”e”,”l”,”o”,”W”,”r”,”d”}
- flatMap(Arrays:stream) : Stream<String[]> => Stream<String>
- Arrays.stream(T[]) : 배열을 스트림으로 변환
Optional<T>
- 값의 존재/부재 여부를 표현하는 컨테이너 클래스
- null값 반환으로 인한 에러를 피하기 위해 만들어진 기능.
- 제공 메서드
- isParent() : Optional이 값을 포함하녀 true, 없으면 false
- ifPresent(Consumer<T> block) : 값이 있으면 주어진 블록을 실행
- T get() : 값이 존재하면 값을 반환하고, 없으면 NoSuchElementException을 발생시킴
- T orElse(T other) : 값이 있으면 값을 반환하고, 없으면 기본값(other)을 반환한다.
findFirst와 findAny
- 병렬 실행에서는 첫번째 요소를 찾기 어려우므로 요소 반환 순서가 상관없다면 병렬스트림에서는 제약이 적은 findAny를 사용
reduce
reduce(초기값, (초기값 or 이전계산의 결과값, 요소값) -> 반환값이 있는 람다식)
- int sum = numbers.stream().reduce(0, (a,b) -> a + b);
- numbers의 모든 요소의 합을 구하는 코드
Optional<T> reduce((T, T) -> T)
- Optional<Integer> sum = numbers.stream().reduce((a, b) -> (a + b));
- 초기값을 받지 않도록 오버로드된 메서드
- 스트림에 아무런 요소가 존재하지 않는경우 초기값으로 할당할 요소가 존재하지 않아 반환값도 존재하게되지 않으므로 Optional을 반환하도록 설계
- 기본형 특화 스트림에서 값이 없는경우와 실제 결과값이 초기값과 같은경우를 구분하기 위해 기본형 특화 Optional 클래스도 제공한다.
- OptionalInt, OptionalDouble, OptionalLong
내부 상태를 갖는 연산
- 종류 : distinct, skip, limit, sorted, reduce
- 연산을 처리하려면 모든 요소가 버퍼에 추가되어있어야 하거나 결과값을 다음연산의 초기값으로 상태값을 가지고있어야 처리가능한 연산
- 연산을 수행하는데 필요한 저장소의 크기가 정해져 있는경우 바운드(bounded), 스트림의 요소갯수에 따라 무한으로 늘어날 수 있는경우 언바운드(unbounded)
Stream 만들기
- 값으로 스트림 만들기
- Stream<T> of(T t)
- Stream<T> of(T... values)
- 빈 스트림 만들기
- Stream<T> empty()
- 배열로 스트림 만들기
- Stream<T> stream(T[] array)
- 파일로 스트림 만들기
- Stream<String> lines(Path path, Charset cs)
- 함수로 무한 스트림 만들기
- 요청할 때마다 주어진 함수를 이용해서 무제한으로 값을 생성할 수 있으며, 보통 limit 함수와 함께 사용해서 무한 실행되지 않도록 한다.
- Stream<T> iterate(final T seed, final UnaryOperator<T> f)
- Stream.iterate(0, n -> n + 2).limit(10).forEach(System.out::println)
- 상태값이 있는 메서드. 연속된 일련의 값을 만들 때 사용
- Stream<T> generate(Supplier<T> s)
- Steam.geneate(Math::random).limit(5).forEach(System.out::println)
- 상태값이 없는 메서드, 연속되지 않은 값을 만들 때 사용
6. 스트림으로 데이터 수집
Collectors 정적 팩토리 메서드
팩토리메서드
| 반환형식
| 사용예제
| 설명
|
toList
| List<T>
| List<Dish> dishes = menuStream.collect(toList());
| 스트림의 모든 항목을 리스트로 수집
|
toSet
| Set<T>
| Set<Dish> dishes = menuStream.collect(toSet());
| 스트림의 모든 항목을 중복없는 집합으로 수집
|
toCollection
| Collection<T>
| Collection<Dish> dishes = menuStream.collect(toCollection(), ArrayList::new);
| 스트림의 모든 항목을
공급자가 제공하는 컬렉션으로 수집
|
counting
| Long
| long howManyDishes = menuStream.collect(counting());
| 스트림의 항목 수 계산
|
summingInt
| Integer
| int totalCalories = menuStream.collect(summingInt(Dish::getCalories));
| 스트림의 항목에서 정수 프로퍼티 값을 더함
|
averagingInt
| Double
| double avgCalories = menuStream.collect(averagingInt(Dish::getCalories));
| 스트림 항목의 정수 프로퍼티의 평균값 계산
|
summarizingInt
| IntSummaryStatistics
| IntSummaryStatistics summary = menuStream.collect(summarizingInt(Dish::getCalories));
// summary = {count=9, sum=4300, min=120, average=477.777778, max = 800} | 스트림 내의 항목의 최대, 최소, 합계, 평균 등의 정수 정보 통계를 수집
|
joining
| String
| String shortMenu = menuStream.map(Dish::getName).collect(joining(“, “));
| 스트림의 각 항목에 toString 메서드를 호출한 결과 문자열을 연결
|
maxBy
| Optional<T>
| Optional<Dish> fattest = menuStream.collect(maxBy(comparingInt(Dish::getCalories)));
| 주어진 비교자를 이용해서 스트림의 최대값 요소를 Optional로 감싼 값을 반환.
스트림에 요소가 없는경우 Optional.empty() 반환
|
minBy
| Optional<T>
| Optional<Dish> lightest = menuStream.collect(minBy(comparingInt(Dish::getCalories)));
| 주어진 비교자를 이요해서 스트림의 최소값 요소를 Optional로 감싼 값을 반환
스트림에 요소가 없는경우 Optional.empty() 반환
|
reducing
| 리듀싱 연산에서 형식을 결정
| int totalCalories = menuStream.collect(reducing(0, Dish::getCalories, Integer::sum));
| 누적자를 초깃값으로 설정한 다음 BinaryOperator로 스트림의 각 요소를 반복적으로 누적자와 합쳐 스트림을 하나의 값으로 리듀싱
|
collectingAndThen
| 변환함수가 형식을 반환
| int howManyDishes = menuStream.collect(collectingAndThen(toList(), List::size));
| 다른 컬렉터를 감싸고 그 결과에 변환 함수를 적용
|
groupingBy
| Map<K, List<T>>
| Map<Dish.Type, List<Dish>> dishesByType
= menuStream.collect(groupingBy(Dish::getType), toList());
| 하나의 프로퍼티값을 기준으로 스트림의 항목을 그룹화하며 기준 프로퍼티값을 결과 맵의 키로 사용
|
partitioningBy
| Map<Boolean, List<T>>
| Map<Boolean, List<Dish>> vegetarianDishes
= menuStream.collect(partitioningBy(Dish::isVegetarian));
결과 : {false=[pork, beef], true=[french fries, rice, pizza]} | 프레디케이트를 스트림의 각 항목에 적용한 결과로 항목을 분할
|
Collector.reducing과 Stream.reduce
- reducing
- 도출하려는 결과를 누적하는 컨테이너로 바꾸도록 설계된 메서드
- 가변 컨테이너 작업의 병렬연산에도 안전하다
- reduce
- 두 값을 하나로 도출하는 불변형 연산
- 병렬로 reducing과 같은 연산을 처리하는경우 불변형이 깨지거나 매번 새로운 리스트를 생성하여 값을 할당해야하므로 성능이 저하될 수 있다.
다수준 그룹화
- menu.stream().collect(groupingBy(Dish::getType, groupingBy(dish -> {
if (d.getCalories() <= 400) return CaloricLevel.DIET;
else if (d.getCalorids() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
}))); - 결과
{MEAT={DIET=[chicken], NORMAL=[beef], FAT=[pork]},
FISH={DIET=[prawns], NORMAL=[salmon]},
OTHER={DIET=[rice, seasonal fruit], NORMAL=[french fries, pizza]}}
- 결과
- menu.stream().collect(groupingBy(Dish::getType, counting()));
- 결과 : {MEAT=3, FISH=2, OTHER=4}
Collector Interface
public interface Collector<T, A, R> {
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
BinaryOperator<A> combiner();
Function<A, R> finisher();
Set<Characteristics> characteristics();
enum Characteristics {
CONCURRENT, UNORDERED, IDENTITY_FINISH
}
}
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
BinaryOperator<A> combiner();
Function<A, R> finisher();
Set<Characteristics> characteristics();
enum Characteristics {
CONCURRENT, UNORDERED, IDENTITY_FINISH
}
}
- supplier
- accumulator에서 사용할 빈 누적자 인스턴스를 만드는 함수
- accumulator
- 리듀싱 연산을 수행하는 함수(void return) 반환
- supplier가 생성한 누적자 인스턴스를 받아 함수의 상태값을 저장하고 반환값은 void가 된다. (내부탐색만 가능하며 계속 상태가 바뀌므로 어떤값일지 추적 불가)
- combiner
- 마지막으로 리듀싱 연산에서 사용할 함수를 반환
- 스트림의 서로 다른 서브파트를 병렬로 처리할 때 누적자가 이 결과를 어떻게 처리할지 정의
- 병렬 처리시 스트림을 분할해야 하는지 정의하는 조건이 거짓으로 바뀌기 전까지 원래 스트림을 재귀적으로 분할하며,
일반적으로 프로세싱 코어의 개수 이하의 병렬작업이 효율적이다.(p214)
- finisher
- 누적과정을 끝낼 때 호출할 함수를 반환
- 누적자 객체가 이미 최종결과인 경우 변환과정이 필요없는 항등함수(Function.identity())를 반환하면 된다.
- characteristics
- 스트림을 병렬로 리듀스할 것인지, 병렬로 리듀스 할 경우 어떤 최적화를 선택해야할지 힌트를 제공
- UNORDERED
- 리듀싱 결과는 스트림 요소의 방문순서나 누적 순서에 영향을 받지 않는다.
- CONCURRENT
- 다중 스레드에서 accumulator 함수를 동시에 호출할 수 있으며 이 컬렉터는 스트림의 병렬 리듀싱을 수행할 수 있다.
- UNORDERED를 함께 설정하지 않았다면 데이터 소스의 순서가 무의미한 상황에서만 병렬 리듀싱을 수행할 수 있다.
- IDENTITY_FINISH
- 리듀싱 과정의 최종 결과로 누적자 객체를 바로 사용할 수 있으며, 누적자 객체를 A -> R로 안전하게 형변환 할 수 있다.
7. 병렬 데이터 처리와 성능
Stream.parallel()
- 스트림 자체에는 아무 변화도 일어나지 않는다
- parallel을 호출하면 내부적으로 이후 연산이 병렬로 수행해야 함을 의미하는 불린 플래그가 설정된다.
Stream.sequential()
- 병렬 스트림을 순차 스트림으로 바꿀 수 있다.
- sequential을 호출하면 이후 연산이 순차적으로 수행해야 함을 의미하는 불린 플래그가 설정된다.
병렬 스트림에서 사용하는 스레드 풀 설정
- 병렬스트림은 내부적으로 ForkJoinPool을 사용
- ForkJoinPool은 기본적으로 Runtime.getRuntime().availableProcessors()가 반환하는 값에 상응하는 스레드를 사용(가상 프로세서를 포함한 프로세서 수)
병렬 스트림 사용시 주의해야할 점
- 자료구조 변화에 따른 auto boxing으로 인한 성능저하
- 기본형 특화 스트림을 사용하여 auto boxing 회피
- 병렬실행을 위한 재귀적인 청크 분할, 스레드 할당, 머지 등 작업을 위한 비용과 성능개선비용의 관계
- 성능측정을 통한 병렬 스트림 사용시의 이점 확인
- 소량의 데이터의 경우 병렬화 과정으로 인한 비용이 더 크게 발생할 수 있다.
- 최종연산의 병합과정 비용보다 병렬 스트림으로 얻은 성능의 이익이 더 큰지 확인
- 순서에 의존하는 limit, findFirst 등 연산은 병렬스트림에서 성능이 저하됨.
- 공유된 가변상태 변수로 인한 로직 오류 발생
- 효율적으로 분할 가능한 자료구조로 스트림이 구성되었는지 확인
- 분해성
- 훌륭함 : ArrayList, IntStream.range
- 좋음 : HashSet, TreeSet
- 나쁨 : LinkedList, Stream.iterate
Fork/Join Framework
(병렬작업의 분할/작업/머지 등을 컨트롤 하고싶은경우 직접 구현가능 - p238~245)
- 작업 훔치기
- ForkJoinPool에서 각 스레드는 자신에게 할당된 태스크를 포함하는 이중연결 리스트를 참조하면서 작업이 끝날때 마다 큐의 HEAD 에서 다른 태스크를 가져와 작업을 처리
- 특정 스레드가 먼저 작업이 완료되었을 ㄷ경우 다른 스레드 큐의 TAIL 에서 작업을 훔쳐와 모든 큐가 빌 때 까지 이 작업을 반복하여 스레드간 작업부하를 비슷한 수준으로 유지
- Spliterator
- 자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에 대해 사용할 수 있는 디폴트 Spliterator 구현을 제공
- 제공 메서드
- tryAdvance : 일반적인 Iterator 동작과 동일
- trySplit : 스트림을 재귀적으로 분할하며 모든 trySplit 결과가 null이면 분할과정이 종료된다.
- Spliterator의 일부 요소(자신이 반환한 요소)를 분할해서 두번째 Spliterator를 생성하는 메서드
- estimateSize 메서드로 탐색해야할 요소 수 정보를 제공할 수 있다.(정확성 X)
- characteristics : Spliterator 자체의 특성 집합을 포함하는 int를 반환하며, 이 특성을 이용해서 Spliterator를 더 잘 제어하고 최적화 할 수 있다
- ORDERED, DISTINCT, SORTED, SIZE, NONNULL, IMMUTABLE, CONCURRENT, SUBSIZED
'development > Java' 카테고리의 다른 글
[도서] Java8 in Action - 새로운 날짜/시간 API (0) | 2017.04.25 |
---|---|
serialize와 serialVersionUID (0) | 2015.06.18 |