Index: lib/src/multi_channel.dart |
diff --git a/lib/src/multi_channel.dart b/lib/src/multi_channel.dart |
index cba592ab563fc9a9dc89fc78b5cb957f83fbdb13..112997e57e5cfd34f4487a2a79a942812375601b 100644 |
--- a/lib/src/multi_channel.dart |
+++ b/lib/src/multi_channel.dart |
@@ -4,6 +4,8 @@ |
import 'dart:async'; |
+import 'package:async/async.dart'; |
+ |
import '../stream_channel.dart'; |
/// A class that multiplexes multiple virtual channels across a single |
@@ -85,21 +87,15 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel { |
/// The subscription to [_inner.stream]. |
StreamSubscription _innerStreamSubscription; |
- Stream get stream => _streamController.stream; |
- final _streamController = new StreamController(sync: true); |
- |
- StreamSink get sink => _sinkController.sink; |
- final _sinkController = new StreamController(sync: true); |
+ Stream get stream => _mainController.foreign.stream; |
+ StreamSink get sink => _mainController.foreign.sink; |
- /// A map from virtual channel ids to [StreamController]s that should be used |
- /// to write messages received from those channels. |
- final _streamControllers = new Map<int, StreamController>(); |
+ /// The controller for this channel. |
+ final _mainController = new StreamChannelController(sync: true); |
- /// A map from virtual channel ids to [StreamControllers]s that are used |
- /// to receive messages to write to those channels. |
- /// |
- /// Note that this uses the same keys as [_streamControllers]. |
- final _sinkControllers = new Map<int, StreamController>(); |
+ /// A map from virtual channel ids to [StreamChannelController]s that should |
+ /// be used to communicate over those channels. |
+ final _controllers = <int, StreamChannelController>{}; |
/// The next id to use for a local virtual channel. |
/// |
@@ -125,35 +121,34 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel { |
_MultiChannel(this._inner) { |
// The default connection is a special case which has id 0 on both ends. |
// This allows it to begin connected without having to send over an id. |
- _streamControllers[0] = _streamController; |
- _sinkControllers[0] = _sinkController; |
- _sinkController.stream.listen( |
+ _controllers[0] = _mainController; |
+ _mainController.local.stream.listen( |
(message) => _inner.sink.add([0, message]), |
onDone: () => _closeChannel(0, 0)); |
_innerStreamSubscription = _inner.stream.listen((message) { |
var id = message[0]; |
- var sink = _streamControllers[id]; |
+ var controller = _controllers[id]; |
- // A sink might not exist if the channel was closed before an incoming |
- // message was processed. |
- if (sink == null) return; |
+ // A controller might not exist if the channel was closed before an |
+ // incoming message was processed. |
+ if (controller == null) return; |
if (message.length > 1) { |
- sink.add(message[1]); |
+ controller.local.sink.add(message[1]); |
return; |
} |
- // A message without data indicates that the channel has been closed. |
- _sinkControllers[id].close(); |
- }, onDone: _closeInnerChannel, |
- onError: _streamController.addError); |
+ // A message without data indicates that the channel has been closed. We |
+ // can only close the sink here without doing any more cleanup, because |
+ // the sink closing will cause the stream to emit a done event which will |
+ // trigger more cleanup. |
+ controller.local.sink.close(); |
+ }, |
+ onDone: _closeInnerChannel, |
+ onError: _mainController.local.sink.addError); |
} |
VirtualChannel virtualChannel([id]) { |
- if (_inner == null) { |
- throw new StateError("The underlying channel is closed."); |
- } |
- |
var inputId; |
var outputId; |
if (id != null) { |
@@ -171,34 +166,39 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel { |
_nextId += 2; |
} |
- if (_streamControllers.containsKey(inputId)) { |
+ // If the inner channel has already closed, create new virtual channels in a |
+ // closed state. |
+ if (_inner == null) { |
+ return new VirtualChannel._( |
+ this, inputId, new Stream.empty(), new NullStreamSink()); |
+ } |
+ |
+ if (_controllers.containsKey(inputId)) { |
throw new ArgumentError("A virtual channel with id $id already exists."); |
} |
- var streamController = new StreamController(sync: true); |
- var sinkController = new StreamController(sync: true); |
- _streamControllers[inputId] = streamController; |
- _sinkControllers[inputId] = sinkController; |
- sinkController.stream.listen( |
+ var controller = new StreamChannelController(sync: true); |
+ _controllers[inputId] = controller; |
+ controller.local.stream.listen( |
(message) => _inner.sink.add([outputId, message]), |
onDone: () => _closeChannel(inputId, outputId)); |
return new VirtualChannel._( |
- this, outputId, streamController.stream, sinkController.sink); |
+ this, outputId, controller.foreign.stream, controller.foreign.sink); |
} |
/// Closes the virtual channel for which incoming messages have [inputId] and |
/// outgoing messages have [outputId]. |
void _closeChannel(int inputId, int outputId) { |
- _streamControllers.remove(inputId).close(); |
- _sinkControllers.remove(inputId).close(); |
+ var controller = _controllers.remove(inputId); |
+ controller.local.sink.close(); |
if (_inner == null) return; |
// A message without data indicates that the virtual channel has been |
// closed. |
_inner.sink.add([outputId]); |
- if (_streamControllers.isEmpty) _closeInnerChannel(); |
+ if (_controllers.isEmpty) _closeInnerChannel(); |
} |
/// Closes the underlying communication channel. |
@@ -206,9 +206,13 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel { |
_inner.sink.close(); |
_innerStreamSubscription.cancel(); |
_inner = null; |
- for (var controller in _sinkControllers.values.toList()) { |
- controller.close(); |
+ |
+ // Convert this to a list because the close is dispatched synchronously, and |
+ // that could conceivably remove a controller from [_controllers]. |
+ for (var controller in new List.from(_controllers.values)) { |
+ controller.local.sink.close(); |
} |
+ _controllers.clear(); |
} |
} |