OLD | NEW |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 | 6 |
| 7 import 'package:async/async.dart'; |
| 8 |
7 import '../stream_channel.dart'; | 9 import '../stream_channel.dart'; |
8 | 10 |
9 /// A class that multiplexes multiple virtual channels across a single | 11 /// A class that multiplexes multiple virtual channels across a single |
10 /// underlying transport layer. | 12 /// underlying transport layer. |
11 /// | 13 /// |
12 /// This should be connected to another [MultiChannel] on the other end of the | 14 /// This should be connected to another [MultiChannel] on the other end of the |
13 /// underlying channel. It starts with a single default virtual channel, | 15 /// underlying channel. It starts with a single default virtual channel, |
14 /// accessible via [stream] and [sink]. Additional virtual channels can be | 16 /// accessible via [stream] and [sink]. Additional virtual channels can be |
15 /// created with [virtualChannel]. | 17 /// created with [virtualChannel]. |
16 /// | 18 /// |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
78 /// without having to implement all the private members. | 80 /// without having to implement all the private members. |
79 class _MultiChannel extends StreamChannelMixin implements MultiChannel { | 81 class _MultiChannel extends StreamChannelMixin implements MultiChannel { |
80 /// The inner channel over which all communication is conducted. | 82 /// The inner channel over which all communication is conducted. |
81 /// | 83 /// |
82 /// This will be `null` if the underlying communication channel is closed. | 84 /// This will be `null` if the underlying communication channel is closed. |
83 StreamChannel _inner; | 85 StreamChannel _inner; |
84 | 86 |
85 /// The subscription to [_inner.stream]. | 87 /// The subscription to [_inner.stream]. |
86 StreamSubscription _innerStreamSubscription; | 88 StreamSubscription _innerStreamSubscription; |
87 | 89 |
88 Stream get stream => _streamController.stream; | 90 Stream get stream => _mainController.foreign.stream; |
89 final _streamController = new StreamController(sync: true); | 91 StreamSink get sink => _mainController.foreign.sink; |
90 | 92 |
91 StreamSink get sink => _sinkController.sink; | 93 /// The controller for this channel. |
92 final _sinkController = new StreamController(sync: true); | 94 final _mainController = new StreamChannelController(sync: true); |
93 | 95 |
94 /// A map from virtual channel ids to [StreamController]s that should be used | 96 /// A map from virtual channel ids to [StreamChannelController]s that should |
95 /// to write messages received from those channels. | 97 /// be used to communicate over those channels. |
96 final _streamControllers = new Map<int, StreamController>(); | 98 final _controllers = <int, StreamChannelController>{}; |
97 | |
98 /// A map from virtual channel ids to [StreamControllers]s that are used | |
99 /// to receive messages to write to those channels. | |
100 /// | |
101 /// Note that this uses the same keys as [_streamControllers]. | |
102 final _sinkControllers = new Map<int, StreamController>(); | |
103 | 99 |
104 /// The next id to use for a local virtual channel. | 100 /// The next id to use for a local virtual channel. |
105 /// | 101 /// |
106 /// Ids are used to identify virtual channels. Each message is tagged with an | 102 /// Ids are used to identify virtual channels. Each message is tagged with an |
107 /// id; the receiving [MultiChannel] uses this id to look up which | 103 /// id; the receiving [MultiChannel] uses this id to look up which |
108 /// [VirtualChannel] the message should be dispatched to. | 104 /// [VirtualChannel] the message should be dispatched to. |
109 /// | 105 /// |
110 /// The id scheme for virtual channels is somewhat complicated. This is | 106 /// The id scheme for virtual channels is somewhat complicated. This is |
111 /// necessary to ensure that there are no conflicts even when both endpoints | 107 /// necessary to ensure that there are no conflicts even when both endpoints |
112 /// have virtual channels with the same id; since both endpoints can send and | 108 /// have virtual channels with the same id; since both endpoints can send and |
113 /// receive messages across each virtual channel, a naïve scheme would make it | 109 /// receive messages across each virtual channel, a naïve scheme would make it |
114 /// impossible to tell whether a message was from a channel that originated in | 110 /// impossible to tell whether a message was from a channel that originated in |
115 /// the remote endpoint or a reply on a channel that originated in the local | 111 /// the remote endpoint or a reply on a channel that originated in the local |
116 /// endpoint. | 112 /// endpoint. |
117 /// | 113 /// |
118 /// The trick is that each endpoint only uses odd ids for its own channels. | 114 /// The trick is that each endpoint only uses odd ids for its own channels. |
119 /// When sending a message over a channel that was created by the remote | 115 /// When sending a message over a channel that was created by the remote |
120 /// endpoint, the channel's id plus one is used. This way each [MultiChannel] | 116 /// endpoint, the channel's id plus one is used. This way each [MultiChannel] |
121 /// knows that if an incoming message has an odd id, it's using the local id | 117 /// knows that if an incoming message has an odd id, it's using the local id |
122 /// scheme, but if it has an even id, it's using the remote id scheme. | 118 /// scheme, but if it has an even id, it's using the remote id scheme. |
123 var _nextId = 1; | 119 var _nextId = 1; |
124 | 120 |
125 _MultiChannel(this._inner) { | 121 _MultiChannel(this._inner) { |
126 // The default connection is a special case which has id 0 on both ends. | 122 // The default connection is a special case which has id 0 on both ends. |
127 // This allows it to begin connected without having to send over an id. | 123 // This allows it to begin connected without having to send over an id. |
128 _streamControllers[0] = _streamController; | 124 _controllers[0] = _mainController; |
129 _sinkControllers[0] = _sinkController; | 125 _mainController.local.stream.listen( |
130 _sinkController.stream.listen( | |
131 (message) => _inner.sink.add([0, message]), | 126 (message) => _inner.sink.add([0, message]), |
132 onDone: () => _closeChannel(0, 0)); | 127 onDone: () => _closeChannel(0, 0)); |
133 | 128 |
134 _innerStreamSubscription = _inner.stream.listen((message) { | 129 _innerStreamSubscription = _inner.stream.listen((message) { |
135 var id = message[0]; | 130 var id = message[0]; |
136 var sink = _streamControllers[id]; | 131 var controller = _controllers[id]; |
137 | 132 |
138 // A sink might not exist if the channel was closed before an incoming | 133 // A controller might not exist if the channel was closed before an |
139 // message was processed. | 134 // incoming message was processed. |
140 if (sink == null) return; | 135 if (controller == null) return; |
141 if (message.length > 1) { | 136 if (message.length > 1) { |
142 sink.add(message[1]); | 137 controller.local.sink.add(message[1]); |
143 return; | 138 return; |
144 } | 139 } |
145 | 140 |
146 // A message without data indicates that the channel has been closed. | 141 // A message without data indicates that the channel has been closed. We |
147 _sinkControllers[id].close(); | 142 // can only close the sink here without doing any more cleanup, because |
148 }, onDone: _closeInnerChannel, | 143 // the sink closing will cause the stream to emit a done event which will |
149 onError: _streamController.addError); | 144 // trigger more cleanup. |
| 145 controller.local.sink.close(); |
| 146 }, |
| 147 onDone: _closeInnerChannel, |
| 148 onError: _mainController.local.sink.addError); |
150 } | 149 } |
151 | 150 |
152 VirtualChannel virtualChannel([id]) { | 151 VirtualChannel virtualChannel([id]) { |
153 if (_inner == null) { | |
154 throw new StateError("The underlying channel is closed."); | |
155 } | |
156 | |
157 var inputId; | 152 var inputId; |
158 var outputId; | 153 var outputId; |
159 if (id != null) { | 154 if (id != null) { |
160 // Since the user is passing in an id, we're connected to a remote | 155 // Since the user is passing in an id, we're connected to a remote |
161 // VirtualChannel. This means messages they send over this channel will | 156 // VirtualChannel. This means messages they send over this channel will |
162 // have the original odd id, but our replies will have an even id. | 157 // have the original odd id, but our replies will have an even id. |
163 inputId = id; | 158 inputId = id; |
164 outputId = (id as int) + 1; | 159 outputId = (id as int) + 1; |
165 } else { | 160 } else { |
166 // Since we're generating an id, we originated this VirtualChannel. This | 161 // Since we're generating an id, we originated this VirtualChannel. This |
167 // means messages we send over this channel will have the original odd id, | 162 // means messages we send over this channel will have the original odd id, |
168 // but the remote channel's replies will have an even id. | 163 // but the remote channel's replies will have an even id. |
169 inputId = _nextId + 1; | 164 inputId = _nextId + 1; |
170 outputId = _nextId; | 165 outputId = _nextId; |
171 _nextId += 2; | 166 _nextId += 2; |
172 } | 167 } |
173 | 168 |
174 if (_streamControllers.containsKey(inputId)) { | 169 // If the inner channel has already closed, create new virtual channels in a |
| 170 // closed state. |
| 171 if (_inner == null) { |
| 172 return new VirtualChannel._( |
| 173 this, inputId, new Stream.empty(), new NullStreamSink()); |
| 174 } |
| 175 |
| 176 if (_controllers.containsKey(inputId)) { |
175 throw new ArgumentError("A virtual channel with id $id already exists."); | 177 throw new ArgumentError("A virtual channel with id $id already exists."); |
176 } | 178 } |
177 | 179 |
178 var streamController = new StreamController(sync: true); | 180 var controller = new StreamChannelController(sync: true); |
179 var sinkController = new StreamController(sync: true); | 181 _controllers[inputId] = controller; |
180 _streamControllers[inputId] = streamController; | 182 controller.local.stream.listen( |
181 _sinkControllers[inputId] = sinkController; | |
182 sinkController.stream.listen( | |
183 (message) => _inner.sink.add([outputId, message]), | 183 (message) => _inner.sink.add([outputId, message]), |
184 onDone: () => _closeChannel(inputId, outputId)); | 184 onDone: () => _closeChannel(inputId, outputId)); |
185 | 185 |
186 return new VirtualChannel._( | 186 return new VirtualChannel._( |
187 this, outputId, streamController.stream, sinkController.sink); | 187 this, outputId, controller.foreign.stream, controller.foreign.sink); |
188 } | 188 } |
189 | 189 |
190 /// Closes the virtual channel for which incoming messages have [inputId] and | 190 /// Closes the virtual channel for which incoming messages have [inputId] and |
191 /// outgoing messages have [outputId]. | 191 /// outgoing messages have [outputId]. |
192 void _closeChannel(int inputId, int outputId) { | 192 void _closeChannel(int inputId, int outputId) { |
193 _streamControllers.remove(inputId).close(); | 193 var controller = _controllers.remove(inputId); |
194 _sinkControllers.remove(inputId).close(); | 194 controller.local.sink.close(); |
195 | 195 |
196 if (_inner == null) return; | 196 if (_inner == null) return; |
197 | 197 |
198 // A message without data indicates that the virtual channel has been | 198 // A message without data indicates that the virtual channel has been |
199 // closed. | 199 // closed. |
200 _inner.sink.add([outputId]); | 200 _inner.sink.add([outputId]); |
201 if (_streamControllers.isEmpty) _closeInnerChannel(); | 201 if (_controllers.isEmpty) _closeInnerChannel(); |
202 } | 202 } |
203 | 203 |
204 /// Closes the underlying communication channel. | 204 /// Closes the underlying communication channel. |
205 void _closeInnerChannel() { | 205 void _closeInnerChannel() { |
206 _inner.sink.close(); | 206 _inner.sink.close(); |
207 _innerStreamSubscription.cancel(); | 207 _innerStreamSubscription.cancel(); |
208 _inner = null; | 208 _inner = null; |
209 for (var controller in _sinkControllers.values.toList()) { | 209 |
210 controller.close(); | 210 // Convert this to a list because the close is dispatched synchronously, and |
| 211 // that could conceivably remove a controller from [_controllers]. |
| 212 for (var controller in new List.from(_controllers.values)) { |
| 213 controller.local.sink.close(); |
211 } | 214 } |
| 215 _controllers.clear(); |
212 } | 216 } |
213 } | 217 } |
214 | 218 |
215 /// A virtual channel created by [MultiChannel]. | 219 /// A virtual channel created by [MultiChannel]. |
216 /// | 220 /// |
217 /// This implements [MultiChannel] for convenience. | 221 /// This implements [MultiChannel] for convenience. |
218 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's | 222 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's |
219 /// [MultiChannel.virtualChannel]. | 223 /// [MultiChannel.virtualChannel]. |
220 class VirtualChannel extends StreamChannelMixin implements MultiChannel { | 224 class VirtualChannel extends StreamChannelMixin implements MultiChannel { |
221 /// The [MultiChannel] that created this. | 225 /// The [MultiChannel] that created this. |
222 final MultiChannel _parent; | 226 final MultiChannel _parent; |
223 | 227 |
224 /// The identifier for this channel. | 228 /// The identifier for this channel. |
225 /// | 229 /// |
226 /// This can be sent across the [MultiChannel] to provide the remote endpoint | 230 /// This can be sent across the [MultiChannel] to provide the remote endpoint |
227 /// a means to connect to this channel. Nothing about this is guaranteed | 231 /// a means to connect to this channel. Nothing about this is guaranteed |
228 /// except that it will be JSON-serializable. | 232 /// except that it will be JSON-serializable. |
229 final id; | 233 final id; |
230 | 234 |
231 final Stream stream; | 235 final Stream stream; |
232 final StreamSink sink; | 236 final StreamSink sink; |
233 | 237 |
234 VirtualChannel._(this._parent, this.id, this.stream, this.sink); | 238 VirtualChannel._(this._parent, this.id, this.stream, this.sink); |
235 | 239 |
236 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); | 240 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); |
237 } | 241 } |
OLD | NEW |