OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library unittest.multi_channel; | |
6 | |
7 import 'dart:async'; | |
8 | |
9 /// A class that multiplexes multiple virtual channels across a single | |
10 /// underlying transport layer. | |
11 /// | |
12 /// This should be connected to another [MultiChannel] on the other end of the | |
13 /// underlying channel. It starts with a single default virtual channel, | |
14 /// accessible via [stream] and [sink]. Additional virtual channels can be | |
15 /// created with [virtualChannel]. | |
16 /// | |
17 /// When a virtual channel is created by one endpoint, the other must connect to | |
18 /// it before messages may be sent through it. The first endpoint passes its | |
19 /// [VirtualChannel.id] to the second, which then creates a channel from that id | |
20 /// also using [virtualChannel]. For example: | |
21 /// | |
22 /// ```dart | |
23 /// // First endpoint | |
24 /// var virtual = multiChannel.virtualChannel(); | |
25 /// multiChannel.sink.add({ | |
26 /// "channel": virtual.id | |
27 /// }); | |
28 /// | |
29 /// // Second endpoint | |
30 /// multiChannel.stream.listen((message) { | |
31 /// var virtual = multiChannel.virtualChannel(message["channel"]); | |
32 /// // ... | |
33 /// }); | |
34 /// ``` | |
35 /// | |
36 /// Sending errors across a [MultiChannel] is not supported. Any errors from the | |
37 /// underlying stream will be reported only via the default | |
38 /// [MultiChannel.stream]. | |
39 /// | |
40 /// Each virtual channel may be closed individually. When all of them are | |
41 /// closed, the underlying [StreamSink] is closed automatically. | |
42 abstract class MultiChannel { | |
43 /// The default input stream. | |
44 /// | |
45 /// This connects to the remote [sink]. | |
46 Stream get stream; | |
47 | |
48 /// The default output stream. | |
49 /// | |
50 /// This connects to the remote [stream]. If this is closed, the remote [strea m] | |
kevmoo
2015/02/25 23:43:31
long line
nweiz
2015/02/26 00:16:09
Done.
| |
51 /// will close, but other virtual channels will remain open and new virtual | |
52 /// channels may be opened. | |
53 StreamSink get sink; | |
54 | |
55 /// Creates a new [MultiChannel] that sends messages over [innerStream] and | |
56 /// [innerSink]. | |
57 /// | |
58 /// The inner streams must take JSON-like objects. | |
59 factory MultiChannel(Stream innerStream, StreamSink innerSink) => | |
60 new _MultiChannel(innerStream, innerSink); | |
61 | |
62 /// Creates a new virtual channel. | |
63 /// | |
64 /// If [id] is not passed, this creates a virtual channel from scratch. Before | |
65 /// it's used, its [VirtualChannel.id] must be sent to the remote endpoint | |
66 /// where [virtualChannel] should be called with that id. | |
67 /// | |
68 /// If [id] is passed, this creates a virtual channel corresponding to the | |
69 /// channel with that id on the remote channel. | |
70 /// | |
71 /// Throws an [ArgumentError] if a virtual channel already exists for [id]. | |
72 /// Throws a [StateError] if the underlying channel is closed. | |
73 VirtualChannel virtualChannel([id]); | |
74 } | |
75 | |
76 /// The implementation of [MultiChannel]. | |
77 /// | |
78 /// This is private so that [VirtualChannel] can inherit from [MultiChannel] | |
79 /// without having to implement all the private members. | |
80 class _MultiChannel implements MultiChannel { | |
81 /// The inner stream over which all communication is received. | |
82 /// | |
83 /// This will be `null` if the underlying communication channel is closed. | |
84 Stream _innerStream; | |
85 | |
86 /// The inner sink over which all communication is sent. | |
87 /// | |
88 /// This will be `null` if the underlying communication channel is closed. | |
89 StreamSink _innerSink; | |
90 | |
91 /// The subscription to [_innerStream]. | |
92 StreamSubscription _innerStreamSubscription; | |
93 | |
94 Stream get stream => _streamController.stream; | |
95 final _streamController = new StreamController(sync: true); | |
96 | |
97 StreamSink get sink => _sinkController.sink; | |
98 final _sinkController = new StreamController(sync: true); | |
99 | |
100 /// A map from virtual channel ids to [StreamController]s that should be used | |
101 /// to write messages received from those channels. | |
102 final _streamControllers = new Map<int, StreamController>(); | |
103 | |
104 /// A map from virtual channel ids to [StreamControllers]s that are used | |
105 /// to receive messages to write to those channels. | |
106 /// | |
107 /// Note that this uses the same keys as [_streamControllers]. | |
108 final _sinkControllers = new Map<int, StreamController>(); | |
109 | |
110 /// The next id to use for a local virtual channel. | |
111 /// | |
112 /// Ids are used to identify virtual channels. Each message is tagged with an | |
113 /// id; the receiving [MultiChannel] uses this id to look up which | |
114 /// [VirtualChannel] the message should be dispatched to. | |
115 /// | |
116 /// The id scheme for virtual channels is somewhat complicated. This is | |
117 /// necessary to ensure that there are no conflicts even when both endpoints | |
118 /// have virtual channels with the same id; since both endpoints can send and | |
119 /// receive messages across each virtual channel, a naïve scheme would make it | |
120 /// impossible to tell whether a message was from a channel that originated in | |
121 /// the remote endpoint or a reply on a channel that originated in the local | |
122 /// endpoint. | |
123 /// | |
124 /// The trick is that each endpoint only uses odd ids for its own channels. | |
125 /// When sending a message over a channel that was created by the remote | |
126 /// endpoint, the channel's id plus one is used. This way each [MultiChannel] | |
127 /// knows that if an incoming message has an odd id, it's using the local id | |
128 /// scheme, but if it has an even id, it's using the remote id scheme. | |
129 var _nextId = 1; | |
130 | |
131 _MultiChannel(this._innerStream, this._innerSink) { | |
132 // The default connection is a special case which has id 0 on both ends. | |
133 // This allows it to begin connected without having to send over an id. | |
134 _streamControllers[0] = _streamController; | |
135 _sinkControllers[0] = _sinkController; | |
136 _sinkController.stream.listen( | |
137 (message) => _innerSink.add([0, message]), | |
138 onDone: () => _closeChannel(0, 0)); | |
139 | |
140 _innerStreamSubscription = _innerStream.listen((message) { | |
141 var id = message[0]; | |
142 var sink = _streamControllers[id]; | |
143 | |
144 // A sink might not exist if the channel was closed before an incoming | |
145 // message was processed. | |
146 if (sink == null) return; | |
147 if (message.length > 1) { | |
148 sink.add(message[1]); | |
149 return; | |
150 } | |
151 | |
152 // A message without data indicates that the channel has been closed. | |
153 _sinkControllers[id].close(); | |
154 }, onDone: _closeInnerChannel, | |
155 onError: _streamController.addError); | |
156 } | |
157 | |
158 VirtualChannel virtualChannel([id]) { | |
159 if (_innerStream == null) { | |
160 throw new StateError("The underlying channel is closed."); | |
161 } | |
162 | |
163 var inputId; | |
164 var outputId; | |
165 if (id != null) { | |
166 // Since the user is passing in an id, we're connected to a remote | |
167 // VirtualChannel. This means messages they send over this channel will | |
168 // have the original odd id, but our replies will have an even id. | |
169 inputId = id; | |
170 outputId = (id as int) + 1; | |
171 } else { | |
172 // Since we're generating an id, we originated this VirtualChannel. This | |
173 // means messages we send over this channel will have the original odd id, | |
174 // but the remote channel's replies will have an even id. | |
175 inputId = _nextId + 1; | |
176 outputId = _nextId; | |
177 _nextId += 2; | |
178 } | |
179 | |
180 if (_streamControllers.containsKey(inputId)) { | |
181 throw new ArgumentError("A virtual channel with id $id already exists."); | |
182 } | |
183 | |
184 var streamController = new StreamController(sync: true); | |
185 var sinkController = new StreamController(sync: true); | |
186 _streamControllers[inputId] = streamController; | |
187 _sinkControllers[inputId] = sinkController; | |
188 sinkController.stream.listen( | |
189 (message) => _innerSink.add([outputId, message]), | |
190 onDone: () => _closeChannel(inputId, outputId)); | |
191 | |
192 return new VirtualChannel._( | |
193 this, outputId, streamController.stream, sinkController.sink); | |
194 } | |
195 | |
196 /// Closes the virtual channel for which incoming messages have [inputId] and | |
197 /// outgoing messages have [outputId]. | |
198 void _closeChannel(int inputId, int outputId) { | |
199 // A message without data indicates that the virtual channel has been | |
200 // closed. | |
201 _streamControllers.remove(inputId).close(); | |
202 _sinkControllers.remove(inputId).close(); | |
203 | |
204 if (_innerSink == null) return; | |
205 _innerSink.add([outputId]); | |
206 if (_streamControllers.isEmpty) _closeInnerChannel(); | |
207 } | |
208 | |
209 /// Closes the underlying communication channel. | |
210 void _closeInnerChannel() { | |
211 _innerSink.close(); | |
212 _innerStreamSubscription.cancel(); | |
213 _innerStream = null; | |
214 _innerSink = null; | |
215 for (var controller in _sinkControllers.values.toList()) { | |
216 controller.close(); | |
217 } | |
218 } | |
219 } | |
220 | |
221 /// A virtual channel created by [MultiChannel]. | |
222 /// | |
223 /// This implements [MultiChannel] for convenience. | |
224 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's | |
225 /// [MultiChannel.virtualChannel]. | |
226 class VirtualChannel implements MultiChannel { | |
227 /// The [MultiChannel] that created this. | |
228 final MultiChannel _parent; | |
229 | |
230 /// The identifier for this channel. | |
231 /// | |
232 /// This can be sent across the [MultiChannel] to provide the remote endpoint | |
233 /// a means to connect to this channel. Nothing about this is guaranteed | |
234 /// except that it will be JSON-serializable. | |
235 final id; | |
236 | |
237 final Stream stream; | |
238 final StreamSink sink; | |
239 | |
240 VirtualChannel._(this._parent, this.id, this.stream, this.sink); | |
241 | |
242 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); | |
243 } | |
OLD | NEW |