Index: lib/src/util/multi_channel.dart |
diff --git a/lib/src/util/multi_channel.dart b/lib/src/util/multi_channel.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..3732a17882e314b77f9cb02c7882c71e663c81c3 |
--- /dev/null |
+++ b/lib/src/util/multi_channel.dart |
@@ -0,0 +1,243 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library unittest.multi_channel; |
+ |
+import 'dart:async'; |
+ |
+/// A class that multiplexes multiple virtual channels across a single |
+/// underlying transport layer. |
+/// |
+/// This should be connected to another [MultiChannel] on the other end of the |
+/// underlying channel. It starts with a single default virtual channel, |
+/// accessible via [stream] and [sink]. Additional virtual channels can be |
+/// created with [virtualChannel]. |
+/// |
+/// When a virtual channel is created by one endpoint, the other must connect to |
+/// it before messages may be sent through it. The first endpoint passes its |
+/// [VirtualChannel.id] to the second, which then creates a channel from that id |
+/// also using [virtualChannel]. For example: |
+/// |
+/// ```dart |
+/// // First endpoint |
+/// var virtual = multiChannel.virtualChannel(); |
+/// multiChannel.sink.add({ |
+/// "channel": virtual.id |
+/// }); |
+/// |
+/// // Second endpoint |
+/// multiChannel.stream.listen((message) { |
+/// var virtual = multiChannel.virtualChannel(message["channel"]); |
+/// // ... |
+/// }); |
+/// ``` |
+/// |
+/// Sending errors across a [MultiChannel] is not supported. Any errors from the |
+/// underlying stream will be reported only via the default |
+/// [MultiChannel.stream]. |
+/// |
+/// Each virtual channel may be closed individually. When all of them are |
+/// closed, the underlying [StreamSink] is closed automatically. |
+abstract class MultiChannel { |
+ /// The default input stream. |
+ /// |
+ /// This connects to the remote [sink]. |
+ Stream get stream; |
+ |
+ /// The default output stream. |
+ /// |
+ /// This connects to the remote [stream]. If this is closed, the remote |
+ /// [stream] will close, but other virtual channels will remain open and new |
+ /// virtual channels may be opened. |
+ StreamSink get sink; |
+ |
+ /// Creates a new [MultiChannel] that sends messages over [innerStream] and |
+ /// [innerSink]. |
+ /// |
+ /// The inner streams must take JSON-like objects. |
+ factory MultiChannel(Stream innerStream, StreamSink innerSink) => |
+ new _MultiChannel(innerStream, innerSink); |
+ |
+ /// Creates a new virtual channel. |
+ /// |
+ /// If [id] is not passed, this creates a virtual channel from scratch. Before |
+ /// it's used, its [VirtualChannel.id] must be sent to the remote endpoint |
+ /// where [virtualChannel] should be called with that id. |
+ /// |
+ /// If [id] is passed, this creates a virtual channel corresponding to the |
+ /// channel with that id on the remote channel. |
+ /// |
+ /// Throws an [ArgumentError] if a virtual channel already exists for [id]. |
+ /// Throws a [StateError] if the underlying channel is closed. |
+ VirtualChannel virtualChannel([id]); |
+} |
+ |
+/// The implementation of [MultiChannel]. |
+/// |
+/// This is private so that [VirtualChannel] can inherit from [MultiChannel] |
+/// without having to implement all the private members. |
+class _MultiChannel implements MultiChannel { |
+ /// The inner stream over which all communication is received. |
+ /// |
+ /// This will be `null` if the underlying communication channel is closed. |
+ Stream _innerStream; |
+ |
+ /// The inner sink over which all communication is sent. |
+ /// |
+ /// This will be `null` if the underlying communication channel is closed. |
+ StreamSink _innerSink; |
+ |
+ /// The subscription to [_innerStream]. |
+ StreamSubscription _innerStreamSubscription; |
+ |
+ Stream get stream => _streamController.stream; |
+ final _streamController = new StreamController(sync: true); |
+ |
+ StreamSink get sink => _sinkController.sink; |
+ final _sinkController = new StreamController(sync: true); |
+ |
+ /// A map from virtual channel ids to [StreamController]s that should be used |
+ /// to write messages received from those channels. |
+ final _streamControllers = new Map<int, StreamController>(); |
+ |
+ /// A map from virtual channel ids to [StreamControllers]s that are used |
+ /// to receive messages to write to those channels. |
+ /// |
+ /// Note that this uses the same keys as [_streamControllers]. |
+ final _sinkControllers = new Map<int, StreamController>(); |
+ |
+ /// The next id to use for a local virtual channel. |
+ /// |
+ /// Ids are used to identify virtual channels. Each message is tagged with an |
+ /// id; the receiving [MultiChannel] uses this id to look up which |
+ /// [VirtualChannel] the message should be dispatched to. |
+ /// |
+ /// The id scheme for virtual channels is somewhat complicated. This is |
+ /// necessary to ensure that there are no conflicts even when both endpoints |
+ /// have virtual channels with the same id; since both endpoints can send and |
+ /// receive messages across each virtual channel, a naïve scheme would make it |
+ /// impossible to tell whether a message was from a channel that originated in |
+ /// the remote endpoint or a reply on a channel that originated in the local |
+ /// endpoint. |
+ /// |
+ /// The trick is that each endpoint only uses odd ids for its own channels. |
+ /// When sending a message over a channel that was created by the remote |
+ /// endpoint, the channel's id plus one is used. This way each [MultiChannel] |
+ /// knows that if an incoming message has an odd id, it's using the local id |
+ /// scheme, but if it has an even id, it's using the remote id scheme. |
+ var _nextId = 1; |
+ |
+ _MultiChannel(this._innerStream, this._innerSink) { |
+ // The default connection is a special case which has id 0 on both ends. |
+ // This allows it to begin connected without having to send over an id. |
+ _streamControllers[0] = _streamController; |
+ _sinkControllers[0] = _sinkController; |
+ _sinkController.stream.listen( |
+ (message) => _innerSink.add([0, message]), |
+ onDone: () => _closeChannel(0, 0)); |
+ |
+ _innerStreamSubscription = _innerStream.listen((message) { |
+ var id = message[0]; |
+ var sink = _streamControllers[id]; |
+ |
+ // A sink might not exist if the channel was closed before an incoming |
+ // message was processed. |
+ if (sink == null) return; |
+ if (message.length > 1) { |
+ sink.add(message[1]); |
+ return; |
+ } |
+ |
+ // A message without data indicates that the channel has been closed. |
+ _sinkControllers[id].close(); |
+ }, onDone: _closeInnerChannel, |
+ onError: _streamController.addError); |
+ } |
+ |
+ VirtualChannel virtualChannel([id]) { |
+ if (_innerStream == null) { |
+ throw new StateError("The underlying channel is closed."); |
+ } |
+ |
+ var inputId; |
+ var outputId; |
+ if (id != null) { |
+ // Since the user is passing in an id, we're connected to a remote |
+ // VirtualChannel. This means messages they send over this channel will |
+ // have the original odd id, but our replies will have an even id. |
+ inputId = id; |
+ outputId = (id as int) + 1; |
+ } else { |
+ // Since we're generating an id, we originated this VirtualChannel. This |
+ // means messages we send over this channel will have the original odd id, |
+ // but the remote channel's replies will have an even id. |
+ inputId = _nextId + 1; |
+ outputId = _nextId; |
+ _nextId += 2; |
+ } |
+ |
+ if (_streamControllers.containsKey(inputId)) { |
+ throw new ArgumentError("A virtual channel with id $id already exists."); |
+ } |
+ |
+ var streamController = new StreamController(sync: true); |
+ var sinkController = new StreamController(sync: true); |
+ _streamControllers[inputId] = streamController; |
+ _sinkControllers[inputId] = sinkController; |
+ sinkController.stream.listen( |
+ (message) => _innerSink.add([outputId, message]), |
+ onDone: () => _closeChannel(inputId, outputId)); |
+ |
+ return new VirtualChannel._( |
+ this, outputId, streamController.stream, sinkController.sink); |
+ } |
+ |
+ /// Closes the virtual channel for which incoming messages have [inputId] and |
+ /// outgoing messages have [outputId]. |
+ void _closeChannel(int inputId, int outputId) { |
+ // A message without data indicates that the virtual channel has been |
+ // closed. |
+ _streamControllers.remove(inputId).close(); |
+ _sinkControllers.remove(inputId).close(); |
+ |
+ if (_innerSink == null) return; |
+ _innerSink.add([outputId]); |
+ if (_streamControllers.isEmpty) _closeInnerChannel(); |
+ } |
+ |
+ /// Closes the underlying communication channel. |
+ void _closeInnerChannel() { |
+ _innerSink.close(); |
+ _innerStreamSubscription.cancel(); |
+ _innerStream = null; |
+ _innerSink = null; |
+ for (var controller in _sinkControllers.values.toList()) { |
+ controller.close(); |
+ } |
+ } |
+} |
+ |
+/// A virtual channel created by [MultiChannel]. |
+/// |
+/// This implements [MultiChannel] for convenience. |
+/// [VirtualChannel.virtualChannel] is semantically identical to the parent's |
+/// [MultiChannel.virtualChannel]. |
+class VirtualChannel implements MultiChannel { |
+ /// The [MultiChannel] that created this. |
+ final MultiChannel _parent; |
+ |
+ /// The identifier for this channel. |
+ /// |
+ /// This can be sent across the [MultiChannel] to provide the remote endpoint |
+ /// a means to connect to this channel. Nothing about this is guaranteed |
+ /// except that it will be JSON-serializable. |
+ final id; |
+ |
+ final Stream stream; |
+ final StreamSink sink; |
+ |
+ VirtualChannel._(this._parent, this.id, this.stream, this.sink); |
+ |
+ VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); |
+} |