본문 바로가기
IT/Flutter

<Dart> Streams 비동기 프로그래밍

by 세계 최고의 AI Engineer_naknak 2023. 3. 16.

결론

The dart:async library contains two types that are important for many Dart APIs: Stream and Future. Where a Future represents the result of a single computation, a stream is a sequence of results. 

 

Future은 단일 계산의 결과에 대한 사용, stream은 연속된 결과를 위해 사용됩니다!!!


 

네, 이번 시간에는 비동기 프로그래밍인 Strams에 대해서 알아보도록 하겠습니다.

사실 Stram은 저번에 포스팅 했던 Future과 비슷한 개념이라고 생각하시면 좋을 거 같아요.

 

 

 

이번 포스팅도 역시 Dart 공식 문서에 나온 내용을 바탕으로 쓰여졌습니다.

https://dart.dev/tutorials/language/streams

 

Asynchronous programming: Streams

Learn how to consume single-subscriber and broadcast streams.

dart.dev

 

먼저 오늘 알아야할 요점에 대해서 살펴볼까요?

What's the point?

  • Streams provide an asynchronous sequence of data.
  • Data sequences include user-generated events and data read from files.
  • You can process a stream using either await for or listen() from the Stream API.
  • Streams provide a way to respond to errors.
  • There are two kinds of streams: single subscription or broadcast.

스트림은 데이터의 비동기적 순서를 제공합니다.

데이터 순서는 유저가 생성한 이벤트들과 파일로부터 읽어진 데이터를 포함합니다.

await for이나 listen() 같은 문법(Stream API에서)으로 stream을 사용할 수 있습니다.

Stream은 에러에 대응하는 방법을 제공합니다.

single subscription 이나 broadcast는 stream의 종류입니다.

 

음... 네. 그렇다고 하네요ㅎㅎ;;

Asynchronous programming in Dart is characterized by the Future and Stream classes.

아하! Dart에서 비동기를 다루는 녀석들은 Futrue이랑 Stream이라고 해요! 그러면 이 둘을 알고 다룰 수 있다면 비동기 처리는 쉽게 처리할 수 있을거 같아요!

 

그렇다면 Future 과(발음 기준으로 했습니다!) Stream의 차이점은 뭘까요?

 

A Future represents a computation that doesn’t complete immediately. Where a normal function returns the result, an asynchronous function returns a Future, which will eventually contain the result. The future will tell you when the result is ready.

A stream is a sequence of asynchronous events. It is like an asynchronous Iterable—where, instead of getting the next event when you ask for it, the stream tells you that there is an event when it is ready.

 

Future은 음.. 쉽게 말해서 나중에 return 되는 결과값이 준비가 완료되었을 때 다음으로 넘어가는 거라면 stream은 비동기적인 일들의 순서이다. 라네요...? 직역을 하면 stream은 비동기적인 iterable이랍니다. iterable은 member를 하나씩 차례로 반환 가능한 object를 말한다.  라고 하네요? <https://bluese05.tistory.com/55>

 

python iterable과 iterator의 의미

