Asynchronous programming in Dart - Streams (Part II)

Sujay Prabhu's avatar

Sujay Prabhu

In the realm of asynchronous programming, Streams stand out as a pivotal concept. Stream is a sequence of asynchronous events. Unlike a Future, which concludes with a single outcome, Streams are designed to emit multiple values dynamically over time. This blog sets out to demystify Streams, showcasing their various types, creation methods, and best practices, offering valuable insights of asynchronous programming in Dart.

There are two types of streams:

  1. Single subscription stream
  2. Broadcast stream

Single subscription stream: A single subscription stream can only have one listener. If there are multiple listeners, then it will throw an error.

void main() {
  final stream = Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(5);
 
  final subscriber = stream.listen((event) {
    print("Single subscription stream: $event");
  }, onError: (err) {
    print(err);
  }, onDone: () {
    print('Done');
  });
}

Broadcast stream: A broadcast stream can have multiple listeners. It is similar to a radio station. Multiple people can listen to the same radio station at the same time.

How do we create a stream?

  1. Creating from scratch using generators (async*, yield)
  2. Creating using StreamController
  3. Transforming existing streams

Creating from scratch using generators (async*, yield)

Stream can be created using asynchronous generators (async*) function. This stream is created when the function is called and starts execting when the stream is listened to.

Stream<int> countStream(int max) async* {
  for (int i = 0; i < max; i++) {
    yield i;
    await Future.delayed(Duration(seconds: 1));
  }
}
 
void main() async {
  countStream(5).listen((event) {
    print(event);
  });
}
  • The countStream() function returns a Stream<int>. The stream will emit integers.
  • The async* keyword indicates that the function is a generator. The function will return a Stream<int>.
  • The yield keyword is used to emit a value. In this case, it is an integer.
  • The await keyword is used to wait for the Future to complete. In this case, it is Future.delayed().
  • The Future.delayed() method returns a Future that completes after the specified duration.
  • The listen() method is used to listen to the stream. It takes a callback function as an argument.
  • The callback function is called when the stream emits a value.
  • The callback function prints the value emitted by the stream.

Creating using StreamController

If the events of your stream comes from different parts of your program, and not just from a stream or futures that can traversed by an async function, then use a StreamController to create and populate the stream. It gives more control over the stream. It allows you to add events to the stream at any time. It is similar to a sink. You can add events to the stream using the sink. You can also listen to the stream using the stream. Lets see how we can create a stream using StreamController.

void main() async {
  countStream();
}
 
Stream<int> countStream() {
  final controller = StreamController<int>();
 
  controller.sink.add(1);
  controller.sink.add(2);
  controller.sink.add(3);
  controller.sink.add(4);
  controller.sink.add(5);
 
 
  final subscription = controller.stream.listen((event) {
    print(event);
  },
  onError: (err) {
    print(err);
  },
  onDone: () {
    print('Done');
  });
 
  controller.sink.add(6);
  controller.sink.addError('Error');
  controller.sink.close();
  subscription.cancel();
}
  • Integers from 1 to 5 are added to the stream via controller.sink.add(). This means these numbers will be emitted by the stream.
  • A subscription to the stream is created using controller.stream.listen(). This subscription will handle three types of events:
    • Data Event: When a new data (integer) is emitted
    • Error Event: If there's an error in the stream
    • Done Event: When the stream is closed
  • An error is added to the stream using controller.sink.addError('Error').
  • Finally the stream is closed with close and the subscription is cancelled with cancel.

Few things to take care of when using StreamController:

  • Wait for subscrition: Streams should wait for subscribers before producing events. This is done automatically with async* functions. As StreamController gives full control, it is possible to add events before a subscriber is registered. This can be done by using onListen callback.

  • Honor the pause state: Streams should not produce events while the listener is paused. In async* functions, this is done with the yield keyword. StreamController buffers events while the listener is paused. If our code doesn't honor the pause state, then the buffer will grow indefinitely.

Transforming existing streams

Common use case of creating a stream is to transform an existing stream using methods like map(), where() etc Lets see how we can do that.

void main() async {
  final doubleStream = countStream(5).map((event) => event * 2);
  doubleStream.forEach(print);
}
  • The map() method is used to transform the stream. It takes a callback function as an argument.
  • The callback function is called when the stream emits a value.
  • The callback function multiplies the value emitted by the stream by 2 and creates a new stream.

Most of the time, you will be using the Stream class to create streams. But there are other classes that implement the Stream interface like StreamTransformer with transform() method.

void main() async {
  final doubleStream = countStream(5).transform(StreamTransformer.fromHandlers(
    handleData: (data, sink) {
      sink.add(data * 2);
    }
  ));
  doubleStream.forEach(print);
}

Error handling in streams:

Streams can emit errors. We can handle errors using the onError() method.

void main() async {
  final doubleStream = countStream(5).map((event) => event * 2);
  doubleStream.listen(print, onError: (err) {
    print(err);
  });
}

References