'Stream'에 해당되는 글 1건

  1. 2017.04.25 [도서] Java8 in Action - Stream

Java8 in Action 정리


4. 스트림 소개


스트림 API
  • 데이터 컬렉션 반복을 선언형으로 처리하고 조립하는 기능
  • 멀티 스레드 코드를 구현하지 않고도 데이터 처리과정을 병렬화하여 스레드와 락 걱정없이 처리 가능
  • 스트림 API는 매우 비싼 연산이다.

스트림
  • 데이터 처리 연산을 지원하도록 소스에서 추출된 연속된 요소

스트림과 컬렉션 차이
  • 데이터를 언제 계산하는가
    • 스트림 : 요청할 때만 요소를 계산하는 고정된 자료구조 (스트림에 요소를 추가/제거 불가)
    • 컬렉션 : 현재 자료구조가 포함하는 모든 값을 메모리에 저장
  • 딱 한번만 탐색할 수 있다.
    • Awways.asList(“1”,”2”).stream().forEach(System.out::println).forEach(System.out::println)
      • 첫번째 forEach에서 데이터가 모두 소비되어 다음 forEach로 전달된 스트림 요소가 존재하지 않음. (파이프라이닝으로 다음 forEach로 전달된 요소가 없음)
  • 반복과 병렬성
    • 외부반복
      • 사용자가 직접 요소를 반복하는 방식 (컬렉션)
      • 병렬성을 사용자가 직접 관리해야 함 (syncronized)
    • 내부반복
      • 반복을 알아서 처리하고 결과 스트림값을 어딘가에 저장
      • 데이터 표현과 하드웨어를 활용한 병렬성 구현을 자동으로 선택

데이터 소스
  • 컬렉션, 배열, I/O 자원 등의 데이터 제공 소스
  • 스트림을 생성하면 자료구조의 순서와 같은 순서의 스트림이 생성된다.

중간연산
  • filter, sorted와 같이 다른 연산과 연결될 수 있는 연산
  • 중간연산을 이용해서 파이프라인을 구성할 수 있으며 어떤 결과도 생성할 수 있음.
  • 게으른 연산 : 중간 연산을 합친 다음에 합쳐진 중간 연산을 최종으로 한번에 처리
    • List<String> names =
      menu.stream()
               .filter(d -> {System.out.println(“filtering” + d.getName()); return d.getCalories() > 300; })
               .map(d -> {System.out.println(“mapping” + d.getName()); return d.getname(); })
               .limit(3).collect(toList());
      System.out.println(names);
    • filtering pork
      mapping pork
      filtering beef
      mapping beef
      filtering chicken
      mapping chicken
      [pork, beef, chicken]
    • 쇼트서킷 : limit을 통해 필터링 된 여러 요소 중 3개만 선택됨
    • 루프 퓨전 : filter, map 등 다른 연산이지만 한 과정으로 병합되어 처리

연산
형식
사용된 함수 디스크립터
설명
filter
Stream<T> filter(Predicate<? super T> predicate)
T -> boolean
Predicate(boolean을 반환하는 함수)를 인수로 받아
일치(true)하는 모든 요소를 포함하는 스트림 반환
distinct
Stream<T> distinct()

고유 요소 필터링 (상태 있는 언바운드)
hashCode, equals 메서드를 통해 고유여부 판별
skip
Stream<T> skip(long n)

요소 건너뛰기 (상태 있는 바운드)
처음 n개 요소를 제외한 스트림을 반환
limit
Stream<T> limit(long maxSize)

스트림 축소 (상태 있는 바운드)
n개 이하의 크기를 갖는 새로운 스트림을 반환
map
<R> Stream<R> map(Function<? super T, ? extends R> mapper)
T -> R
mapToInt, mapToDouble, mapToLong 메서드를 통해 기본형 특화 스트림(IntStream, DoubleStream, LongStream)으로 변환할 수 있다.
객체스트림으로 복원하려면  숫자 스트림의 boxed 메서드를 호출하면 된다.
flatMap
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper)
T -> Stream<R>
각 배열을 스트림이 아니라 스트림의 콘텐츠로 매핑한다.
1개 이상의 스트림의 각 요소를 하나의 스트림으로 변환
sorted
Stream<T> sorted()
Stream<T> sorted(Comparator<? super T> comparator)