Python iterable과 iterator의 의미 Iterable (이터러블) iterable에 대한 python docs의 정의를 보자. Iterable An object capable of returning its members one at a time. Examples of iterables include all sequence types (such as list, str, and tu

bluese05.tistory.com

그리고 우리가 stream에 대해 요청했을 때 다음 이벤트를 얻는 거 대신에 stream은 that 이하를 이야기 해준대요, 준비가 되었을 때 이벤트가 있다는 것을. 흠... 음??? 그게 머징? 아직 와닿지는 않는 거 같아요! 조금 더 살펴볼까요?

Receiving stream events

Streams can be created in many ways, which is a topic for another article, but they can all be used in the same way: the asynchronous for loop (commonly just called await for) iterates over the events of a stream like the for loop iterates over an Iterable. 

네, 제가 이해한 대로 독해를 해보자면 Stream은 다양한 방법으로 만들어질 수 있지만 같은 방식으로 사용될 수 있다고 합니다. await for이라고 불리는 loop 방식의 비동기는 반목문 for loop 같이 값을 차례대로 참조하는 것처럼 stream의 이벤트들을 차례대로 참조할 수 있다고 하는 것 같아요! 예를 볼까요?

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

This code simply receives each event of a stream of integer events, adds them up, and returns (a future of) the sum. When the loop body ends, the function is paused until the next event arrives or the stream is done.

The function is marked with the async keyword, which is required when using the await for loop.

위 코드에 대한 설명입니다 ㅎㅎ

간략하게 말하면 await for 문이 끝날 때까지 대기한다는 말인 거 같아요. 근데 원래 for문은 그러지 않나요? 아닌가??

아직까지 어째서 Stream을 사용하는지 모르겠어요!

 

Error events

Streams are done when there are no more events in them, and the code receiving the events is notified of this just as it is notified that a new event arrives. When reading events using an await for loop, the loops stops when the stream is done.

Streams는 할 일이 없으면 완료가 되요!  그리고 이벤트를 받는 코드는 새 이벤트가 도착한다는 알림을 받는 것처럼 똑같이 알림을 받는다고 해요. 할일을 다하면 코드에서 알림을 받는 다고 해석하면 될 거 같아요!

 

In some cases, an error happens before the stream is done; perhaps the network failed while fetching a file from a

remote server, or perhaps the code creating the events has a bug, but someone needs to know about it.

다양한 경우에서 stream이 완료되기 전에 에러가 발생해요! 근데 누군가는 이 에러에 대해서 알 필요가 있다네요!

 

Streams can also deliver error events like it delivers data events. Most streams will stop after the first error, but it is possible to have streams that deliver more than one error, and streams that deliver more data after an error event. In this document we only discuss streams that deliver at most one error.

Streams는 데이터 이벤트들을 전달하는 것처럼 에러 이벤트를 전달할 수 있다고 해요. 대부분의 streams는 첫번째 에러 후에 멈추는 데 하나의 에러보다 더 많은 에러를 전달할 수 있게 할 수 있고 첫번째 에러 이벤트 후에 더 많은 데이터를 전달할 수 있다고 해요. 이 문서에서는 하나의 에러만 전달하는 streams에 대해서만 설명한다고 해요.

참고로 Streams도 try-catch 문으로 에러를 다룰 수 있어요.

  try {
    await for (final value in stream) {
      sum += value;
    }
  } catch (e) {
    return -1;
  }
  return sum;
}

요런식으로 말이죠.

 

Working with streams

The Stream class contains a number of helper methods that can do common operations on a stream for you, similar to the methods on an Iterable. For example, you can find the last positive integer in a stream using lastWhere() from the Stream API.

 

Stream에는 iterable에서 수행되는 함수와 유사한 많은 도움 함수들이 있다고 해요. 그 함수들은  stream에서 일반적인 연산을 수행할 수 있다고 해요. 예를 들면 마지막 양수를 찾아주는 lastWhere() 함수가 있다고 해요.

Future<int> lastPositive(Stream<int> stream) =>
    stream.lastWhere((x) => x >= 0);

네, 요렇게 생겼어요.

 

Two kinds of streams

There are two kinds of streams.

2종류의 stream에 대해서 살펴 볼게요!

   Single subscription streams

      The most common kind of stream contains a sequence of events that are parts of a larger whole. Events need to be delivered in the correct order and without missing any of them. This is the kind of stream you get when you read a file or receive a web request.

가장 일반적인 종류의 stream은 더 큰 전체의 부분인 이벤트들의 순서를 포함한다. 이벤트는 옳은 순서대로 전달되야 하며 그들 중 하나도 놓치는 게 없어야 한다. 이 종류의 stream은 우리가 파일을 읽을 때나 웹에서 어떤 요구를 받을 때 얻는 것입니다. 네, 무슨 말일까요, Single subscription stream은 순서가 존재한다. 이렇게 정리할게여,

 

Such a stream can only be listened to once. Listening again later could mean missing out on initial events, and then the rest of the stream makes no sense. When you start listening, the data will be fetched and provided in chunks.

이런 stream은 한번만 한번만 들어야한다. 그러니까 아니 이게 아니라 한번만 들을 수 있다내요, 듣는 다는게 어떤 뜻으로 쓰였는 지 모르겠지만 나중에 한번 더 listening 된다는 건 최초의 이벤트에서 무언가 놓쳤다는 걸 뜻하고 나머지 stream이 말이 안되게 될 수 있다. 라고 하는 거 같아요. 그래서 우리가 listening을 시작할 때 데이터는 fetch되고 불러와지고 덩어리로 제공된다고 해요. 네, 순서가 있는 Single subscription stream에 대한 설명이었습니다.

    Broadcast streams

        The other kind of stream is intended for individual messages that can be handled one at a time. This kind of stream can be used for mouse events in a browser, for example.

