Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(76)

Side by Side Diff: mojo/public/dart/third_party/test/lib/src/util/multi_channel.dart

Issue 1346773002: Stop running pub get at gclient sync time and fix build bugs (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library test.multi_channel;
6
7 import 'dart:async';
8
9 import 'stream_channel.dart';
10
11 /// A class that multiplexes multiple virtual channels across a single
12 /// underlying transport layer.
13 ///
14 /// This should be connected to another [MultiChannel] on the other end of the
15 /// underlying channel. It starts with a single default virtual channel,
16 /// accessible via [stream] and [sink]. Additional virtual channels can be
17 /// created with [virtualChannel].
18 ///
19 /// When a virtual channel is created by one endpoint, the other must connect to
20 /// it before messages may be sent through it. The first endpoint passes its
21 /// [VirtualChannel.id] to the second, which then creates a channel from that id
22 /// also using [virtualChannel]. For example:
23 ///
24 /// ```dart
25 /// // First endpoint
26 /// var virtual = multiChannel.virtualChannel();
27 /// multiChannel.sink.add({
28 /// "channel": virtual.id
29 /// });
30 ///
31 /// // Second endpoint
32 /// multiChannel.stream.listen((message) {
33 /// var virtual = multiChannel.virtualChannel(message["channel"]);
34 /// // ...
35 /// });
36 /// ```
37 ///
38 /// Sending errors across a [MultiChannel] is not supported. Any errors from the
39 /// underlying stream will be reported only via the default
40 /// [MultiChannel.stream].
41 ///
42 /// Each virtual channel may be closed individually. When all of them are
43 /// closed, the underlying [StreamSink] is closed automatically.
44 abstract class MultiChannel implements StreamChannel {
45 /// The default input stream.
46 ///
47 /// This connects to the remote [sink].
48 Stream get stream;
49
50 /// The default output stream.
51 ///
52 /// This connects to the remote [stream]. If this is closed, the remote
53 /// [stream] will close, but other virtual channels will remain open and new
54 /// virtual channels may be opened.
55 StreamSink get sink;
56
57 /// Creates a new [MultiChannel] that sends messages over [innerStream] and
58 /// [innerSink].
59 ///
60 /// The inner streams must take JSON-like objects.
61 factory MultiChannel(Stream innerStream, StreamSink innerSink) =>
62 new _MultiChannel(innerStream, innerSink);
63
64 /// Creates a new virtual channel.
65 ///
66 /// If [id] is not passed, this creates a virtual channel from scratch. Before
67 /// it's used, its [VirtualChannel.id] must be sent to the remote endpoint
68 /// where [virtualChannel] should be called with that id.
69 ///
70 /// If [id] is passed, this creates a virtual channel corresponding to the
71 /// channel with that id on the remote channel.
72 ///
73 /// Throws an [ArgumentError] if a virtual channel already exists for [id].
74 /// Throws a [StateError] if the underlying channel is closed.
75 VirtualChannel virtualChannel([id]);
76 }
77
78 /// The implementation of [MultiChannel].
79 ///
80 /// This is private so that [VirtualChannel] can inherit from [MultiChannel]
81 /// without having to implement all the private members.
82 class _MultiChannel extends StreamChannelMixin implements MultiChannel {
83 /// The inner stream over which all communication is received.
84 ///
85 /// This will be `null` if the underlying communication channel is closed.
86 Stream _innerStream;
87
88 /// The inner sink over which all communication is sent.
89 ///
90 /// This will be `null` if the underlying communication channel is closed.
91 StreamSink _innerSink;
92
93 /// The subscription to [_innerStream].
94 StreamSubscription _innerStreamSubscription;
95
96 Stream get stream => _streamController.stream;
97 final _streamController = new StreamController(sync: true);
98
99 StreamSink get sink => _sinkController.sink;
100 final _sinkController = new StreamController(sync: true);
101
102 /// A map from virtual channel ids to [StreamController]s that should be used
103 /// to write messages received from those channels.
104 final _streamControllers = new Map<int, StreamController>();
105
106 /// A map from virtual channel ids to [StreamControllers]s that are used
107 /// to receive messages to write to those channels.
108 ///
109 /// Note that this uses the same keys as [_streamControllers].
110 final _sinkControllers = new Map<int, StreamController>();
111
112 /// The next id to use for a local virtual channel.
113 ///
114 /// Ids are used to identify virtual channels. Each message is tagged with an
115 /// id; the receiving [MultiChannel] uses this id to look up which
116 /// [VirtualChannel] the message should be dispatched to.
117 ///
118 /// The id scheme for virtual channels is somewhat complicated. This is
119 /// necessary to ensure that there are no conflicts even when both endpoints
120 /// have virtual channels with the same id; since both endpoints can send and
121 /// receive messages across each virtual channel, a naïve scheme would make it
122 /// impossible to tell whether a message was from a channel that originated in
123 /// the remote endpoint or a reply on a channel that originated in the local
124 /// endpoint.
125 ///
126 /// The trick is that each endpoint only uses odd ids for its own channels.
127 /// When sending a message over a channel that was created by the remote
128 /// endpoint, the channel's id plus one is used. This way each [MultiChannel]
129 /// knows that if an incoming message has an odd id, it's using the local id
130 /// scheme, but if it has an even id, it's using the remote id scheme.
131 var _nextId = 1;
132
133 _MultiChannel(this._innerStream, this._innerSink) {
134 // The default connection is a special case which has id 0 on both ends.
135 // This allows it to begin connected without having to send over an id.
136 _streamControllers[0] = _streamController;
137 _sinkControllers[0] = _sinkController;
138 _sinkController.stream.listen(
139 (message) => _innerSink.add([0, message]),
140 onDone: () => _closeChannel(0, 0));
141
142 _innerStreamSubscription = _innerStream.listen((message) {
143 var id = message[0];
144 var sink = _streamControllers[id];
145
146 // A sink might not exist if the channel was closed before an incoming
147 // message was processed.
148 if (sink == null) return;
149 if (message.length > 1) {
150 sink.add(message[1]);
151 return;
152 }
153
154 // A message without data indicates that the channel has been closed.
155 _sinkControllers[id].close();
156 }, onDone: _closeInnerChannel,
157 onError: _streamController.addError);
158 }
159
160 VirtualChannel virtualChannel([id]) {
161 if (_innerStream == null) {
162 throw new StateError("The underlying channel is closed.");
163 }
164
165 var inputId;
166 var outputId;
167 if (id != null) {
168 // Since the user is passing in an id, we're connected to a remote
169 // VirtualChannel. This means messages they send over this channel will
170 // have the original odd id, but our replies will have an even id.
171 inputId = id;
172 outputId = (id as int) + 1;
173 } else {
174 // Since we're generating an id, we originated this VirtualChannel. This
175 // means messages we send over this channel will have the original odd id,
176 // but the remote channel's replies will have an even id.
177 inputId = _nextId + 1;
178 outputId = _nextId;
179 _nextId += 2;
180 }
181
182 if (_streamControllers.containsKey(inputId)) {
183 throw new ArgumentError("A virtual channel with id $id already exists.");
184 }
185
186 var streamController = new StreamController(sync: true);
187 var sinkController = new StreamController(sync: true);
188 _streamControllers[inputId] = streamController;
189 _sinkControllers[inputId] = sinkController;
190 sinkController.stream.listen(
191 (message) => _innerSink.add([outputId, message]),
192 onDone: () => _closeChannel(inputId, outputId));
193
194 return new VirtualChannel._(
195 this, outputId, streamController.stream, sinkController.sink);
196 }
197
198 /// Closes the virtual channel for which incoming messages have [inputId] and
199 /// outgoing messages have [outputId].
200 void _closeChannel(int inputId, int outputId) {
201 // A message without data indicates that the virtual channel has been
202 // closed.
203 _streamControllers.remove(inputId).close();
204 _sinkControllers.remove(inputId).close();
205
206 if (_innerSink == null) return;
207 _innerSink.add([outputId]);
208 if (_streamControllers.isEmpty) _closeInnerChannel();
209 }
210
211 /// Closes the underlying communication channel.
212 void _closeInnerChannel() {
213 _innerSink.close();
214 _innerStreamSubscription.cancel();
215 _innerStream = null;
216 _innerSink = null;
217 for (var controller in _sinkControllers.values.toList()) {
218 controller.close();
219 }
220 }
221 }
222
223 /// A virtual channel created by [MultiChannel].
224 ///
225 /// This implements [MultiChannel] for convenience.
226 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's
227 /// [MultiChannel.virtualChannel].
228 class VirtualChannel extends StreamChannelMixin implements MultiChannel {
229 /// The [MultiChannel] that created this.
230 final MultiChannel _parent;
231
232 /// The identifier for this channel.
233 ///
234 /// This can be sent across the [MultiChannel] to provide the remote endpoint
235 /// a means to connect to this channel. Nothing about this is guaranteed
236 /// except that it will be JSON-serializable.
237 final id;
238
239 final Stream stream;
240 final StreamSink sink;
241
242 VirtualChannel._(this._parent, this.id, this.stream, this.sink);
243
244 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id);
245 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698