Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(346)

Unified Diff: lib/src/multi_channel.dart

Issue 1686263002: Make MultiChannel follow the stream channel rules. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Code review changes Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « CHANGELOG.md ('k') | pubspec.yaml » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/multi_channel.dart
diff --git a/lib/src/multi_channel.dart b/lib/src/multi_channel.dart
index cba592ab563fc9a9dc89fc78b5cb957f83fbdb13..112997e57e5cfd34f4487a2a79a942812375601b 100644
--- a/lib/src/multi_channel.dart
+++ b/lib/src/multi_channel.dart
@@ -4,6 +4,8 @@
import 'dart:async';
+import 'package:async/async.dart';
+
import '../stream_channel.dart';
/// A class that multiplexes multiple virtual channels across a single
@@ -85,21 +87,15 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel {
/// The subscription to [_inner.stream].
StreamSubscription _innerStreamSubscription;
- Stream get stream => _streamController.stream;
- final _streamController = new StreamController(sync: true);
-
- StreamSink get sink => _sinkController.sink;
- final _sinkController = new StreamController(sync: true);
+ Stream get stream => _mainController.foreign.stream;
+ StreamSink get sink => _mainController.foreign.sink;
- /// A map from virtual channel ids to [StreamController]s that should be used
- /// to write messages received from those channels.
- final _streamControllers = new Map<int, StreamController>();
+ /// The controller for this channel.
+ final _mainController = new StreamChannelController(sync: true);
- /// A map from virtual channel ids to [StreamControllers]s that are used
- /// to receive messages to write to those channels.
- ///
- /// Note that this uses the same keys as [_streamControllers].
- final _sinkControllers = new Map<int, StreamController>();
+ /// A map from virtual channel ids to [StreamChannelController]s that should
+ /// be used to communicate over those channels.
+ final _controllers = <int, StreamChannelController>{};
/// The next id to use for a local virtual channel.
///
@@ -125,35 +121,34 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel {
_MultiChannel(this._inner) {
// The default connection is a special case which has id 0 on both ends.
// This allows it to begin connected without having to send over an id.
- _streamControllers[0] = _streamController;
- _sinkControllers[0] = _sinkController;
- _sinkController.stream.listen(
+ _controllers[0] = _mainController;
+ _mainController.local.stream.listen(
(message) => _inner.sink.add([0, message]),
onDone: () => _closeChannel(0, 0));
_innerStreamSubscription = _inner.stream.listen((message) {
var id = message[0];
- var sink = _streamControllers[id];
+ var controller = _controllers[id];
- // A sink might not exist if the channel was closed before an incoming
- // message was processed.
- if (sink == null) return;
+ // A controller might not exist if the channel was closed before an
+ // incoming message was processed.
+ if (controller == null) return;
if (message.length > 1) {
- sink.add(message[1]);
+ controller.local.sink.add(message[1]);
return;
}
- // A message without data indicates that the channel has been closed.
- _sinkControllers[id].close();
- }, onDone: _closeInnerChannel,
- onError: _streamController.addError);
+ // A message without data indicates that the channel has been closed. We
+ // can only close the sink here without doing any more cleanup, because
+ // the sink closing will cause the stream to emit a done event which will
+ // trigger more cleanup.
+ controller.local.sink.close();
+ },
+ onDone: _closeInnerChannel,
+ onError: _mainController.local.sink.addError);
}
VirtualChannel virtualChannel([id]) {
- if (_inner == null) {
- throw new StateError("The underlying channel is closed.");
- }
-
var inputId;
var outputId;
if (id != null) {
@@ -171,34 +166,39 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel {
_nextId += 2;
}
- if (_streamControllers.containsKey(inputId)) {
+ // If the inner channel has already closed, create new virtual channels in a
+ // closed state.
+ if (_inner == null) {
+ return new VirtualChannel._(
+ this, inputId, new Stream.empty(), new NullStreamSink());
+ }
+
+ if (_controllers.containsKey(inputId)) {
throw new ArgumentError("A virtual channel with id $id already exists.");
}
- var streamController = new StreamController(sync: true);
- var sinkController = new StreamController(sync: true);
- _streamControllers[inputId] = streamController;
- _sinkControllers[inputId] = sinkController;
- sinkController.stream.listen(
+ var controller = new StreamChannelController(sync: true);
+ _controllers[inputId] = controller;
+ controller.local.stream.listen(
(message) => _inner.sink.add([outputId, message]),
onDone: () => _closeChannel(inputId, outputId));
return new VirtualChannel._(
- this, outputId, streamController.stream, sinkController.sink);
+ this, outputId, controller.foreign.stream, controller.foreign.sink);
}
/// Closes the virtual channel for which incoming messages have [inputId] and
/// outgoing messages have [outputId].
void _closeChannel(int inputId, int outputId) {
- _streamControllers.remove(inputId).close();
- _sinkControllers.remove(inputId).close();
+ var controller = _controllers.remove(inputId);
+ controller.local.sink.close();
if (_inner == null) return;
// A message without data indicates that the virtual channel has been
// closed.
_inner.sink.add([outputId]);
- if (_streamControllers.isEmpty) _closeInnerChannel();
+ if (_controllers.isEmpty) _closeInnerChannel();
}
/// Closes the underlying communication channel.
@@ -206,9 +206,13 @@ class _MultiChannel extends StreamChannelMixin implements MultiChannel {
_inner.sink.close();
_innerStreamSubscription.cancel();
_inner = null;
- for (var controller in _sinkControllers.values.toList()) {
- controller.close();
+
+ // Convert this to a list because the close is dispatched synchronously, and
+ // that could conceivably remove a controller from [_controllers].
+ for (var controller in new List.from(_controllers.values)) {
+ controller.local.sink.close();
}
+ _controllers.clear();
}
}
« no previous file with comments | « CHANGELOG.md ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698