(T, T) -> int
상태 있는 언바운드
peek
Stream<T> peek(Consumer<? super T> action)
T -> T
자신이 확인한 요소를 파이프라인의 다음연산으로 그대로 전달

최종연산
  • 스트림 파이프라인을 처리해서 스트림이 아닌 결과를 반환하는 연산
  • 보통 최종 연산에 의해 List, Integer, void 등 스트림 이외의 결과가 반환
  • 딱 한번만 탐색할 수 있다.

연산
형식
사용된 함수 디스크립터
설명
anyMatch
boolean anyMatch(Predicate<? super T> predicate)
T -> boolean
프레디케이트가 적어도 한 요소와 일치하는지 확인
boolean 반환
noneMatch
boolean noneMatch(Predicate<? super T> predicate)
T -> boolean
프레디케이트가 모든 요소와 일치하는 요소가 없는지 확인
boolean 반환
allMatch
boolean allMatch(Predicate<? super T> predicate)
T -> boolean
프레디케이트가 모든 요소와 일치하는지 확인
boolean 반환
findAny
Optional<T> findAny()

스트림에서 임의의 요소를 반환
반환값이 없을경우  NPE를 피하기위해 Optional로 반환
findFirst
Optional<T> findFirst()


forEach
void forEach(Consumer<? super T> action)
T -> void
스트림의 각 요소를 소비하면서 람다를 적용.
void를 반환한다.
collect
<R, A> R collect(Collector<? super T, A, R> collector)
<R> R collect(Supplier<R> supplier,
                  BiConsumer<R, ? super T> accumulator,
                  BiConsumer<R, R> combiner)

스트림을 리듀스해서 리스트(toList), 맵(toMap), 정수 형식의 컬렉션을 만든다.
reduce
Optional<T> reduce(BinaryOperator<T> accumulator)
T reduce(T identity, BinaryOperator<T> accumulator)
<U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner)
(T, T) -> T
상태 있는 바운드(내부 상태의 크기가 한정)
count
long count()

스트림의 요소 개수를 반환한다. long을 반환한다


5. 스트림 활용


flatMap

“HelloWorld".stream().map( w -> w.split(“”) ).flatMap(Arrays::stream).distinct().collect(toList())
  • map( w -> w.split(“”)) : “HelloWorld” => String[] {“H”,”e”,”l”,”l”,”o”,”W”,”o”,”r”,”l”,”d”}
  • disticnt() : String[] {“H”,”e”,”l”,”o”,”W”,”r”,”d”}
  • flatMap(Arrays:stream) : Stream<String[]> => Stream<String>
    • Arrays.stream(T[]) : 배열을 스트림으로 변환 

Optional<T>
  • 값의 존재/부재 여부를 표현하는 컨테이너 클래스
  • null값 반환으로 인한 에러를 피하기 위해 만들어진 기능.
  • 제공 메서드
    • isParent() : Optional이 값을 포함하녀 true, 없으면 false
    • ifPresent(Consumer<T> block) : 값이 있으면 주어진 블록을 실행
    • T get() : 값이 존재하면 값을 반환하고, 없으면 NoSuchElementException을 발생시킴
    • T orElse(T other) : 값이 있으면 값을 반환하고, 없으면 기본값(other)을 반환한다.

findFirst와 findAny
  • 병렬 실행에서는 첫번째 요소를 찾기 어려우므로 요소 반환 순서가 상관없다면 병렬스트림에서는 제약이 적은 findAny를 사용


reduce

reduce(초기값, (초기값 or 이전계산의 결과값, 요소값) -> 반환값이 있는 람다식)
  • int sum = numbers.stream().reduce(0, (a,b) -> a + b);
    • numbers의 모든 요소의 합을 구하는 코드
Optional<T> reduce((T, T) -> T)
  • Optional<Integer> sum = numbers.stream().reduce((a, b) ->  (a + b));
  • 초기값을 받지 않도록 오버로드된 메서드
  • 스트림에 아무런 요소가 존재하지 않는경우 초기값으로 할당할 요소가 존재하지 않아 반환값도 존재하게되지 않으므로 Optional을 반환하도록 설계
  • 기본형 특화 스트림에서 값이 없는경우와 실제 결과값이 초기값과 같은경우를 구분하기 위해 기본형 특화 Optional 클래스도 제공한다.
    • OptionalInt, OptionalDouble, OptionalLong