2번째 종류의 stream입니다. Broadcast stream은 한번에 하나씩 다뤄질 수 있는 개인적인, 단적인 메시지들을 위해 계획된 녀석이랍니다. 예를 들어서 브라우저에서 사용되는 마우스 이벤트를 위해 사용될 수 있다고 하네요! 예시가 있으니 확실히 이해가 가는 거 같아요.

 

You can start listening to such a stream at any time, and you get the events that are fired while you listen. More than one listener can listen at the same time, and you can listen again later after canceling a previous subscription.

어떤 시간에서도 이런 stream을 listening 할 수 있고(listening은 뭔가 우리가 어떤 값을 입력 후 원하는 값을 얻기까지 기다리는 걸 뜻하는 거 같아요!) 우리가 듣는 동안 발생된 이벤트들을 수신할 수 있다고 해요. 하나보다 더 많은 listener은 같은 시간에 들을 수 있고 우리는 이전에 요청한 값을 취소한 이후에 다시 들을 수 있다고 해요. 그러니까 Broadcast stream을 쓰면 한번에 하나씩 쓸 수 있고 이전에 요청했던 값을 취소한 뒤에 다시 어떤 값을 요청할 수 있다. 라는 뜻 같아요.


그럼 이제 어떻게 사용하는 지 볼까요?

Methods that process a stream

The following methods on Stream<T> process the stream and return a result:

Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object? needle);
Future<E> drain<E>([E? futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function()? orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = '']);
Future<T> lastWhere(bool Function(T element) test, {T Function()? orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function()? orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();

Stream을 구현해주는 함수는 위와 같아요. Future을 많이 사용하네요!

또 await for과 함께 사용할 수도 있다고 해요

Future<bool> contains(Object? needle) async {
  await for (final event in this) {
    if (event == needle) return true;
  }
  return false;
}

The asyncExpand() and asyncMap() functions are similar to expand() and map(), but allow their function argument to be an asynchronous function. 

이 두 함수는 인자로 비동기 함수를 전달할 수 있답니다.

Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next)? equals]);

또 아래에 있는 3개의 함수는 오류를 다루는 기능을 제공한다고 해요. 

Stream<T> handleError(Function onError, {bool Function(dynamic error)? test});
Stream<T> timeout(Duration timeLimit,
    {void Function(EventSink<T> sink)? onTimeout});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);

The final three functions are more special. They involve error handling which an await for loop can’t do—the first error reaching the loops will end the loop and its subscription on the stream. There is no recovering from that. The following code shows how to use handleError() to remove errors from a stream before using it in an await for loop.

네 오류를 다루는 걸 길게 길게 설명한 거예요. 아래에 있는 예시를 볼까요?

Stream<S> mapLogErrors<S, T>(
  Stream<T> stream,
  S Function(T event) convert,
) async* {
  var streamWithoutErrors = stream.handleError((e) => log(e));
  await for (final event in streamWithoutErrors) {
    yield convert(event);
  }
}

네, var streamWithoutErrors = stream.handleError((e) => log(e));를 통해서 에러를 다루는 부분이 인상적이네요...

근데요

Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);

이 녀석은 그냥 에러를 다루는 친구가 아니라고 강조하고 있어요.

The transform() function

The transform() function is not just for error handling; it is a more generalized “map” for streams.

map을 일반화한 거라고 해요.

A normal map requires one value for each incoming event. However, especially for I/O streams, it might take several incoming events to produce an output event. A StreamTransformer can work with that.

보통 map은 들어오는 이벤트별로 하나의 값이 필요하다고 해요. 그런데 특히 I/O stream 같은 경우에 하나의 output 이벤트를 만들기 위해서 여러가지 이벤트를 가져야 할 수도 있는데 StreamTransformer은 이런 것들에 대해서 작동할 수 있다고 해요.

For example, decoders like Utf8Decoder are transformers. A transformer requires only one function, bind(), which can be easily implemented by an async function.

예를 들어 Utf8Decoder같은 해독기가 transformer(변압기?)라고 하네요? transformer은 하나의 함수만 필요로 하는데 비동기 함수에 의해 쉽게 구현될 수 있는 bind() 같은 함수래요.

 

Reading and decoding a file

