| Index: lib/src/multi_channel.dart
|
| diff --git a/lib/src/multi_channel.dart b/lib/src/multi_channel.dart
|
| index cba592ab563fc9a9dc89fc78b5cb957f83fbdb13..112997e57e5cfd34f4487a2a79a942812375601b 100644
|
| --- a/lib/src/multi_channel.dart
|
| +++ b/lib/src/multi_channel.dart
|
| @@ -4,6 +4,8 @@
|
|
|
| import 'dart:async';
|
|
|
| +import 'package:async/async.dart';
|
| +
|
| import '../stream_channel.dart';
|
|
|
| /// A class that multiplexes multiple virtual channels across a single
|
| @@ -85,21 +87,15 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel {
|
| /// The subscription to [_inner.stream].
|
| StreamSubscription _innerStreamSubscription;
|
|
|
| - Stream get stream => _streamController.stream;
|
| - final _streamController = new StreamController(sync: true);
|
| -
|
| - StreamSink get sink => _sinkController.sink;
|
| - final _sinkController = new StreamController(sync: true);
|
| + Stream get stream => _mainController.foreign.stream;
|
| + StreamSink get sink => _mainController.foreign.sink;
|
|
|
| - /// A map from virtual channel ids to [StreamController]s that should be used
|
| - /// to write messages received from those channels.
|
| - final _streamControllers = new Map<int, StreamController>();
|
| + /// The controller for this channel.
|
| + final _mainController = new StreamChannelController(sync: true);
|
|
|
| - /// A map from virtual channel ids to [StreamControllers]s that are used
|
| - /// to receive messages to write to those channels.
|
| - ///
|
| - /// Note that this uses the same keys as [_streamControllers].
|
| - final _sinkControllers = new Map<int, StreamController>();
|
| + /// A map from virtual channel ids to [StreamChannelController]s that should
|
| + /// be used to communicate over those channels.
|
| + final _controllers = <int, StreamChannelController>{};
|
|
|
| /// The next id to use for a local virtual channel.
|
| ///
|
| @@ -125,35 +121,34 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel {
|
| _MultiChannel(this._inner) {
|
| // The default connection is a special case which has id 0 on both ends.
|
| // This allows it to begin connected without having to send over an id.
|
| - _streamControllers[0] = _streamController;
|
| - _sinkControllers[0] = _sinkController;
|
| - _sinkController.stream.listen(
|
| + _controllers[0] = _mainController;
|
| + _mainController.local.stream.listen(
|
| (message) => _inner.sink.add([0, message]),
|
| onDone: () => _closeChannel(0, 0));
|
|
|
| _innerStreamSubscription = _inner.stream.listen((message) {
|
| var id = message[0];
|
| - var sink = _streamControllers[id];
|
| + var controller = _controllers[id];
|
|
|
| - // A sink might not exist if the channel was closed before an incoming
|
| - // message was processed.
|
| - if (sink == null) return;
|
| + // A controller might not exist if the channel was closed before an
|
| + // incoming message was processed.
|
| + if (controller == null) return;
|
| if (message.length > 1) {
|
| - sink.add(message[1]);
|
| + controller.local.sink.add(message[1]);
|
| return;
|
| }
|
|
|
| - // A message without data indicates that the channel has been closed.
|
| - _sinkControllers[id].close();
|
| - }, onDone: _closeInnerChannel,
|
| - onError: _streamController.addError);
|
| + // A message without data indicates that the channel has been closed. We
|
| + // can only close the sink here without doing any more cleanup, because
|
| + // the sink closing will cause the stream to emit a done event which will
|
| + // trigger more cleanup.
|
| + controller.local.sink.close();
|
| + },
|
| + onDone: _closeInnerChannel,
|
| + onError: _mainController.local.sink.addError);
|
| }
|
|
|
| VirtualChannel virtualChannel([id]) {
|
| - if (_inner == null) {
|
| - throw new StateError("The underlying channel is closed.");
|
| - }
|
| -
|
| var inputId;
|
| var outputId;
|
| if (id != null) {
|
| @@ -171,34 +166,39 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel {
|
| _nextId += 2;
|
| }
|
|
|
| - if (_streamControllers.containsKey(inputId)) {
|
| + // If the inner channel has already closed, create new virtual channels in a
|
| + // closed state.
|
| + if (_inner == null) {
|
| + return new VirtualChannel._(
|
| + this, inputId, new Stream.empty(), new NullStreamSink());
|
| + }
|
| +
|
| + if (_controllers.containsKey(inputId)) {
|
| throw new ArgumentError("A virtual channel with id $id already exists.");
|
| }
|
|
|
| - var streamController = new StreamController(sync: true);
|
| - var sinkController = new StreamController(sync: true);
|
| - _streamControllers[inputId] = streamController;
|
| - _sinkControllers[inputId] = sinkController;
|
| - sinkController.stream.listen(
|
| + var controller = new StreamChannelController(sync: true);
|
| + _controllers[inputId] = controller;
|
| + controller.local.stream.listen(
|
| (message) => _inner.sink.add([outputId, message]),
|
| onDone: () => _closeChannel(inputId, outputId));
|
|
|
| return new VirtualChannel._(
|
| - this, outputId, streamController.stream, sinkController.sink);
|
| + this, outputId, controller.foreign.stream, controller.foreign.sink);
|
| }
|
|
|
| /// Closes the virtual channel for which incoming messages have [inputId] and
|
| /// outgoing messages have [outputId].
|
| void _closeChannel(int inputId, int outputId) {
|
| - _streamControllers.remove(inputId).close();
|
| - _sinkControllers.remove(inputId).close();
|
| + var controller = _controllers.remove(inputId);
|
| + controller.local.sink.close();
|
|
|
| if (_inner == null) return;
|
|
|
| // A message without data indicates that the virtual channel has been
|
| // closed.
|
| _inner.sink.add([outputId]);
|
| - if (_streamControllers.isEmpty) _closeInnerChannel();
|
| + if (_controllers.isEmpty) _closeInnerChannel();
|
| }
|
|
|
| /// Closes the underlying communication channel.
|
| @@ -206,9 +206,13 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel {
|
| _inner.sink.close();
|
| _innerStreamSubscription.cancel();
|
| _inner = null;
|
| - for (var controller in _sinkControllers.values.toList()) {
|
| - controller.close();
|
| +
|
| + // Convert this to a list because the close is dispatched synchronously, and
|
| + // that could conceivably remove a controller from [_controllers].
|
| + for (var controller in new List.from(_controllers.values)) {
|
| + controller.local.sink.close();
|
| }
|
| + _controllers.clear();
|
| }
|
| }
|
|
|
|
|