내부 상태를 갖는 연산
  • 종류 : distinct, skip, limit, sorted, reduce
  • 연산을 처리하려면 모든 요소가 버퍼에 추가되어있어야 하거나 결과값을 다음연산의 초기값으로 상태값을 가지고있어야 처리가능한 연산
  • 연산을 수행하는데 필요한 저장소의 크기가 정해져 있는경우 바운드(bounded), 스트림의 요소갯수에 따라 무한으로 늘어날 수 있는경우 언바운드(unbounded)

Stream 만들기
  • 값으로 스트림 만들기
    • Stream<T> of(T t)
    • Stream<T> of(T... values)
  • 빈 스트림 만들기
    • Stream<T> empty()
  • 배열로 스트림 만들기
    • Stream<T> stream(T[] array)
  • 파일로 스트림 만들기
    • Stream<String> lines(Path path, Charset cs)
  • 함수로 무한 스트림 만들기
    • 요청할 때마다 주어진 함수를 이용해서 무제한으로 값을 생성할 수 있으며, 보통 limit 함수와 함께 사용해서 무한 실행되지 않도록 한다.
    • Stream<T> iterate(final T seed, final UnaryOperator<T> f)
      • Stream.iterate(0, n -> n + 2).limit(10).forEach(System.out::println)
      • 상태값이 있는 메서드. 연속된 일련의 값을 만들 때 사용
    • Stream<T> generate(Supplier<T> s)
      • Steam.geneate(Math::random).limit(5).forEach(System.out::println)
      • 상태값이 없는 메서드, 연속되지 않은 값을 만들 때 사용


6. 스트림으로 데이터 수집

Collectors 정적 팩토리 메서드

팩토리메서드
반환형식
사용예제
설명
toList
List<T>
List<Dish> dishes = menuStream.collect(toList());
스트림의 모든 항목을 리스트로 수집
toSet
Set<T>
Set<Dish> dishes = menuStream.collect(toSet());
스트림의 모든 항목을 중복없는 집합으로 수집
toCollection
Collection<T>
Collection<Dish> dishes = menuStream.collect(toCollection(), ArrayList::new);
스트림의 모든 항목을
공급자가 제공하는 컬렉션으로 수집
counting
Long
long howManyDishes = menuStream.collect(counting());
스트림의 항목 수 계산
summingInt
Integer
int totalCalories = menuStream.collect(summingInt(Dish::getCalories));
스트림의 항목에서 정수 프로퍼티 값을 더함
averagingInt
Double
double avgCalories = menuStream.collect(averagingInt(Dish::getCalories));
스트림 항목의 정수 프로퍼티의 평균값 계산
summarizingInt
IntSummaryStatistics
IntSummaryStatistics summary = menuStream.collect(summarizingInt(Dish::getCalories));
// summary = {count=9, sum=4300, min=120, average=477.777778, max = 800}
스트림 내의 항목의 최대, 최소, 합계, 평균 등의 정수 정보 통계를 수집
joining
String
String shortMenu = menuStream.map(Dish::getName).collect(joining(“, “));
스트림의 각 항목에 toString 메서드를 호출한 결과 문자열을 연결
maxBy
Optional<T>
Optional<Dish> fattest = menuStream.collect(maxBy(comparingInt(Dish::getCalories)));
주어진 비교자를 이용해서 스트림의 최대값 요소를 Optional로 감싼 값을 반환.
스트림에 요소가 없는경우 Optional.empty() 반환
minBy
Optional<T>
Optional<Dish> lightest = menuStream.collect(minBy(comparingInt(Dish::getCalories)));
주어진 비교자를 이요해서 스트림의 최소값 요소를 Optional로 감싼 값을 반환
스트림에 요소가 없는경우 Optional.empty() 반환
reducing
리듀싱 연산에서 형식을 결정
int totalCalories = menuStream.collect(reducing(0, Dish::getCalories, Integer::sum));
누적자를 초깃값으로 설정한 다음 BinaryOperator로 스트림의 각 요소를 반복적으로 누적자와 합쳐 스트림을 하나의 값으로 리듀싱
collectingAndThen
변환함수가 형식을 반환
int howManyDishes = menuStream.collect(collectingAndThen(toList(), List::size));
다른 컬렉터를 감싸고 그 결과에 변환 함수를 적용
groupingBy
Map<K, List<T>>
Map<Dish.Type, List<Dish>> dishesByType
= menuStream.collect(groupingBy(Dish::getType), toList());
하나의 프로퍼티값을 기준으로 스트림의 항목을 그룹화하며 기준 프로퍼티값을 결과 맵의 키로 사용
partitioningBy
Map<Boolean, List<T>>
Map<Boolean, List<Dish>> vegetarianDishes
= menuStream.collect(partitioningBy(Dish::isVegetarian));
결과 : {false=[pork, beef], true=[french fries, rice, pizza]}
프레디케이트를 스트림의 각 항목에 적용한 결과로 항목을 분할