The following code reads a file and runs two transforms over the stream. It first converts the data from UTF8 and then runs it through a LineSplitter. All lines are printed, except any that begin with a hashtag, #.

아래 코드는 파일을 읽고 stream을 통해 두 개의 변환을 실행합니다. 하나는 UTF8에서 데이터를 변환한 다음 LineSplitter를 통해서 이걸 실행한다고 합니다. 모든 라인들은 출력됩니다. 물론 #로 시작하는 친구들을 제외하구요.

import 'dart:convert';
import 'dart:io';

void main(List<String> args) async {
  var file = File(args[0]);
  var lines = utf8.decoder
      .bind(file.openRead())
      .transform(const LineSplitter());
  await for (final line in lines) {
    if (!line.startsWith('#')) print(line);
  }
}

네 bind()가 나왔고 transform() 함수 안에 LineSplitter() 가 있는 걸 확인할 수 있어. Stream은 뭔가 Future이랑 비슷한 개념이지만 순서대로 어떤 값을 가져온다는 부분이 다른 점인 거 같아요.

 

드디어 마지막이네요!

The listen() method

The final method on Stream is listen(). This is a “low-level” method—all other stream functions are defined in terms of listen().

listen() 함수는 낮은 단계의 함수인데 모든 다른 stream 함수는 listen()에 따라서 정의된다고 해요.

StreamSubscription<T> listen(void Function(T event)? onData,
    {Function? onError, void Function()? onDone, bool? cancelOnError});

To create a new Stream type, you can just extend the Stream class and implement the listen() method—all other methods on Stream call listen() in order to work.

새로운 Stream 타입을 만들기 위해서는 Stream class를 확장시키고 listen() 함수를 구현하면 된다고해요. listen()함수는 꽤 중요한 거 같아요.

The listen() method allows you to start listening on a stream. Until you do so, the stream is an inert object describing what events you want to see.

listen() 함수는 우리가 stream을 들을 수 있도록 시작하는 걸 허락하고 이전까지는 stream은 불활성화 된다고 해요.

 

 When you listen, a StreamSubscription object is returned which represents the active stream producing events. This is similar to how an Iterable is just a collection of objects, but the iterator is the one doing the actual iteration.

우리가 들을 때, StreamSubscription 객체는 return 되요. 그리고 이 객체는 이벤트를 생산하는 활동하는 stream을 나타낸다고 해요. 이건 Iterable이 어떻게 객체들의 집합체가 되는지와 유사하지만 그 iterator는 실제 iteration을 하는 것이라고 해요. 아니 독해가 좀 이상하네요.

우리가 리스닝 할 때 이벤트를 생성하는 활성 스트림을 나타내는 StreamSubscription 객체가 반환되요. 이건 iterator가 실제 반복을 수행하는 것과 유사하다고 해요. 네, 그니까 우리가 어떤 요청한 뒤 반환 받는 녀석에 StreamSubscription 객체인데 이런 것들이 반복자 iterator가 반복을 수행하는 것과 유사하다고 하는 거 같아요.

 

The stream subscription allows you to pause the subscription, resume it after a pause, and cancel it completely. You can set callbacks to be called for each data event or error event, and when the stream is closed.

네, StreamSubscription 으로 멈추고 다시 시작할 수도 있고 완전히 취소할 수 있다고 해요. 또 이걸로 stream이 종료 됐을 때 callback을 정할 수 있다고 해요, 각 데이터 이벤트나 에러 이벤트에 대해 호출될 수 있는 callback들이요,

 

네 이번 시간에는 Stream에 대해서 알아봤어요.

사실 Stream에 대한 전반적인 내용을 알아봤는데 중요한 게 이론에서만 끝나면 안되잖아요 그죠?

그래서 다음에는 Stream을 어떻게 사용하는 지에 대해서 포스팅하는 시간을 가질게요! https://dart.dev/articles/libraries/creating-streams

 

Creating streams in Dart

A stream is a sequence of results; learn how to create your own.

dart.dev

 

2023/03/23 

네, 제가 Stream을 어떻게 사용하는 지에 대해서 포스팅을 한다고 했네요!

무기한으로 연기하겠습니다!

Stream보다 Future를 훨씬 더 많이 사용해서 우선순위를 뒤로 미뤄도 괜찮을 거라고 판단했기 때문이에요!

네! 이상입니다!

댓글