Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(543)

Side by Side Diff: lib/src/multi_channel.dart

Issue 1686263002: Make MultiChannel follow the stream channel rules. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « CHANGELOG.md ('k') | pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 6
7 import 'package:async/async.dart';
8
7 import '../stream_channel.dart'; 9 import '../stream_channel.dart';
8 10
9 /// A class that multiplexes multiple virtual channels across a single 11 /// A class that multiplexes multiple virtual channels across a single
10 /// underlying transport layer. 12 /// underlying transport layer.
11 /// 13 ///
12 /// This should be connected to another [MultiChannel] on the other end of the 14 /// This should be connected to another [MultiChannel] on the other end of the
13 /// underlying channel. It starts with a single default virtual channel, 15 /// underlying channel. It starts with a single default virtual channel,
14 /// accessible via [stream] and [sink]. Additional virtual channels can be 16 /// accessible via [stream] and [sink]. Additional virtual channels can be
15 /// created with [virtualChannel]. 17 /// created with [virtualChannel].
16 /// 18 ///
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
78 /// without having to implement all the private members. 80 /// without having to implement all the private members.
79 class _MultiChannel extends StreamChannelMixin implements MultiChannel { 81 class _MultiChannel extends StreamChannelMixin implements MultiChannel {
80 /// The inner channel over which all communication is conducted. 82 /// The inner channel over which all communication is conducted.
81 /// 83 ///
82 /// This will be `null` if the underlying communication channel is closed. 84 /// This will be `null` if the underlying communication channel is closed.
83 StreamChannel _inner; 85 StreamChannel _inner;
84 86
85 /// The subscription to [_inner.stream]. 87 /// The subscription to [_inner.stream].
86 StreamSubscription _innerStreamSubscription; 88 StreamSubscription _innerStreamSubscription;
87 89
88 Stream get stream => _streamController.stream; 90 Stream get stream => _mainController.foreign.stream;
89 final _streamController = new StreamController(sync: true); 91 StreamSink get sink => _mainController.foreign.sink;
90 92
91 StreamSink get sink => _sinkController.sink; 93 /// The controller for this channel.
92 final _sinkController = new StreamController(sync: true); 94 final _mainController = new StreamChannelController(sync: true);
93 95
94 /// A map from virtual channel ids to [StreamController]s that should be used 96 /// A map from virtual channel ids to [StreamChannelController]s that should
95 /// to write messages received from those channels. 97 /// be used to communicate over those channels.
96 final _streamControllers = new Map<int, StreamController>(); 98 final _controllers = <int, StreamChannelController>{};
97
98 /// A map from virtual channel ids to [StreamControllers]s that are used
99 /// to receive messages to write to those channels.
100 ///
101 /// Note that this uses the same keys as [_streamControllers].
102 final _sinkControllers = new Map<int, StreamController>();
103 99
104 /// The next id to use for a local virtual channel. 100 /// The next id to use for a local virtual channel.
105 /// 101 ///
106 /// Ids are used to identify virtual channels. Each message is tagged with an 102 /// Ids are used to identify virtual channels. Each message is tagged with an
107 /// id; the receiving [MultiChannel] uses this id to look up which 103 /// id; the receiving [MultiChannel] uses this id to look up which
108 /// [VirtualChannel] the message should be dispatched to. 104 /// [VirtualChannel] the message should be dispatched to.
109 /// 105 ///
110 /// The id scheme for virtual channels is somewhat complicated. This is 106 /// The id scheme for virtual channels is somewhat complicated. This is
111 /// necessary to ensure that there are no conflicts even when both endpoints 107 /// necessary to ensure that there are no conflicts even when both endpoints
112 /// have virtual channels with the same id; since both endpoints can send and 108 /// have virtual channels with the same id; since both endpoints can send and
113 /// receive messages across each virtual channel, a naïve scheme would make it 109 /// receive messages across each virtual channel, a naïve scheme would make it
114 /// impossible to tell whether a message was from a channel that originated in 110 /// impossible to tell whether a message was from a channel that originated in
115 /// the remote endpoint or a reply on a channel that originated in the local 111 /// the remote endpoint or a reply on a channel that originated in the local
116 /// endpoint. 112 /// endpoint.
117 /// 113 ///
118 /// The trick is that each endpoint only uses odd ids for its own channels. 114 /// The trick is that each endpoint only uses odd ids for its own channels.
119 /// When sending a message over a channel that was created by the remote 115 /// When sending a message over a channel that was created by the remote
120 /// endpoint, the channel's id plus one is used. This way each [MultiChannel] 116 /// endpoint, the channel's id plus one is used. This way each [MultiChannel]
121 /// knows that if an incoming message has an odd id, it's using the local id 117 /// knows that if an incoming message has an odd id, it's using the local id
122 /// scheme, but if it has an even id, it's using the remote id scheme. 118 /// scheme, but if it has an even id, it's using the remote id scheme.
123 var _nextId = 1; 119 var _nextId = 1;
124 120
125 _MultiChannel(this._inner) { 121 _MultiChannel(this._inner) {
126 // The default connection is a special case which has id 0 on both ends. 122 // The default connection is a special case which has id 0 on both ends.
127 // This allows it to begin connected without having to send over an id. 123 // This allows it to begin connected without having to send over an id.
128 _streamControllers[0] = _streamController; 124 _controllers[0] = _mainController;
129 _sinkControllers[0] = _sinkController; 125 _mainController.local.stream.listen(
130 _sinkController.stream.listen(
131 (message) => _inner.sink.add([0, message]), 126 (message) => _inner.sink.add([0, message]),
132 onDone: () => _closeChannel(0, 0)); 127 onDone: () => _closeChannel(0, 0));
133 128
134 _innerStreamSubscription = _inner.stream.listen((message) { 129 _innerStreamSubscription = _inner.stream.listen((message) {
135 var id = message[0]; 130 var id = message[0];
136 var sink = _streamControllers[id]; 131 var controller = _controllers[id];
137 132
138 // A sink might not exist if the channel was closed before an incoming 133 // A controller might not exist if the channel was closed before an
139 // message was processed. 134 // incoming message was processed.
140 if (sink == null) return; 135 if (controller == null) return;
141 if (message.length > 1) { 136 if (message.length > 1) {
142 sink.add(message[1]); 137 controller.local.sink.add(message[1]);
143 return; 138 return;
144 } 139 }
145 140
146 // A message without data indicates that the channel has been closed. 141 // A message without data indicates that the channel has been closed. We
147 _sinkControllers[id].close(); 142 // can only close the sink here without doing any more cleanup, because
148 }, onDone: _closeInnerChannel, 143 // the sink closing will cause the stream to emit a done event which will
149 onError: _streamController.addError); 144 // trigger more cleanup.
145 controller.local.sink.close();
146 },
147 onDone: _closeInnerChannel,
tjblasi 2016/02/10 22:10:10 This looks like odd indentation to me, but if dart
nweiz 2016/02/10 22:26:13 I don't generally use dartfmt, but it does happen
148 onError: _mainController.local.sink.addError);
150 } 149 }
151 150
152 VirtualChannel virtualChannel([id]) { 151 VirtualChannel virtualChannel([id]) {
152 // If the inner channel has already closed, create new virtual channels in a
153 // closed state.
153 if (_inner == null) { 154 if (_inner == null) {
154 throw new StateError("The underlying channel is closed."); 155 if (id == null) {
156 id = _nextId + 1;
tjblasi 2016/02/10 22:10:10 This is well explained below & in the comments abo
nweiz 2016/02/10 22:26:13 The trouble is, most of the time we need to return
157 _nextId += 2;
158 }
159
160 return new VirtualChannel._(
161 this, id, new Stream.empty(), new NullStreamSink());
155 } 162 }
156 163
157 var inputId; 164 var inputId;
158 var outputId; 165 var outputId;
159 if (id != null) { 166 if (id != null) {
160 // Since the user is passing in an id, we're connected to a remote 167 // Since the user is passing in an id, we're connected to a remote
161 // VirtualChannel. This means messages they send over this channel will 168 // VirtualChannel. This means messages they send over this channel will
162 // have the original odd id, but our replies will have an even id. 169 // have the original odd id, but our replies will have an even id.
163 inputId = id; 170 inputId = id;
164 outputId = (id as int) + 1; 171 outputId = (id as int) + 1;
165 } else { 172 } else {
166 // Since we're generating an id, we originated this VirtualChannel. This 173 // Since we're generating an id, we originated this VirtualChannel. This
167 // means messages we send over this channel will have the original odd id, 174 // means messages we send over this channel will have the original odd id,
168 // but the remote channel's replies will have an even id. 175 // but the remote channel's replies will have an even id.
169 inputId = _nextId + 1; 176 inputId = _nextId + 1;
170 outputId = _nextId; 177 outputId = _nextId;
171 _nextId += 2; 178 _nextId += 2;
172 } 179 }
173 180
174 if (_streamControllers.containsKey(inputId)) { 181 if (_controllers.containsKey(inputId)) {
175 throw new ArgumentError("A virtual channel with id $id already exists."); 182 throw new ArgumentError("A virtual channel with id $id already exists.");
176 } 183 }
177 184
178 var streamController = new StreamController(sync: true); 185 var controller = new StreamChannelController(sync: true);
179 var sinkController = new StreamController(sync: true); 186 _controllers[inputId] = controller;
180 _streamControllers[inputId] = streamController; 187 controller.local.stream.listen(
181 _sinkControllers[inputId] = sinkController;
182 sinkController.stream.listen(
183 (message) => _inner.sink.add([outputId, message]), 188 (message) => _inner.sink.add([outputId, message]),
184 onDone: () => _closeChannel(inputId, outputId)); 189 onDone: () => _closeChannel(inputId, outputId));
185 190
186 return new VirtualChannel._( 191 return new VirtualChannel._(
187 this, outputId, streamController.stream, sinkController.sink); 192 this, outputId, controller.foreign.stream, controller.foreign.sink);
188 } 193 }
189 194
190 /// Closes the virtual channel for which incoming messages have [inputId] and 195 /// Closes the virtual channel for which incoming messages have [inputId] and
191 /// outgoing messages have [outputId]. 196 /// outgoing messages have [outputId].
192 void _closeChannel(int inputId, int outputId) { 197 void _closeChannel(int inputId, int outputId) {
193 _streamControllers.remove(inputId).close(); 198 var controller = _controllers.remove(inputId);
194 _sinkControllers.remove(inputId).close(); 199 controller.local.sink.close();
195 200
196 if (_inner == null) return; 201 if (_inner == null) return;
197 202
198 // A message without data indicates that the virtual channel has been 203 // A message without data indicates that the virtual channel has been
199 // closed. 204 // closed.
200 _inner.sink.add([outputId]); 205 _inner.sink.add([outputId]);
201 if (_streamControllers.isEmpty) _closeInnerChannel(); 206 if (_controllers.isEmpty) _closeInnerChannel();
202 } 207 }
203 208
204 /// Closes the underlying communication channel. 209 /// Closes the underlying communication channel.
205 void _closeInnerChannel() { 210 void _closeInnerChannel() {
206 _inner.sink.close(); 211 _inner.sink.close();
207 _innerStreamSubscription.cancel(); 212 _innerStreamSubscription.cancel();
208 _inner = null; 213 _inner = null;
209 for (var controller in _sinkControllers.values.toList()) { 214
210 controller.close(); 215 // Convert this to a list because the close is dispatched synchronously, and
216 // that could conceivably remove a controller from [_controllers].
217 for (var controller in _controllers.values.toList()) {
tjblasi 2016/02/10 22:10:10 nit: I think `new List.from(_controller.values)` w
nweiz 2016/02/10 22:26:12 Done, although I'm curious what about this is more
218 controller.local.sink.close();
211 } 219 }
220 _controllers.clear();
212 } 221 }
213 } 222 }
214 223
215 /// A virtual channel created by [MultiChannel]. 224 /// A virtual channel created by [MultiChannel].
216 /// 225 ///
217 /// This implements [MultiChannel] for convenience. 226 /// This implements [MultiChannel] for convenience.
218 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's 227 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's
219 /// [MultiChannel.virtualChannel]. 228 /// [MultiChannel.virtualChannel].
220 class VirtualChannel extends StreamChannelMixin implements MultiChannel { 229 class VirtualChannel extends StreamChannelMixin implements MultiChannel {
221 /// The [MultiChannel] that created this. 230 /// The [MultiChannel] that created this.
222 final MultiChannel _parent; 231 final MultiChannel _parent;
223 232
224 /// The identifier for this channel. 233 /// The identifier for this channel.
225 /// 234 ///
226 /// This can be sent across the [MultiChannel] to provide the remote endpoint 235 /// This can be sent across the [MultiChannel] to provide the remote endpoint
227 /// a means to connect to this channel. Nothing about this is guaranteed 236 /// a means to connect to this channel. Nothing about this is guaranteed
228 /// except that it will be JSON-serializable. 237 /// except that it will be JSON-serializable.
229 final id; 238 final id;
230 239
231 final Stream stream; 240 final Stream stream;
232 final StreamSink sink; 241 final StreamSink sink;
233 242
234 VirtualChannel._(this._parent, this.id, this.stream, this.sink); 243 VirtualChannel._(this._parent, this.id, this.stream, this.sink);
235 244
236 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); 245 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id);
237 } 246 }
OLDNEW
« no previous file with comments | « CHANGELOG.md ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698