Collector.reducing과 Stream.reduce
  • reducing
    • 도출하려는 결과를 누적하는 컨테이너로 바꾸도록 설계된 메서드
    • 가변 컨테이너 작업의 병렬연산에도 안전하다
  • reduce
    • 두 값을 하나로 도출하는 불변형 연산
    • 병렬로 reducing과 같은 연산을 처리하는경우 불변형이 깨지거나 매번 새로운 리스트를 생성하여 값을 할당해야하므로 성능이 저하될 수 있다.

다수준 그룹화
  • menu.stream().collect(groupingBy(Dish::getType, groupingBy(dish -> {
        if (d.getCalories() <= 400) return CaloricLevel.DIET;
        else if (d.getCalorids() <= 700) return CaloricLevel.NORMAL;
        else return CaloricLevel.FAT;
    })));
    • 결과
      {MEAT={DIET=[chicken], NORMAL=[beef], FAT=[pork]},
       FISH={DIET=[prawns], NORMAL=[salmon]},
       OTHER={DIET=[rice, seasonal fruit], NORMAL=[french fries, pizza]}}
  • menu.stream().collect(groupingBy(Dish::getType, counting()));
    • 결과 : {MEAT=3, FISH=2, OTHER=4}

Collector Interface

public interface Collector<TAR> {
    Supplier<Asupplier();
    BiConsumer<ATaccumulator();
    BinaryOperator<Acombiner();
    Function<ARfinisher();
    Set<Characteristics> characteristics();
   
    enum Characteristics {
        CONCURRENTUNORDEREDIDENTITY_FINISH
    }
}

  1. supplier
    • accumulator에서 사용할 빈 누적자 인스턴스를 만드는 함수
  2. accumulator
    • 리듀싱 연산을 수행하는 함수(void return) 반환
    • supplier가 생성한 누적자 인스턴스를 받아 함수의 상태값을 저장하고 반환값은 void가 된다. (내부탐색만 가능하며 계속 상태가 바뀌므로 어떤값일지 추적 불가)
  3. combiner
    • 마지막으로 리듀싱 연산에서 사용할 함수를 반환
    • 스트림의 서로 다른 서브파트를 병렬로 처리할 때 누적자가 이 결과를 어떻게 처리할지 정의
    • 병렬 처리시 스트림을 분할해야 하는지 정의하는 조건이 거짓으로 바뀌기 전까지 원래 스트림을 재귀적으로 분할하며,
      일반적으로 프로세싱 코어의 개수 이하의 병렬작업이 효율적이다.(p214)
  4. finisher
    • 누적과정을 끝낼 때 호출할 함수를 반환
    • 누적자 객체가 이미 최종결과인 경우 변환과정이 필요없는 항등함수(Function.identity())를 반환하면 된다.
  5. characteristics
    • 스트림을 병렬로 리듀스할 것인지, 병렬로 리듀스 할 경우 어떤 최적화를 선택해야할지 힌트를 제공
    • UNORDERED
      • 리듀싱 결과는 스트림 요소의 방문순서나 누적 순서에 영향을 받지 않는다.
    • CONCURRENT
      • 다중 스레드에서 accumulator 함수를 동시에 호출할 수 있으며 이 컬렉터는 스트림의 병렬 리듀싱을 수행할 수 있다.
      • UNORDERED를 함께 설정하지 않았다면 데이터 소스의 순서가 무의미한 상황에서만 병렬 리듀싱을 수행할 수 있다.
    • IDENTITY_FINISH
      • 리듀싱 과정의 최종 결과로 누적자 객체를 바로 사용할 수 있으며, 누적자 객체를  A -> R로 안전하게 형변환 할 수 있다.


