Chromium Code Reviews| Index: lib/src/multi_channel.dart |
| diff --git a/lib/src/multi_channel.dart b/lib/src/multi_channel.dart |
| index cba592ab563fc9a9dc89fc78b5cb957f83fbdb13..9999b185b7be316a4bfb68cb02a75ecbdc6a52f7 100644 |
| --- a/lib/src/multi_channel.dart |
| +++ b/lib/src/multi_channel.dart |
| @@ -4,6 +4,8 @@ |
| import 'dart:async'; |
| +import 'package:async/async.dart'; |
| + |
| import '../stream_channel.dart'; |
| /// A class that multiplexes multiple virtual channels across a single |
| @@ -85,21 +87,15 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel { |
| /// The subscription to [_inner.stream]. |
| StreamSubscription _innerStreamSubscription; |
| - Stream get stream => _streamController.stream; |
| - final _streamController = new StreamController(sync: true); |
| - |
| - StreamSink get sink => _sinkController.sink; |
| - final _sinkController = new StreamController(sync: true); |
| + Stream get stream => _mainController.foreign.stream; |
| + StreamSink get sink => _mainController.foreign.sink; |
| - /// A map from virtual channel ids to [StreamController]s that should be used |
| - /// to write messages received from those channels. |
| - final _streamControllers = new Map<int, StreamController>(); |
| + /// The controller for this channel. |
| + final _mainController = new StreamChannelController(sync: true); |
| - /// A map from virtual channel ids to [StreamControllers]s that are used |
| - /// to receive messages to write to those channels. |
| - /// |
| - /// Note that this uses the same keys as [_streamControllers]. |
| - final _sinkControllers = new Map<int, StreamController>(); |
| + /// A map from virtual channel ids to [StreamChannelController]s that should |
| + /// be used to communicate over those channels. |
| + final _controllers = <int, StreamChannelController>{}; |
| /// The next id to use for a local virtual channel. |
| /// |
| @@ -125,33 +121,44 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel { |
| _MultiChannel(this._inner) { |
| // The default connection is a special case which has id 0 on both ends. |
| // This allows it to begin connected without having to send over an id. |
| - _streamControllers[0] = _streamController; |
| - _sinkControllers[0] = _sinkController; |
| - _sinkController.stream.listen( |
| + _controllers[0] = _mainController; |
| + _mainController.local.stream.listen( |
| (message) => _inner.sink.add([0, message]), |
| onDone: () => _closeChannel(0, 0)); |
| _innerStreamSubscription = _inner.stream.listen((message) { |
| var id = message[0]; |
| - var sink = _streamControllers[id]; |
| + var controller = _controllers[id]; |
| - // A sink might not exist if the channel was closed before an incoming |
| - // message was processed. |
| - if (sink == null) return; |
| + // A controller might not exist if the channel was closed before an |
| + // incoming message was processed. |
| + if (controller == null) return; |
| if (message.length > 1) { |
| - sink.add(message[1]); |
| + controller.local.sink.add(message[1]); |
| return; |
| } |
| - // A message without data indicates that the channel has been closed. |
| - _sinkControllers[id].close(); |
| - }, onDone: _closeInnerChannel, |
| - onError: _streamController.addError); |
| + // A message without data indicates that the channel has been closed. We |
| + // can only close the sink here without doing any more cleanup, because |
| + // the sink closing will cause the stream to emit a done event which will |
| + // trigger more cleanup. |
| + controller.local.sink.close(); |
| + }, |
| + onDone: _closeInnerChannel, |
|
tjblasi
2016/02/10 22:10:10
This looks like odd indentation to me, but if dart
nweiz
2016/02/10 22:26:13
I don't generally use dartfmt, but it does happen
|
| + onError: _mainController.local.sink.addError); |
| } |
| VirtualChannel virtualChannel([id]) { |
| + // If the inner channel has already closed, create new virtual channels in a |
| + // closed state. |
| if (_inner == null) { |
| - throw new StateError("The underlying channel is closed."); |
| + if (id == null) { |
| + id = _nextId + 1; |
|
tjblasi
2016/02/10 22:10:10
This is well explained below & in the comments abo
nweiz
2016/02/10 22:26:13
The trouble is, most of the time we need to return
|
| + _nextId += 2; |
| + } |
| + |
| + return new VirtualChannel._( |
| + this, id, new Stream.empty(), new NullStreamSink()); |
| } |
| var inputId; |
| @@ -171,34 +178,32 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel { |
| _nextId += 2; |
| } |
| - if (_streamControllers.containsKey(inputId)) { |
| + if (_controllers.containsKey(inputId)) { |
| throw new ArgumentError("A virtual channel with id $id already exists."); |
| } |
| - var streamController = new StreamController(sync: true); |
| - var sinkController = new StreamController(sync: true); |
| - _streamControllers[inputId] = streamController; |
| - _sinkControllers[inputId] = sinkController; |
| - sinkController.stream.listen( |
| + var controller = new StreamChannelController(sync: true); |
| + _controllers[inputId] = controller; |
| + controller.local.stream.listen( |
| (message) => _inner.sink.add([outputId, message]), |
| onDone: () => _closeChannel(inputId, outputId)); |
| return new VirtualChannel._( |
| - this, outputId, streamController.stream, sinkController.sink); |
| + this, outputId, controller.foreign.stream, controller.foreign.sink); |
| } |
| /// Closes the virtual channel for which incoming messages have [inputId] and |
| /// outgoing messages have [outputId]. |
| void _closeChannel(int inputId, int outputId) { |
| - _streamControllers.remove(inputId).close(); |
| - _sinkControllers.remove(inputId).close(); |
| + var controller = _controllers.remove(inputId); |
| + controller.local.sink.close(); |
| if (_inner == null) return; |
| // A message without data indicates that the virtual channel has been |
| // closed. |
| _inner.sink.add([outputId]); |
| - if (_streamControllers.isEmpty) _closeInnerChannel(); |
| + if (_controllers.isEmpty) _closeInnerChannel(); |
| } |
| /// Closes the underlying communication channel. |
| @@ -206,9 +211,13 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel { |
| _inner.sink.close(); |
| _innerStreamSubscription.cancel(); |
| _inner = null; |
| - for (var controller in _sinkControllers.values.toList()) { |
| - controller.close(); |
| + |
| + // Convert this to a list because the close is dispatched synchronously, and |
| + // that could conceivably remove a controller from [_controllers]. |
| + for (var controller in _controllers.values.toList()) { |
|
tjblasi
2016/02/10 22:10:10
nit: I think `new List.from(_controller.values)` w
nweiz
2016/02/10 22:26:12
Done, although I'm curious what about this is more
|
| + controller.local.sink.close(); |
| } |
| + _controllers.clear(); |
| } |
| } |