❮
[번역] 다트 비동기 프로그래밍 : streams
20200605
- 스트림은 비동기 데이터 시퀀스를 제공합니다.
- 데이터 시퀀스는 유저가 만든 이벤트와 파일로 부터의 데이터 입력을 포함합니다.
- await for 나 listen() 으로 스트림을 처리할 수 있습니다.
- 스트림은 에러를 처리하기 위해 사용할 수 있습니다.
- 단일 구독 스트림과 브로드캐스트 스트림으로 구분할 수 있습니다.
다트에서 비동기 프로그래밍은 Future와 Stream 클래스로 구현됩니다.
Future는 바로 완료되지 않는 연산을 나타냅니다. 결과를 리턴하는 일반적인 함수에서, 비동기 함수는 연산이 완료되면 최종 연산 결과를 담게 되는 Future 를 리턴합니다..
Stream은 비동기 이벤트들의 연속입니다. 요청을 하면 다음 이벤트를 주는 비동기 Iterable 과 비슷하지만, stream은 이벤트가 준비가 된 다음 이벤트를 줍니다.
스트림 이벤트 받기
다른 글에서 다루겠지만, 스트림은 다양한 방법으로 만들어집니다. 하지만 모든 스트림은 모두 같은 방법으로 사용됩니다. 비동기를 위한 루프 (asynchronous for loop ; await for) 는 스트림의 이벤트들에 대해 반복합니다.
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (var value in stream) {
sum += value;
}
return sum;
}
위 코드는 스트림에서 각각에 정수 이벤트를 받아서 합을 반환하는 코드입니다. 루프가 끝날때마다, 다음 이벤트가 들어오거나 스트림이 끝날 때까지 함수는 일시정지됩니다.
await for 루프를 사용하려면 함수는 async 키워드를 함수에 붙여줘야 합니다.
다음 예제는 async* 함수를 이용, 간단한 정수 스트림을 만들어서 위 코드를 테스트 합니다.
// Copyright (c) 2015, the Dart project authors.
// Please see the AUTHORS file for details.
// All rights reserved. Use of this source code is governed
// by a BSD-style license that can be found in the LICENSE file.
import 'dart:async';
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (var value in stream) {
sum += value;
}
return sum;
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
yield i;
}
}
main() async {
var stream = countStream(10);
var sum = await sumStream(stream);
print(sum); // 55
}
에러 이벤트
스트림은 더 이상 이벤트가 없으면 종료됩니다. 이벤트를 받는 코드는 새로운 이벤트가 도착한다는 알림을 받는 즉시 스트림이 종료되는지에 대해서도 알게 됩니다. await for 루프를 이용한다면, 스트림이 종료되면 루프도 멈추게 됩니다.
몇몇 경우에는, 스트림이 종료되기 전에 에러가 발생합니다. 아마도 원격 서버에서 파일을 꺼내는 동안 네트워크 실패가 일어났거나, 아니면 이벤트를 만드는 코드 자체에 버그가 있었거나... 누군가는 이런 일들에 대해서 알아야 합니다.
스트림은 데이터 이벤트를 전달하듯이 에러 이벤트를 전달할 수 있습니다. 대부분의 스트림은 첫 에러 이후에 멈추게 되지만 둘 이상의 에러를 전달하는 스트림과 에러 이벤트 후에 추가 데이터를 전달하는 스트림이 있을 수 있습니다. 이 문서에서는 하나의 에러를 전달하는 스트림에 대해서만 다룹니다.
스트림을 await for 를 사용해서 읽을 때는, 루프 문에 의해서 에러가 발생합니다. 물론 이 경우에 그 루프는 끝나게 됩니다. try-catch 를 이용해서 에러를 처리할 수 있습니다.
루프 반복자(iterator) 가 4일때 에러를 발생하는 예제입니다.
// Copyright (c) 2015, the Dart project authors.
// Please see the AUTHORS file for details.
// All rights reserved. Use of this source code is governed
// by a BSD-style license that can be found in the LICENSE file.
import 'dart:async';
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
try {
await for (var value in stream) {
sum += value;
}
} catch (e) {
return -1;
}
return sum;
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
if (i == 4) {
throw new Exception('Intentional exception');
} else {
yield i;
}
}
}
main() async {
var stream = countStream(10);
var sum = await sumStream(stream);
print(sum); // -1
}
스트림으로 작업하기
스트림 클래스는
Iterable 클래스와 비슷한 여러 헬퍼 메소드들을 포함하고 있습니다.
예를 들어, lastWhere() 을 사용해서 마지막 양수를 찾을 수 있습니다.
Future<int> lastPositive(Stream<int> stream) =>
stream.lastWhere((x) => x >= 0);
두 종류의 스트림
스트림을 두 가지로 구분할 수 있습니다.
단일 구독 스트림 ( Single subscription streams )
더 많은 전체 이벤트의 일부분을 포함하는 일반적인 스트림입니다. 이벤트들은 정확한 순서대로 전달되어야 하고 하나도 놓치면 안됩니다. 이런 스트림은 파일을 읽거나 웹 요청을 받을 때 사용합니다.
이런 스트림은 한 번만 listen 할 수 있습니다. 나중에 다시 listen 하게 되면 초기 이벤트들이 누락되고, 스트림의 나머지 부분이 무의미해질 수도 있습니다. 한번 listen 하게 되면 데이터를 여러 덩어리로 나누어서 불러오게 됩니다.
브로드캐스트 스트림 ( broadcast streams )
한번에 하나씩 처리하기 위한 개인적인 메시지를 위해서 고안되었습니다. 이런 스트림은 웹 브라우저에서 마우스 이벤트 같은 것을 처리할 수 있습니다.
어떤 시점에 이런 스트림을 listen 하기 시작한다면, listen 하는 동안 일어나고 있는 이벤트들을 얻게 됩니다. 둘 이상의 리스너가 동시에 listen 할 수 있고, 이전 구독이 취소되고 나서도 다시 들을 수 있습니다.
스트림을 처리하는 메소드들
아래 메소드들은 Stream<T> 에서 스트림을 처리하고 결과를 리턴합니다.
Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object needle);
Future<E> drain<E>([E futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = ""]);
Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();
drain() 과 pipe() 를 제외하면
Iterable 과 동일합니다. 각각의 함수들은 await for 루프가 있는 async 함수를 이용하거나 다른 메소드들 중의 하나를 재활용해서 다시 작성해볼 수 있습니다.
예를 들어 이런 구현이 있을 수 있습니다.
Future<bool> contains(Object needle) async {
await for (var event in this) {
if (event == needle) return true;
}
return false;
}
Future forEach(void Function(T element) action) async {
await for (var event in this) {
action(event);
}
}
Future<List<T>> toList() async {
final result = <T>[];
await this.forEach(result.add);
return result;
}
Future<String> join([String separator = ""]) async =>
(await this.toList()).join(separator);
주로 역사적인 이유로 실제 구현은 약간 더 복잡합니다.
스트림을 수정하는 메소드들
아래 메소드들은 기존 스트림을 기반으로 새로운 스트림을 리턴합니다. 각각은 누군가가 기존 스트림을 듣기 전에 새로운 스트림을 들을 때까지 기다립니다.
Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);
이 메소드들은 Iterable 에서 Iterable을 다른 Iterable 로 변환하는 메소드들과 유사합니다. 이것들은 await for 루프가 있는 async 함수들로 다시 쓸 수 있습니다.
Stream<E> asyncExpand<E>(Stream<E> Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next) equals]);
asyncExpand() 과 asyncMap() 함수는 expand() 와 map() 과 유사하지만 함수의 인자에 비동기 함수가 온다는 점이 다릅니다. distinct() 함수는 Iterable 에는 없습니다.
Stream<T> handleError(Function onError, {bool test(error)});
Stream<T> timeout(Duration timeLimit,
{void Function(EventSink<T> sink) onTimeout});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);
마지막 특별한 3개의 함수가 있습니다. await for 루프에서 할 수 없는 에러 처리법을 포함합니다. 루프가 첫 에러에 도착하면 루프를 끝내고 스트림 구독도 끝냅니다. 이건 복구할 수 없습니다. handleError() 를 사용하면 await for 루프가 에러에 도착하기 전에 에러를 스트림에서 제거할 수 있습니다.
transform() 함수
transform() 함수는 에러 처리만을 하는게 아닙니다. 이건 stream을 위한 더 일반화된 'map' 입니다.
일반 map 은 각각의 들어오는 이벤트마다 하나의 값을 요구합니다. 그런데 I/O 스트림 같은 경우에는, 하나의 출력이벤트를 만들기 위해서 여러 입력 이벤트가 필요할 수 있습니다. 그럴때는
StreamTransformer 를 이용할 수 있습니다.
transformer의 예시로는
Utf8Decoder 같은 디코더들이 있습니다. transformer는 bind() 라는 함수 하나가 필요합니다. bind() 는 async 함수를 이용해서 쉽게 구현할 수 있습니다.
Stream<S> mapLogErrors<S, T>(
Stream<T> stream,
S Function(T event) convert,
) async* {
var streamWithoutErrors = stream.handleError((e) => log(e));
await for (var event in streamWithoutErrors) {
yield convert(event);
}
}
파일을 읽고 디코딩하기
다음 코드는 파일을 읽고 그 스트림에 두 번의 변환을 실행합니다.
먼저 데이터를 UTF8 으로 변환하고 LineSplitter 를 통해서 변환합니다.
#로 시작하는 줄을 제외한 모든 줄이 출력됩니다.
import 'dart:convert';
import 'dart:io';
Future<void> main(List<String> args) async {
var file = File(args[0]);
var lines = utf8.decoder
.bind(file.openRead())
.transform(LineSplitter());
await for (var line in lines) {
if (!line.startsWith('#')) print(line);
}
}
listen() 메소드
스트림의 마지막 메소드는 listen() 입니다. 이건 저수준 메소드 입니다. 다른 스트림 함수들은 전부 listen() 으로 정의될 수 있습니다.
StreamSubscription<T> listen(void Function(T event) onData,
{Function onError, void Function() onDone, bool cancelOnError});
새로운 스트림 타입을 만들기 위해서, Stream 클래스를 확장하고 listen() 메소드를 구현하면 됩니다. 다른 모든 스트림 메소드들은 이 listen() 을 이용해서 호출됩니다.
listen() 메소드는 스트림을 listen 하게 해줍니다. 스트림은 listen하기 전에는 그저 비활성된 객체입니다. listen을 하게 되면
StreamSubscription 객체가 리턴됩니다. 이 객체는 이벤트를 내놓는 활성화된 스트림을 나타냅니다. 이건 Iterable은 그저 객체의 콜렉션이고, 실제 반복을 수행하는 것은 iterator인 것과 유사합니다.
StreamSubscription 은 스트림 구독을 일시 정지, 재개, 완전히 취소하는 것을 할 수 있습니다. 데이터 이벤트. 에러 이벤트, 스트림이 끝날 때에 대한 콜백함수를 지정할 수 있습니다.
다른 자료들