| OLD | NEW |
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import 'dart:async'; | 5 import 'dart:async'; |
| 6 | 6 |
| 7 import 'package:async/async.dart'; |
| 8 |
| 7 import '../stream_channel.dart'; | 9 import '../stream_channel.dart'; |
| 8 | 10 |
| 9 /// A class that multiplexes multiple virtual channels across a single | 11 /// A class that multiplexes multiple virtual channels across a single |
| 10 /// underlying transport layer. | 12 /// underlying transport layer. |
| 11 /// | 13 /// |
| 12 /// This should be connected to another [MultiChannel] on the other end of the | 14 /// This should be connected to another [MultiChannel] on the other end of the |
| 13 /// underlying channel. It starts with a single default virtual channel, | 15 /// underlying channel. It starts with a single default virtual channel, |
| 14 /// accessible via [stream] and [sink]. Additional virtual channels can be | 16 /// accessible via [stream] and [sink]. Additional virtual channels can be |
| 15 /// created with [virtualChannel]. | 17 /// created with [virtualChannel]. |
| 16 /// | 18 /// |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 78 /// without having to implement all the private members. | 80 /// without having to implement all the private members. |
| 79 class _MultiChannel extends StreamChannelMixin implements MultiChannel { | 81 class _MultiChannel extends StreamChannelMixin implements MultiChannel { |
| 80 /// The inner channel over which all communication is conducted. | 82 /// The inner channel over which all communication is conducted. |
| 81 /// | 83 /// |
| 82 /// This will be `null` if the underlying communication channel is closed. | 84 /// This will be `null` if the underlying communication channel is closed. |
| 83 StreamChannel _inner; | 85 StreamChannel _inner; |
| 84 | 86 |
| 85 /// The subscription to [_inner.stream]. | 87 /// The subscription to [_inner.stream]. |
| 86 StreamSubscription _innerStreamSubscription; | 88 StreamSubscription _innerStreamSubscription; |
| 87 | 89 |
| 88 Stream get stream => _streamController.stream; | 90 Stream get stream => _mainController.foreign.stream; |
| 89 final _streamController = new StreamController(sync: true); | 91 StreamSink get sink => _mainController.foreign.sink; |
| 90 | 92 |
| 91 StreamSink get sink => _sinkController.sink; | 93 /// The controller for this channel. |
| 92 final _sinkController = new StreamController(sync: true); | 94 final _mainController = new StreamChannelController(sync: true); |
| 93 | 95 |
| 94 /// A map from virtual channel ids to [StreamController]s that should be used | 96 /// A map from virtual channel ids to [StreamChannelController]s that should |
| 95 /// to write messages received from those channels. | 97 /// be used to communicate over those channels. |
| 96 final _streamControllers = new Map<int, StreamController>(); | 98 final _controllers = <int, StreamChannelController>{}; |
| 97 | |
| 98 /// A map from virtual channel ids to [StreamControllers]s that are used | |
| 99 /// to receive messages to write to those channels. | |
| 100 /// | |
| 101 /// Note that this uses the same keys as [_streamControllers]. | |
| 102 final _sinkControllers = new Map<int, StreamController>(); | |
| 103 | 99 |
| 104 /// The next id to use for a local virtual channel. | 100 /// The next id to use for a local virtual channel. |
| 105 /// | 101 /// |
| 106 /// Ids are used to identify virtual channels. Each message is tagged with an | 102 /// Ids are used to identify virtual channels. Each message is tagged with an |
| 107 /// id; the receiving [MultiChannel] uses this id to look up which | 103 /// id; the receiving [MultiChannel] uses this id to look up which |
| 108 /// [VirtualChannel] the message should be dispatched to. | 104 /// [VirtualChannel] the message should be dispatched to. |
| 109 /// | 105 /// |
| 110 /// The id scheme for virtual channels is somewhat complicated. This is | 106 /// The id scheme for virtual channels is somewhat complicated. This is |
| 111 /// necessary to ensure that there are no conflicts even when both endpoints | 107 /// necessary to ensure that there are no conflicts even when both endpoints |
| 112 /// have virtual channels with the same id; since both endpoints can send and | 108 /// have virtual channels with the same id; since both endpoints can send and |
| 113 /// receive messages across each virtual channel, a naïve scheme would make it | 109 /// receive messages across each virtual channel, a naïve scheme would make it |
| 114 /// impossible to tell whether a message was from a channel that originated in | 110 /// impossible to tell whether a message was from a channel that originated in |
| 115 /// the remote endpoint or a reply on a channel that originated in the local | 111 /// the remote endpoint or a reply on a channel that originated in the local |
| 116 /// endpoint. | 112 /// endpoint. |
| 117 /// | 113 /// |
| 118 /// The trick is that each endpoint only uses odd ids for its own channels. | 114 /// The trick is that each endpoint only uses odd ids for its own channels. |
| 119 /// When sending a message over a channel that was created by the remote | 115 /// When sending a message over a channel that was created by the remote |
| 120 /// endpoint, the channel's id plus one is used. This way each [MultiChannel] | 116 /// endpoint, the channel's id plus one is used. This way each [MultiChannel] |
| 121 /// knows that if an incoming message has an odd id, it's using the local id | 117 /// knows that if an incoming message has an odd id, it's using the local id |
| 122 /// scheme, but if it has an even id, it's using the remote id scheme. | 118 /// scheme, but if it has an even id, it's using the remote id scheme. |
| 123 var _nextId = 1; | 119 var _nextId = 1; |
| 124 | 120 |
| 125 _MultiChannel(this._inner) { | 121 _MultiChannel(this._inner) { |
| 126 // The default connection is a special case which has id 0 on both ends. | 122 // The default connection is a special case which has id 0 on both ends. |
| 127 // This allows it to begin connected without having to send over an id. | 123 // This allows it to begin connected without having to send over an id. |
| 128 _streamControllers[0] = _streamController; | 124 _controllers[0] = _mainController; |
| 129 _sinkControllers[0] = _sinkController; | 125 _mainController.local.stream.listen( |
| 130 _sinkController.stream.listen( | |
| 131 (message) => _inner.sink.add([0, message]), | 126 (message) => _inner.sink.add([0, message]), |
| 132 onDone: () => _closeChannel(0, 0)); | 127 onDone: () => _closeChannel(0, 0)); |
| 133 | 128 |
| 134 _innerStreamSubscription = _inner.stream.listen((message) { | 129 _innerStreamSubscription = _inner.stream.listen((message) { |
| 135 var id = message[0]; | 130 var id = message[0]; |
| 136 var sink = _streamControllers[id]; | 131 var controller = _controllers[id]; |
| 137 | 132 |
| 138 // A sink might not exist if the channel was closed before an incoming | 133 // A controller might not exist if the channel was closed before an |
| 139 // message was processed. | 134 // incoming message was processed. |
| 140 if (sink == null) return; | 135 if (controller == null) return; |
| 141 if (message.length > 1) { | 136 if (message.length > 1) { |
| 142 sink.add(message[1]); | 137 controller.local.sink.add(message[1]); |
| 143 return; | 138 return; |
| 144 } | 139 } |
| 145 | 140 |
| 146 // A message without data indicates that the channel has been closed. | 141 // A message without data indicates that the channel has been closed. We |
| 147 _sinkControllers[id].close(); | 142 // can only close the sink here without doing any more cleanup, because |
| 148 }, onDone: _closeInnerChannel, | 143 // the sink closing will cause the stream to emit a done event which will |
| 149 onError: _streamController.addError); | 144 // trigger more cleanup. |
| 145 controller.local.sink.close(); |
| 146 }, |
| 147 onDone: _closeInnerChannel, |
| 148 onError: _mainController.local.sink.addError); |
| 150 } | 149 } |
| 151 | 150 |
| 152 VirtualChannel virtualChannel([id]) { | 151 VirtualChannel virtualChannel([id]) { |
| 153 if (_inner == null) { | |
| 154 throw new StateError("The underlying channel is closed."); | |
| 155 } | |
| 156 | |
| 157 var inputId; | 152 var inputId; |
| 158 var outputId; | 153 var outputId; |
| 159 if (id != null) { | 154 if (id != null) { |
| 160 // Since the user is passing in an id, we're connected to a remote | 155 // Since the user is passing in an id, we're connected to a remote |
| 161 // VirtualChannel. This means messages they send over this channel will | 156 // VirtualChannel. This means messages they send over this channel will |
| 162 // have the original odd id, but our replies will have an even id. | 157 // have the original odd id, but our replies will have an even id. |
| 163 inputId = id; | 158 inputId = id; |
| 164 outputId = (id as int) + 1; | 159 outputId = (id as int) + 1; |
| 165 } else { | 160 } else { |
| 166 // Since we're generating an id, we originated this VirtualChannel. This | 161 // Since we're generating an id, we originated this VirtualChannel. This |
| 167 // means messages we send over this channel will have the original odd id, | 162 // means messages we send over this channel will have the original odd id, |
| 168 // but the remote channel's replies will have an even id. | 163 // but the remote channel's replies will have an even id. |
| 169 inputId = _nextId + 1; | 164 inputId = _nextId + 1; |
| 170 outputId = _nextId; | 165 outputId = _nextId; |
| 171 _nextId += 2; | 166 _nextId += 2; |
| 172 } | 167 } |
| 173 | 168 |
| 174 if (_streamControllers.containsKey(inputId)) { | 169 // If the inner channel has already closed, create new virtual channels in a |
| 170 // closed state. |
| 171 if (_inner == null) { |
| 172 return new VirtualChannel._( |
| 173 this, inputId, new Stream.empty(), new NullStreamSink()); |
| 174 } |
| 175 |
| 176 if (_controllers.containsKey(inputId)) { |
| 175 throw new ArgumentError("A virtual channel with id $id already exists."); | 177 throw new ArgumentError("A virtual channel with id $id already exists."); |
| 176 } | 178 } |
| 177 | 179 |
| 178 var streamController = new StreamController(sync: true); | 180 var controller = new StreamChannelController(sync: true); |
| 179 var sinkController = new StreamController(sync: true); | 181 _controllers[inputId] = controller; |
| 180 _streamControllers[inputId] = streamController; | 182 controller.local.stream.listen( |
| 181 _sinkControllers[inputId] = sinkController; | |
| 182 sinkController.stream.listen( | |
| 183 (message) => _inner.sink.add([outputId, message]), | 183 (message) => _inner.sink.add([outputId, message]), |
| 184 onDone: () => _closeChannel(inputId, outputId)); | 184 onDone: () => _closeChannel(inputId, outputId)); |
| 185 | 185 |
| 186 return new VirtualChannel._( | 186 return new VirtualChannel._( |
| 187 this, outputId, streamController.stream, sinkController.sink); | 187 this, outputId, controller.foreign.stream, controller.foreign.sink); |
| 188 } | 188 } |
| 189 | 189 |
| 190 /// Closes the virtual channel for which incoming messages have [inputId] and | 190 /// Closes the virtual channel for which incoming messages have [inputId] and |
| 191 /// outgoing messages have [outputId]. | 191 /// outgoing messages have [outputId]. |
| 192 void _closeChannel(int inputId, int outputId) { | 192 void _closeChannel(int inputId, int outputId) { |
| 193 _streamControllers.remove(inputId).close(); | 193 var controller = _controllers.remove(inputId); |
| 194 _sinkControllers.remove(inputId).close(); | 194 controller.local.sink.close(); |
| 195 | 195 |
| 196 if (_inner == null) return; | 196 if (_inner == null) return; |
| 197 | 197 |
| 198 // A message without data indicates that the virtual channel has been | 198 // A message without data indicates that the virtual channel has been |
| 199 // closed. | 199 // closed. |
| 200 _inner.sink.add([outputId]); | 200 _inner.sink.add([outputId]); |
| 201 if (_streamControllers.isEmpty) _closeInnerChannel(); | 201 if (_controllers.isEmpty) _closeInnerChannel(); |
| 202 } | 202 } |
| 203 | 203 |
| 204 /// Closes the underlying communication channel. | 204 /// Closes the underlying communication channel. |
| 205 void _closeInnerChannel() { | 205 void _closeInnerChannel() { |
| 206 _inner.sink.close(); | 206 _inner.sink.close(); |
| 207 _innerStreamSubscription.cancel(); | 207 _innerStreamSubscription.cancel(); |
| 208 _inner = null; | 208 _inner = null; |
| 209 for (var controller in _sinkControllers.values.toList()) { | 209 |
| 210 controller.close(); | 210 // Convert this to a list because the close is dispatched synchronously, and |
| 211 // that could conceivably remove a controller from [_controllers]. |
| 212 for (var controller in new List.from(_controllers.values)) { |
| 213 controller.local.sink.close(); |
| 211 } | 214 } |
| 215 _controllers.clear(); |
| 212 } | 216 } |
| 213 } | 217 } |
| 214 | 218 |
| 215 /// A virtual channel created by [MultiChannel]. | 219 /// A virtual channel created by [MultiChannel]. |
| 216 /// | 220 /// |
| 217 /// This implements [MultiChannel] for convenience. | 221 /// This implements [MultiChannel] for convenience. |
| 218 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's | 222 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's |
| 219 /// [MultiChannel.virtualChannel]. | 223 /// [MultiChannel.virtualChannel]. |
| 220 class VirtualChannel extends StreamChannelMixin implements MultiChannel { | 224 class VirtualChannel extends StreamChannelMixin implements MultiChannel { |
| 221 /// The [MultiChannel] that created this. | 225 /// The [MultiChannel] that created this. |
| 222 final MultiChannel _parent; | 226 final MultiChannel _parent; |
| 223 | 227 |
| 224 /// The identifier for this channel. | 228 /// The identifier for this channel. |
| 225 /// | 229 /// |
| 226 /// This can be sent across the [MultiChannel] to provide the remote endpoint | 230 /// This can be sent across the [MultiChannel] to provide the remote endpoint |
| 227 /// a means to connect to this channel. Nothing about this is guaranteed | 231 /// a means to connect to this channel. Nothing about this is guaranteed |
| 228 /// except that it will be JSON-serializable. | 232 /// except that it will be JSON-serializable. |
| 229 final id; | 233 final id; |
| 230 | 234 |
| 231 final Stream stream; | 235 final Stream stream; |
| 232 final StreamSink sink; | 236 final StreamSink sink; |
| 233 | 237 |
| 234 VirtualChannel._(this._parent, this.id, this.stream, this.sink); | 238 VirtualChannel._(this._parent, this.id, this.stream, this.sink); |
| 235 | 239 |
| 236 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); | 240 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); |
| 237 } | 241 } |
| OLD | NEW |