OLD | NEW |
---|---|
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 | 6 |
7 import 'package:async/async.dart'; | |
8 | |
7 import '../stream_channel.dart'; | 9 import '../stream_channel.dart'; |
8 | 10 |
9 /// A class that multiplexes multiple virtual channels across a single | 11 /// A class that multiplexes multiple virtual channels across a single |
10 /// underlying transport layer. | 12 /// underlying transport layer. |
11 /// | 13 /// |
12 /// This should be connected to another [MultiChannel] on the other end of the | 14 /// This should be connected to another [MultiChannel] on the other end of the |
13 /// underlying channel. It starts with a single default virtual channel, | 15 /// underlying channel. It starts with a single default virtual channel, |
14 /// accessible via [stream] and [sink]. Additional virtual channels can be | 16 /// accessible via [stream] and [sink]. Additional virtual channels can be |
15 /// created with [virtualChannel]. | 17 /// created with [virtualChannel]. |
16 /// | 18 /// |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
78 /// without having to implement all the private members. | 80 /// without having to implement all the private members. |
79 class _MultiChannel extends StreamChannelMixin implements MultiChannel { | 81 class _MultiChannel extends StreamChannelMixin implements MultiChannel { |
80 /// The inner channel over which all communication is conducted. | 82 /// The inner channel over which all communication is conducted. |
81 /// | 83 /// |
82 /// This will be `null` if the underlying communication channel is closed. | 84 /// This will be `null` if the underlying communication channel is closed. |
83 StreamChannel _inner; | 85 StreamChannel _inner; |
84 | 86 |
85 /// The subscription to [_inner.stream]. | 87 /// The subscription to [_inner.stream]. |
86 StreamSubscription _innerStreamSubscription; | 88 StreamSubscription _innerStreamSubscription; |
87 | 89 |
88 Stream get stream => _streamController.stream; | 90 Stream get stream => _mainController.foreign.stream; |
89 final _streamController = new StreamController(sync: true); | 91 StreamSink get sink => _mainController.foreign.sink; |
90 | 92 |
91 StreamSink get sink => _sinkController.sink; | 93 /// The controller for this channel. |
92 final _sinkController = new StreamController(sync: true); | 94 final _mainController = new StreamChannelController(sync: true); |
93 | 95 |
94 /// A map from virtual channel ids to [StreamController]s that should be used | 96 /// A map from virtual channel ids to [StreamChannelController]s that should |
95 /// to write messages received from those channels. | 97 /// be used to communicate over those channels. |
96 final _streamControllers = new Map<int, StreamController>(); | 98 final _controllers = <int, StreamChannelController>{}; |
97 | |
98 /// A map from virtual channel ids to [StreamControllers]s that are used | |
99 /// to receive messages to write to those channels. | |
100 /// | |
101 /// Note that this uses the same keys as [_streamControllers]. | |
102 final _sinkControllers = new Map<int, StreamController>(); | |
103 | 99 |
104 /// The next id to use for a local virtual channel. | 100 /// The next id to use for a local virtual channel. |
105 /// | 101 /// |
106 /// Ids are used to identify virtual channels. Each message is tagged with an | 102 /// Ids are used to identify virtual channels. Each message is tagged with an |
107 /// id; the receiving [MultiChannel] uses this id to look up which | 103 /// id; the receiving [MultiChannel] uses this id to look up which |
108 /// [VirtualChannel] the message should be dispatched to. | 104 /// [VirtualChannel] the message should be dispatched to. |
109 /// | 105 /// |
110 /// The id scheme for virtual channels is somewhat complicated. This is | 106 /// The id scheme for virtual channels is somewhat complicated. This is |
111 /// necessary to ensure that there are no conflicts even when both endpoints | 107 /// necessary to ensure that there are no conflicts even when both endpoints |
112 /// have virtual channels with the same id; since both endpoints can send and | 108 /// have virtual channels with the same id; since both endpoints can send and |
113 /// receive messages across each virtual channel, a naïve scheme would make it | 109 /// receive messages across each virtual channel, a naïve scheme would make it |
114 /// impossible to tell whether a message was from a channel that originated in | 110 /// impossible to tell whether a message was from a channel that originated in |
115 /// the remote endpoint or a reply on a channel that originated in the local | 111 /// the remote endpoint or a reply on a channel that originated in the local |
116 /// endpoint. | 112 /// endpoint. |
117 /// | 113 /// |
118 /// The trick is that each endpoint only uses odd ids for its own channels. | 114 /// The trick is that each endpoint only uses odd ids for its own channels. |
119 /// When sending a message over a channel that was created by the remote | 115 /// When sending a message over a channel that was created by the remote |
120 /// endpoint, the channel's id plus one is used. This way each [MultiChannel] | 116 /// endpoint, the channel's id plus one is used. This way each [MultiChannel] |
121 /// knows that if an incoming message has an odd id, it's using the local id | 117 /// knows that if an incoming message has an odd id, it's using the local id |
122 /// scheme, but if it has an even id, it's using the remote id scheme. | 118 /// scheme, but if it has an even id, it's using the remote id scheme. |
123 var _nextId = 1; | 119 var _nextId = 1; |
124 | 120 |
125 _MultiChannel(this._inner) { | 121 _MultiChannel(this._inner) { |
126 // The default connection is a special case which has id 0 on both ends. | 122 // The default connection is a special case which has id 0 on both ends. |
127 // This allows it to begin connected without having to send over an id. | 123 // This allows it to begin connected without having to send over an id. |
128 _streamControllers[0] = _streamController; | 124 _controllers[0] = _mainController; |
129 _sinkControllers[0] = _sinkController; | 125 _mainController.local.stream.listen( |
130 _sinkController.stream.listen( | |
131 (message) => _inner.sink.add([0, message]), | 126 (message) => _inner.sink.add([0, message]), |
132 onDone: () => _closeChannel(0, 0)); | 127 onDone: () => _closeChannel(0, 0)); |
133 | 128 |
134 _innerStreamSubscription = _inner.stream.listen((message) { | 129 _innerStreamSubscription = _inner.stream.listen((message) { |
135 var id = message[0]; | 130 var id = message[0]; |
136 var sink = _streamControllers[id]; | 131 var controller = _controllers[id]; |
137 | 132 |
138 // A sink might not exist if the channel was closed before an incoming | 133 // A controller might not exist if the channel was closed before an |
139 // message was processed. | 134 // incoming message was processed. |
140 if (sink == null) return; | 135 if (controller == null) return; |
141 if (message.length > 1) { | 136 if (message.length > 1) { |
142 sink.add(message[1]); | 137 controller.local.sink.add(message[1]); |
143 return; | 138 return; |
144 } | 139 } |
145 | 140 |
146 // A message without data indicates that the channel has been closed. | 141 // A message without data indicates that the channel has been closed. We |
147 _sinkControllers[id].close(); | 142 // can only close the sink here without doing any more cleanup, because |
148 }, onDone: _closeInnerChannel, | 143 // the sink closing will cause the stream to emit a done event which will |
149 onError: _streamController.addError); | 144 // trigger more cleanup. |
145 controller.local.sink.close(); | |
146 }, | |
147 onDone: _closeInnerChannel, | |
tjblasi
2016/02/10 22:10:10
This looks like odd indentation to me, but if dart
nweiz
2016/02/10 22:26:13
I don't generally use dartfmt, but it does happen
| |
148 onError: _mainController.local.sink.addError); | |
150 } | 149 } |
151 | 150 |
152 VirtualChannel virtualChannel([id]) { | 151 VirtualChannel virtualChannel([id]) { |
152 // If the inner channel has already closed, create new virtual channels in a | |
153 // closed state. | |
153 if (_inner == null) { | 154 if (_inner == null) { |
154 throw new StateError("The underlying channel is closed."); | 155 if (id == null) { |
156 id = _nextId + 1; | |
tjblasi
2016/02/10 22:10:10
This is well explained below & in the comments abo
nweiz
2016/02/10 22:26:13
The trouble is, most of the time we need to return
| |
157 _nextId += 2; | |
158 } | |
159 | |
160 return new VirtualChannel._( | |
161 this, id, new Stream.empty(), new NullStreamSink()); | |
155 } | 162 } |
156 | 163 |
157 var inputId; | 164 var inputId; |
158 var outputId; | 165 var outputId; |
159 if (id != null) { | 166 if (id != null) { |
160 // Since the user is passing in an id, we're connected to a remote | 167 // Since the user is passing in an id, we're connected to a remote |
161 // VirtualChannel. This means messages they send over this channel will | 168 // VirtualChannel. This means messages they send over this channel will |
162 // have the original odd id, but our replies will have an even id. | 169 // have the original odd id, but our replies will have an even id. |
163 inputId = id; | 170 inputId = id; |
164 outputId = (id as int) + 1; | 171 outputId = (id as int) + 1; |
165 } else { | 172 } else { |
166 // Since we're generating an id, we originated this VirtualChannel. This | 173 // Since we're generating an id, we originated this VirtualChannel. This |
167 // means messages we send over this channel will have the original odd id, | 174 // means messages we send over this channel will have the original odd id, |
168 // but the remote channel's replies will have an even id. | 175 // but the remote channel's replies will have an even id. |
169 inputId = _nextId + 1; | 176 inputId = _nextId + 1; |
170 outputId = _nextId; | 177 outputId = _nextId; |
171 _nextId += 2; | 178 _nextId += 2; |
172 } | 179 } |
173 | 180 |
174 if (_streamControllers.containsKey(inputId)) { | 181 if (_controllers.containsKey(inputId)) { |
175 throw new ArgumentError("A virtual channel with id $id already exists."); | 182 throw new ArgumentError("A virtual channel with id $id already exists."); |
176 } | 183 } |
177 | 184 |
178 var streamController = new StreamController(sync: true); | 185 var controller = new StreamChannelController(sync: true); |
179 var sinkController = new StreamController(sync: true); | 186 _controllers[inputId] = controller; |
180 _streamControllers[inputId] = streamController; | 187 controller.local.stream.listen( |
181 _sinkControllers[inputId] = sinkController; | |
182 sinkController.stream.listen( | |
183 (message) => _inner.sink.add([outputId, message]), | 188 (message) => _inner.sink.add([outputId, message]), |
184 onDone: () => _closeChannel(inputId, outputId)); | 189 onDone: () => _closeChannel(inputId, outputId)); |
185 | 190 |
186 return new VirtualChannel._( | 191 return new VirtualChannel._( |
187 this, outputId, streamController.stream, sinkController.sink); | 192 this, outputId, controller.foreign.stream, controller.foreign.sink); |
188 } | 193 } |
189 | 194 |
190 /// Closes the virtual channel for which incoming messages have [inputId] and | 195 /// Closes the virtual channel for which incoming messages have [inputId] and |
191 /// outgoing messages have [outputId]. | 196 /// outgoing messages have [outputId]. |
192 void _closeChannel(int inputId, int outputId) { | 197 void _closeChannel(int inputId, int outputId) { |
193 _streamControllers.remove(inputId).close(); | 198 var controller = _controllers.remove(inputId); |
194 _sinkControllers.remove(inputId).close(); | 199 controller.local.sink.close(); |
195 | 200 |
196 if (_inner == null) return; | 201 if (_inner == null) return; |
197 | 202 |
198 // A message without data indicates that the virtual channel has been | 203 // A message without data indicates that the virtual channel has been |
199 // closed. | 204 // closed. |
200 _inner.sink.add([outputId]); | 205 _inner.sink.add([outputId]); |
201 if (_streamControllers.isEmpty) _closeInnerChannel(); | 206 if (_controllers.isEmpty) _closeInnerChannel(); |
202 } | 207 } |
203 | 208 |
204 /// Closes the underlying communication channel. | 209 /// Closes the underlying communication channel. |
205 void _closeInnerChannel() { | 210 void _closeInnerChannel() { |
206 _inner.sink.close(); | 211 _inner.sink.close(); |
207 _innerStreamSubscription.cancel(); | 212 _innerStreamSubscription.cancel(); |
208 _inner = null; | 213 _inner = null; |
209 for (var controller in _sinkControllers.values.toList()) { | 214 |
210 controller.close(); | 215 // Convert this to a list because the close is dispatched synchronously, and |
216 // that could conceivably remove a controller from [_controllers]. | |
217 for (var controller in _controllers.values.toList()) { | |
tjblasi
2016/02/10 22:10:10
nit: I think `new List.from(_controller.values)` w
nweiz
2016/02/10 22:26:12
Done, although I'm curious what about this is more
| |
218 controller.local.sink.close(); | |
211 } | 219 } |
220 _controllers.clear(); | |
212 } | 221 } |
213 } | 222 } |
214 | 223 |
215 /// A virtual channel created by [MultiChannel]. | 224 /// A virtual channel created by [MultiChannel]. |
216 /// | 225 /// |
217 /// This implements [MultiChannel] for convenience. | 226 /// This implements [MultiChannel] for convenience. |
218 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's | 227 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's |
219 /// [MultiChannel.virtualChannel]. | 228 /// [MultiChannel.virtualChannel]. |
220 class VirtualChannel extends StreamChannelMixin implements MultiChannel { | 229 class VirtualChannel extends StreamChannelMixin implements MultiChannel { |
221 /// The [MultiChannel] that created this. | 230 /// The [MultiChannel] that created this. |
222 final MultiChannel _parent; | 231 final MultiChannel _parent; |
223 | 232 |
224 /// The identifier for this channel. | 233 /// The identifier for this channel. |
225 /// | 234 /// |
226 /// This can be sent across the [MultiChannel] to provide the remote endpoint | 235 /// This can be sent across the [MultiChannel] to provide the remote endpoint |
227 /// a means to connect to this channel. Nothing about this is guaranteed | 236 /// a means to connect to this channel. Nothing about this is guaranteed |
228 /// except that it will be JSON-serializable. | 237 /// except that it will be JSON-serializable. |
229 final id; | 238 final id; |
230 | 239 |
231 final Stream stream; | 240 final Stream stream; |
232 final StreamSink sink; | 241 final StreamSink sink; |
233 | 242 |
234 VirtualChannel._(this._parent, this.id, this.stream, this.sink); | 243 VirtualChannel._(this._parent, this.id, this.stream, this.sink); |
235 | 244 |
236 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); | 245 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); |
237 } | 246 } |
OLD | NEW |