OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library test.multi_channel; |
| 6 |
| 7 import 'dart:async'; |
| 8 |
| 9 import 'stream_channel.dart'; |
| 10 |
| 11 /// A class that multiplexes multiple virtual channels across a single |
| 12 /// underlying transport layer. |
| 13 /// |
| 14 /// This should be connected to another [MultiChannel] on the other end of the |
| 15 /// underlying channel. It starts with a single default virtual channel, |
| 16 /// accessible via [stream] and [sink]. Additional virtual channels can be |
| 17 /// created with [virtualChannel]. |
| 18 /// |
| 19 /// When a virtual channel is created by one endpoint, the other must connect to |
| 20 /// it before messages may be sent through it. The first endpoint passes its |
| 21 /// [VirtualChannel.id] to the second, which then creates a channel from that id |
| 22 /// also using [virtualChannel]. For example: |
| 23 /// |
| 24 /// ```dart |
| 25 /// // First endpoint |
| 26 /// var virtual = multiChannel.virtualChannel(); |
| 27 /// multiChannel.sink.add({ |
| 28 /// "channel": virtual.id |
| 29 /// }); |
| 30 /// |
| 31 /// // Second endpoint |
| 32 /// multiChannel.stream.listen((message) { |
| 33 /// var virtual = multiChannel.virtualChannel(message["channel"]); |
| 34 /// // ... |
| 35 /// }); |
| 36 /// ``` |
| 37 /// |
| 38 /// Sending errors across a [MultiChannel] is not supported. Any errors from the |
| 39 /// underlying stream will be reported only via the default |
| 40 /// [MultiChannel.stream]. |
| 41 /// |
| 42 /// Each virtual channel may be closed individually. When all of them are |
| 43 /// closed, the underlying [StreamSink] is closed automatically. |
| 44 abstract class MultiChannel implements StreamChannel { |
| 45 /// The default input stream. |
| 46 /// |
| 47 /// This connects to the remote [sink]. |
| 48 Stream get stream; |
| 49 |
| 50 /// The default output stream. |
| 51 /// |
| 52 /// This connects to the remote [stream]. If this is closed, the remote |
| 53 /// [stream] will close, but other virtual channels will remain open and new |
| 54 /// virtual channels may be opened. |
| 55 StreamSink get sink; |
| 56 |
| 57 /// Creates a new [MultiChannel] that sends messages over [innerStream] and |
| 58 /// [innerSink]. |
| 59 /// |
| 60 /// The inner streams must take JSON-like objects. |
| 61 factory MultiChannel(Stream innerStream, StreamSink innerSink) => |
| 62 new _MultiChannel(innerStream, innerSink); |
| 63 |
| 64 /// Creates a new virtual channel. |
| 65 /// |
| 66 /// If [id] is not passed, this creates a virtual channel from scratch. Before |
| 67 /// it's used, its [VirtualChannel.id] must be sent to the remote endpoint |
| 68 /// where [virtualChannel] should be called with that id. |
| 69 /// |
| 70 /// If [id] is passed, this creates a virtual channel corresponding to the |
| 71 /// channel with that id on the remote channel. |
| 72 /// |
| 73 /// Throws an [ArgumentError] if a virtual channel already exists for [id]. |
| 74 /// Throws a [StateError] if the underlying channel is closed. |
| 75 VirtualChannel virtualChannel([id]); |
| 76 } |
| 77 |
| 78 /// The implementation of [MultiChannel]. |
| 79 /// |
| 80 /// This is private so that [VirtualChannel] can inherit from [MultiChannel] |
| 81 /// without having to implement all the private members. |
| 82 class _MultiChannel extends StreamChannelMixin implements MultiChannel { |
| 83 /// The inner stream over which all communication is received. |
| 84 /// |
| 85 /// This will be `null` if the underlying communication channel is closed. |
| 86 Stream _innerStream; |
| 87 |
| 88 /// The inner sink over which all communication is sent. |
| 89 /// |
| 90 /// This will be `null` if the underlying communication channel is closed. |
| 91 StreamSink _innerSink; |
| 92 |
| 93 /// The subscription to [_innerStream]. |
| 94 StreamSubscription _innerStreamSubscription; |
| 95 |
| 96 Stream get stream => _streamController.stream; |
| 97 final _streamController = new StreamController(sync: true); |
| 98 |
| 99 StreamSink get sink => _sinkController.sink; |
| 100 final _sinkController = new StreamController(sync: true); |
| 101 |
| 102 /// A map from virtual channel ids to [StreamController]s that should be used |
| 103 /// to write messages received from those channels. |
| 104 final _streamControllers = new Map<int, StreamController>(); |
| 105 |
| 106 /// A map from virtual channel ids to [StreamControllers]s that are used |
| 107 /// to receive messages to write to those channels. |
| 108 /// |
| 109 /// Note that this uses the same keys as [_streamControllers]. |
| 110 final _sinkControllers = new Map<int, StreamController>(); |
| 111 |
| 112 /// The next id to use for a local virtual channel. |
| 113 /// |
| 114 /// Ids are used to identify virtual channels. Each message is tagged with an |
| 115 /// id; the receiving [MultiChannel] uses this id to look up which |
| 116 /// [VirtualChannel] the message should be dispatched to. |
| 117 /// |
| 118 /// The id scheme for virtual channels is somewhat complicated. This is |
| 119 /// necessary to ensure that there are no conflicts even when both endpoints |
| 120 /// have virtual channels with the same id; since both endpoints can send and |
| 121 /// receive messages across each virtual channel, a naïve scheme would make it |
| 122 /// impossible to tell whether a message was from a channel that originated in |
| 123 /// the remote endpoint or a reply on a channel that originated in the local |
| 124 /// endpoint. |
| 125 /// |
| 126 /// The trick is that each endpoint only uses odd ids for its own channels. |
| 127 /// When sending a message over a channel that was created by the remote |
| 128 /// endpoint, the channel's id plus one is used. This way each [MultiChannel] |
| 129 /// knows that if an incoming message has an odd id, it's using the local id |
| 130 /// scheme, but if it has an even id, it's using the remote id scheme. |
| 131 var _nextId = 1; |
| 132 |
| 133 _MultiChannel(this._innerStream, this._innerSink) { |
| 134 // The default connection is a special case which has id 0 on both ends. |
| 135 // This allows it to begin connected without having to send over an id. |
| 136 _streamControllers[0] = _streamController; |
| 137 _sinkControllers[0] = _sinkController; |
| 138 _sinkController.stream.listen( |
| 139 (message) => _innerSink.add([0, message]), |
| 140 onDone: () => _closeChannel(0, 0)); |
| 141 |
| 142 _innerStreamSubscription = _innerStream.listen((message) { |
| 143 var id = message[0]; |
| 144 var sink = _streamControllers[id]; |
| 145 |
| 146 // A sink might not exist if the channel was closed before an incoming |
| 147 // message was processed. |
| 148 if (sink == null) return; |
| 149 if (message.length > 1) { |
| 150 sink.add(message[1]); |
| 151 return; |
| 152 } |
| 153 |
| 154 // A message without data indicates that the channel has been closed. |
| 155 _sinkControllers[id].close(); |
| 156 }, onDone: _closeInnerChannel, |
| 157 onError: _streamController.addError); |
| 158 } |
| 159 |
| 160 VirtualChannel virtualChannel([id]) { |
| 161 if (_innerStream == null) { |
| 162 throw new StateError("The underlying channel is closed."); |
| 163 } |
| 164 |
| 165 var inputId; |
| 166 var outputId; |
| 167 if (id != null) { |
| 168 // Since the user is passing in an id, we're connected to a remote |
| 169 // VirtualChannel. This means messages they send over this channel will |
| 170 // have the original odd id, but our replies will have an even id. |
| 171 inputId = id; |
| 172 outputId = (id as int) + 1; |
| 173 } else { |
| 174 // Since we're generating an id, we originated this VirtualChannel. This |
| 175 // means messages we send over this channel will have the original odd id, |
| 176 // but the remote channel's replies will have an even id. |
| 177 inputId = _nextId + 1; |
| 178 outputId = _nextId; |
| 179 _nextId += 2; |
| 180 } |
| 181 |
| 182 if (_streamControllers.containsKey(inputId)) { |
| 183 throw new ArgumentError("A virtual channel with id $id already exists."); |
| 184 } |
| 185 |
| 186 var streamController = new StreamController(sync: true); |
| 187 var sinkController = new StreamController(sync: true); |
| 188 _streamControllers[inputId] = streamController; |
| 189 _sinkControllers[inputId] = sinkController; |
| 190 sinkController.stream.listen( |
| 191 (message) => _innerSink.add([outputId, message]), |
| 192 onDone: () => _closeChannel(inputId, outputId)); |
| 193 |
| 194 return new VirtualChannel._( |
| 195 this, outputId, streamController.stream, sinkController.sink); |
| 196 } |
| 197 |
| 198 /// Closes the virtual channel for which incoming messages have [inputId] and |
| 199 /// outgoing messages have [outputId]. |
| 200 void _closeChannel(int inputId, int outputId) { |
| 201 // A message without data indicates that the virtual channel has been |
| 202 // closed. |
| 203 _streamControllers.remove(inputId).close(); |
| 204 _sinkControllers.remove(inputId).close(); |
| 205 |
| 206 if (_innerSink == null) return; |
| 207 _innerSink.add([outputId]); |
| 208 if (_streamControllers.isEmpty) _closeInnerChannel(); |
| 209 } |
| 210 |
| 211 /// Closes the underlying communication channel. |
| 212 void _closeInnerChannel() { |
| 213 _innerSink.close(); |
| 214 _innerStreamSubscription.cancel(); |
| 215 _innerStream = null; |
| 216 _innerSink = null; |
| 217 for (var controller in _sinkControllers.values.toList()) { |
| 218 controller.close(); |
| 219 } |
| 220 } |
| 221 } |
| 222 |
| 223 /// A virtual channel created by [MultiChannel]. |
| 224 /// |
| 225 /// This implements [MultiChannel] for convenience. |
| 226 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's |
| 227 /// [MultiChannel.virtualChannel]. |
| 228 class VirtualChannel extends StreamChannelMixin implements MultiChannel { |
| 229 /// The [MultiChannel] that created this. |
| 230 final MultiChannel _parent; |
| 231 |
| 232 /// The identifier for this channel. |
| 233 /// |
| 234 /// This can be sent across the [MultiChannel] to provide the remote endpoint |
| 235 /// a means to connect to this channel. Nothing about this is guaranteed |
| 236 /// except that it will be JSON-serializable. |
| 237 final id; |
| 238 |
| 239 final Stream stream; |
| 240 final StreamSink sink; |
| 241 |
| 242 VirtualChannel._(this._parent, this.id, this.stream, this.sink); |
| 243 |
| 244 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); |
| 245 } |
OLD | NEW |