7. 병렬 데이터 처리와 성능

Stream.parallel()
  • 스트림 자체에는 아무 변화도 일어나지 않는다
  • parallel을 호출하면 내부적으로 이후 연산이 병렬로 수행해야 함을 의미하는 불린 플래그가 설정된다.

Stream.sequential()
  • 병렬 스트림을 순차 스트림으로 바꿀 수 있다.
  • sequential을 호출하면 이후 연산이 순차적으로 수행해야 함을 의미하는 불린 플래그가 설정된다.

병렬 스트림에서 사용하는 스레드 풀 설정
  • 병렬스트림은 내부적으로 ForkJoinPool을 사용
  • ForkJoinPool은 기본적으로 Runtime.getRuntime().availableProcessors()가 반환하는 값에 상응하는 스레드를 사용(가상 프로세서를 포함한 프로세서 수)

병렬 스트림 사용시 주의해야할 점
  • 자료구조 변화에 따른 auto boxing으로 인한 성능저하
    • 기본형 특화 스트림을 사용하여 auto boxing 회피
  • 병렬실행을 위한 재귀적인 청크 분할, 스레드 할당, 머지 등 작업을 위한 비용과 성능개선비용의 관계
    • 성능측정을 통한 병렬 스트림 사용시의 이점 확인
    • 소량의 데이터의 경우 병렬화 과정으로 인한 비용이 더 크게 발생할 수 있다.
    • 최종연산의 병합과정 비용보다 병렬 스트림으로 얻은 성능의 이익이 더 큰지 확인
  • 순서에 의존하는 limit, findFirst 등 연산은 병렬스트림에서 성능이 저하됨.
  • 공유된 가변상태 변수로 인한 로직 오류 발생
  • 효율적으로 분할 가능한 자료구조로 스트림이 구성되었는지 확인
    • 분해성
      • 훌륭함 : ArrayList, IntStream.range
      • 좋음 : HashSet, TreeSet
      • 나쁨 : LinkedList, Stream.iterate

Fork/Join Framework
(병렬작업의 분할/작업/머지 등을 컨트롤 하고싶은경우 직접 구현가능 - p238~245)

  • 작업 훔치기
    • ForkJoinPool에서 각 스레드는 자신에게 할당된 태스크를 포함하는 이중연결 리스트를 참조하면서 작업이 끝날때 마다 큐의 HEAD 에서 다른 태스크를 가져와 작업을 처리
    • 특정 스레드가 먼저 작업이 완료되었을 ㄷ경우 다른 스레드 큐의 TAIL 에서 작업을 훔쳐와 모든 큐가 빌 때 까지 이 작업을 반복하여 스레드간 작업부하를 비슷한 수준으로 유지
  • Spliterator
    • 자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에 대해 사용할 수 있는 디폴트 Spliterator 구현을 제공
    • 제공 메서드
      • tryAdvance : 일반적인 Iterator 동작과 동일
      • trySplit : 스트림을 재귀적으로 분할하며 모든 trySplit 결과가 null이면 분할과정이 종료된다.
        • Spliterator의 일부 요소(자신이 반환한 요소)를 분할해서 두번째 Spliterator를 생성하는 메서드
        • estimateSize 메서드로 탐색해야할 요소 수 정보를 제공할 수 있다.(정확성 X)
      • characteristics : Spliterator 자체의 특성 집합을 포함하는 int를 반환하며, 이 특성을 이용해서 Spliterator를 더 잘 제어하고 최적화 할 수 있다
        • ORDERED, DISTINCT, SORTED, SIZE, NONNULL, IMMUTABLE, CONCURRENT, SUBSIZED


'development > Java' 카테고리의 다른 글

[도서] Java8 in Action - 새로운 날짜/시간 API  (0) 2017.04.25
serialize와 serialVersionUID  (0) 2015.06.18
Posted by dreamhopp
,