Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(422)

Side by Side Diff: lib/src/multi_channel.dart

Issue 1644943004: Fix the MultiChannel constructor. (Closed) Base URL: git@github.com:dart-lang/stream_channel.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « CHANGELOG.md ('k') | pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 6
7 import '../stream_channel.dart'; 7 import '../stream_channel.dart';
8 8
9 /// A class that multiplexes multiple virtual channels across a single 9 /// A class that multiplexes multiple virtual channels across a single
10 /// underlying transport layer. 10 /// underlying transport layer.
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
45 /// This connects to the remote [sink]. 45 /// This connects to the remote [sink].
46 Stream get stream; 46 Stream get stream;
47 47
48 /// The default output stream. 48 /// The default output stream.
49 /// 49 ///
50 /// This connects to the remote [stream]. If this is closed, the remote 50 /// This connects to the remote [stream]. If this is closed, the remote
51 /// [stream] will close, but other virtual channels will remain open and new 51 /// [stream] will close, but other virtual channels will remain open and new
52 /// virtual channels may be opened. 52 /// virtual channels may be opened.
53 StreamSink get sink; 53 StreamSink get sink;
54 54
55 /// Creates a new [MultiChannel] that sends messages over [innerStream] and 55 /// Creates a new [MultiChannel] that sends and receives messages over
56 /// [innerSink]. 56 /// [inner].
57 /// 57 ///
58 /// The inner streams must take JSON-like objects. 58 /// The inner channel must take JSON-like objects.
59 factory MultiChannel(Stream innerStream, StreamSink innerSink) => 59 factory MultiChannel(StreamChannel inner) => new _MultiChannel(inner);
60 new _MultiChannel(innerStream, innerSink);
61 60
62 /// Creates a new virtual channel. 61 /// Creates a new virtual channel.
63 /// 62 ///
64 /// If [id] is not passed, this creates a virtual channel from scratch. Before 63 /// If [id] is not passed, this creates a virtual channel from scratch. Before
65 /// it's used, its [VirtualChannel.id] must be sent to the remote endpoint 64 /// it's used, its [VirtualChannel.id] must be sent to the remote endpoint
66 /// where [virtualChannel] should be called with that id. 65 /// where [virtualChannel] should be called with that id.
67 /// 66 ///
68 /// If [id] is passed, this creates a virtual channel corresponding to the 67 /// If [id] is passed, this creates a virtual channel corresponding to the
69 /// channel with that id on the remote channel. 68 /// channel with that id on the remote channel.
70 /// 69 ///
71 /// Throws an [ArgumentError] if a virtual channel already exists for [id]. 70 /// Throws an [ArgumentError] if a virtual channel already exists for [id].
72 /// Throws a [StateError] if the underlying channel is closed. 71 /// Throws a [StateError] if the underlying channel is closed.
73 VirtualChannel virtualChannel([id]); 72 VirtualChannel virtualChannel([id]);
74 } 73 }
75 74
76 /// The implementation of [MultiChannel]. 75 /// The implementation of [MultiChannel].
77 /// 76 ///
78 /// This is private so that [VirtualChannel] can inherit from [MultiChannel] 77 /// This is private so that [VirtualChannel] can inherit from [MultiChannel]
79 /// without having to implement all the private members. 78 /// without having to implement all the private members.
80 class _MultiChannel extends StreamChannelMixin implements MultiChannel { 79 class _MultiChannel extends StreamChannelMixin implements MultiChannel {
81 /// The inner stream over which all communication is received. 80 /// The inner channel over which all communication is conducted.
82 /// 81 ///
83 /// This will be `null` if the underlying communication channel is closed. 82 /// This will be `null` if the underlying communication channel is closed.
84 Stream _innerStream; 83 StreamChannel _inner;
85 84
86 /// The inner sink over which all communication is sent. 85 /// The subscription to [_inner.stream].
87 ///
88 /// This will be `null` if the underlying communication channel is closed.
89 StreamSink _innerSink;
90
91 /// The subscription to [_innerStream].
92 StreamSubscription _innerStreamSubscription; 86 StreamSubscription _innerStreamSubscription;
93 87
94 Stream get stream => _streamController.stream; 88 Stream get stream => _streamController.stream;
95 final _streamController = new StreamController(sync: true); 89 final _streamController = new StreamController(sync: true);
96 90
97 StreamSink get sink => _sinkController.sink; 91 StreamSink get sink => _sinkController.sink;
98 final _sinkController = new StreamController(sync: true); 92 final _sinkController = new StreamController(sync: true);
99 93
100 /// A map from virtual channel ids to [StreamController]s that should be used 94 /// A map from virtual channel ids to [StreamController]s that should be used
101 /// to write messages received from those channels. 95 /// to write messages received from those channels.
(...skipping 19 matching lines...) Expand all
121 /// the remote endpoint or a reply on a channel that originated in the local 115 /// the remote endpoint or a reply on a channel that originated in the local
122 /// endpoint. 116 /// endpoint.
123 /// 117 ///
124 /// The trick is that each endpoint only uses odd ids for its own channels. 118 /// The trick is that each endpoint only uses odd ids for its own channels.
125 /// When sending a message over a channel that was created by the remote 119 /// When sending a message over a channel that was created by the remote
126 /// endpoint, the channel's id plus one is used. This way each [MultiChannel] 120 /// endpoint, the channel's id plus one is used. This way each [MultiChannel]
127 /// knows that if an incoming message has an odd id, it's using the local id 121 /// knows that if an incoming message has an odd id, it's using the local id
128 /// scheme, but if it has an even id, it's using the remote id scheme. 122 /// scheme, but if it has an even id, it's using the remote id scheme.
129 var _nextId = 1; 123 var _nextId = 1;
130 124
131 _MultiChannel(this._innerStream, this._innerSink) { 125 _MultiChannel(this._inner) {
132 // The default connection is a special case which has id 0 on both ends. 126 // The default connection is a special case which has id 0 on both ends.
133 // This allows it to begin connected without having to send over an id. 127 // This allows it to begin connected without having to send over an id.
134 _streamControllers[0] = _streamController; 128 _streamControllers[0] = _streamController;
135 _sinkControllers[0] = _sinkController; 129 _sinkControllers[0] = _sinkController;
136 _sinkController.stream.listen( 130 _sinkController.stream.listen(
137 (message) => _innerSink.add([0, message]), 131 (message) => _inner.sink.add([0, message]),
138 onDone: () => _closeChannel(0, 0)); 132 onDone: () => _closeChannel(0, 0));
139 133
140 _innerStreamSubscription = _innerStream.listen((message) { 134 _innerStreamSubscription = _inner.stream.listen((message) {
141 var id = message[0]; 135 var id = message[0];
142 var sink = _streamControllers[id]; 136 var sink = _streamControllers[id];
143 137
144 // A sink might not exist if the channel was closed before an incoming 138 // A sink might not exist if the channel was closed before an incoming
145 // message was processed. 139 // message was processed.
146 if (sink == null) return; 140 if (sink == null) return;
147 if (message.length > 1) { 141 if (message.length > 1) {
148 sink.add(message[1]); 142 sink.add(message[1]);
149 return; 143 return;
150 } 144 }
151 145
152 // A message without data indicates that the channel has been closed. 146 // A message without data indicates that the channel has been closed.
153 _sinkControllers[id].close(); 147 _sinkControllers[id].close();
154 }, onDone: _closeInnerChannel, 148 }, onDone: _closeInnerChannel,
155 onError: _streamController.addError); 149 onError: _streamController.addError);
156 } 150 }
157 151
158 VirtualChannel virtualChannel([id]) { 152 VirtualChannel virtualChannel([id]) {
159 if (_innerStream == null) { 153 if (_inner == null) {
160 throw new StateError("The underlying channel is closed."); 154 throw new StateError("The underlying channel is closed.");
161 } 155 }
162 156
163 var inputId; 157 var inputId;
164 var outputId; 158 var outputId;
165 if (id != null) { 159 if (id != null) {
166 // Since the user is passing in an id, we're connected to a remote 160 // Since the user is passing in an id, we're connected to a remote
167 // VirtualChannel. This means messages they send over this channel will 161 // VirtualChannel. This means messages they send over this channel will
168 // have the original odd id, but our replies will have an even id. 162 // have the original odd id, but our replies will have an even id.
169 inputId = id; 163 inputId = id;
170 outputId = (id as int) + 1; 164 outputId = (id as int) + 1;
171 } else { 165 } else {
172 // Since we're generating an id, we originated this VirtualChannel. This 166 // Since we're generating an id, we originated this VirtualChannel. This
173 // means messages we send over this channel will have the original odd id, 167 // means messages we send over this channel will have the original odd id,
174 // but the remote channel's replies will have an even id. 168 // but the remote channel's replies will have an even id.
175 inputId = _nextId + 1; 169 inputId = _nextId + 1;
176 outputId = _nextId; 170 outputId = _nextId;
177 _nextId += 2; 171 _nextId += 2;
178 } 172 }
179 173
180 if (_streamControllers.containsKey(inputId)) { 174 if (_streamControllers.containsKey(inputId)) {
181 throw new ArgumentError("A virtual channel with id $id already exists."); 175 throw new ArgumentError("A virtual channel with id $id already exists.");
182 } 176 }
183 177
184 var streamController = new StreamController(sync: true); 178 var streamController = new StreamController(sync: true);
185 var sinkController = new StreamController(sync: true); 179 var sinkController = new StreamController(sync: true);
186 _streamControllers[inputId] = streamController; 180 _streamControllers[inputId] = streamController;
187 _sinkControllers[inputId] = sinkController; 181 _sinkControllers[inputId] = sinkController;
188 sinkController.stream.listen( 182 sinkController.stream.listen(
189 (message) => _innerSink.add([outputId, message]), 183 (message) => _inner.sink.add([outputId, message]),
190 onDone: () => _closeChannel(inputId, outputId)); 184 onDone: () => _closeChannel(inputId, outputId));
191 185
192 return new VirtualChannel._( 186 return new VirtualChannel._(
193 this, outputId, streamController.stream, sinkController.sink); 187 this, outputId, streamController.stream, sinkController.sink);
194 } 188 }
195 189
196 /// Closes the virtual channel for which incoming messages have [inputId] and 190 /// Closes the virtual channel for which incoming messages have [inputId] and
197 /// outgoing messages have [outputId]. 191 /// outgoing messages have [outputId].
198 void _closeChannel(int inputId, int outputId) { 192 void _closeChannel(int inputId, int outputId) {
199 _streamControllers.remove(inputId).close(); 193 _streamControllers.remove(inputId).close();
200 _sinkControllers.remove(inputId).close(); 194 _sinkControllers.remove(inputId).close();
201 195
202 if (_innerSink == null) return; 196 if (_inner == null) return;
203 197
204 // A message without data indicates that the virtual channel has been 198 // A message without data indicates that the virtual channel has been
205 // closed. 199 // closed.
206 _innerSink.add([outputId]); 200 _inner.sink.add([outputId]);
207 if (_streamControllers.isEmpty) _closeInnerChannel(); 201 if (_streamControllers.isEmpty) _closeInnerChannel();
208 } 202 }
209 203
210 /// Closes the underlying communication channel. 204 /// Closes the underlying communication channel.
211 void _closeInnerChannel() { 205 void _closeInnerChannel() {
212 _innerSink.close(); 206 _inner.sink.close();
213 _innerStreamSubscription.cancel(); 207 _innerStreamSubscription.cancel();
214 _innerStream = null; 208 _inner = null;
215 _innerSink = null;
216 for (var controller in _sinkControllers.values.toList()) { 209 for (var controller in _sinkControllers.values.toList()) {
217 controller.close(); 210 controller.close();
218 } 211 }
219 } 212 }
220 } 213 }
221 214
222 /// A virtual channel created by [MultiChannel]. 215 /// A virtual channel created by [MultiChannel].
223 /// 216 ///
224 /// This implements [MultiChannel] for convenience. 217 /// This implements [MultiChannel] for convenience.
225 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's 218 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's
226 /// [MultiChannel.virtualChannel]. 219 /// [MultiChannel.virtualChannel].
227 class VirtualChannel extends StreamChannelMixin implements MultiChannel { 220 class VirtualChannel extends StreamChannelMixin implements MultiChannel {
228 /// The [MultiChannel] that created this. 221 /// The [MultiChannel] that created this.
229 final MultiChannel _parent; 222 final MultiChannel _parent;
230 223
231 /// The identifier for this channel. 224 /// The identifier for this channel.
232 /// 225 ///
233 /// This can be sent across the [MultiChannel] to provide the remote endpoint 226 /// This can be sent across the [MultiChannel] to provide the remote endpoint
234 /// a means to connect to this channel. Nothing about this is guaranteed 227 /// a means to connect to this channel. Nothing about this is guaranteed
235 /// except that it will be JSON-serializable. 228 /// except that it will be JSON-serializable.
236 final id; 229 final id;
237 230
238 final Stream stream; 231 final Stream stream;
239 final StreamSink sink; 232 final StreamSink sink;
240 233
241 VirtualChannel._(this._parent, this.id, this.stream, this.sink); 234 VirtualChannel._(this._parent, this.id, this.stream, this.sink);
242 235
243 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); 236 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id);
244 } 237 }
OLDNEW
« no previous file with comments | « CHANGELOG.md ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698