OLD | NEW |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 | 6 |
7 import '../stream_channel.dart'; | 7 import '../stream_channel.dart'; |
8 | 8 |
9 /// A class that multiplexes multiple virtual channels across a single | 9 /// A class that multiplexes multiple virtual channels across a single |
10 /// underlying transport layer. | 10 /// underlying transport layer. |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
45 /// This connects to the remote [sink]. | 45 /// This connects to the remote [sink]. |
46 Stream get stream; | 46 Stream get stream; |
47 | 47 |
48 /// The default output stream. | 48 /// The default output stream. |
49 /// | 49 /// |
50 /// This connects to the remote [stream]. If this is closed, the remote | 50 /// This connects to the remote [stream]. If this is closed, the remote |
51 /// [stream] will close, but other virtual channels will remain open and new | 51 /// [stream] will close, but other virtual channels will remain open and new |
52 /// virtual channels may be opened. | 52 /// virtual channels may be opened. |
53 StreamSink get sink; | 53 StreamSink get sink; |
54 | 54 |
55 /// Creates a new [MultiChannel] that sends messages over [innerStream] and | 55 /// Creates a new [MultiChannel] that sends and receives messages over |
56 /// [innerSink]. | 56 /// [inner]. |
57 /// | 57 /// |
58 /// The inner streams must take JSON-like objects. | 58 /// The inner channel must take JSON-like objects. |
59 factory MultiChannel(Stream innerStream, StreamSink innerSink) => | 59 factory MultiChannel(StreamChannel inner) => new _MultiChannel(inner); |
60 new _MultiChannel(innerStream, innerSink); | |
61 | 60 |
62 /// Creates a new virtual channel. | 61 /// Creates a new virtual channel. |
63 /// | 62 /// |
64 /// If [id] is not passed, this creates a virtual channel from scratch. Before | 63 /// If [id] is not passed, this creates a virtual channel from scratch. Before |
65 /// it's used, its [VirtualChannel.id] must be sent to the remote endpoint | 64 /// it's used, its [VirtualChannel.id] must be sent to the remote endpoint |
66 /// where [virtualChannel] should be called with that id. | 65 /// where [virtualChannel] should be called with that id. |
67 /// | 66 /// |
68 /// If [id] is passed, this creates a virtual channel corresponding to the | 67 /// If [id] is passed, this creates a virtual channel corresponding to the |
69 /// channel with that id on the remote channel. | 68 /// channel with that id on the remote channel. |
70 /// | 69 /// |
71 /// Throws an [ArgumentError] if a virtual channel already exists for [id]. | 70 /// Throws an [ArgumentError] if a virtual channel already exists for [id]. |
72 /// Throws a [StateError] if the underlying channel is closed. | 71 /// Throws a [StateError] if the underlying channel is closed. |
73 VirtualChannel virtualChannel([id]); | 72 VirtualChannel virtualChannel([id]); |
74 } | 73 } |
75 | 74 |
76 /// The implementation of [MultiChannel]. | 75 /// The implementation of [MultiChannel]. |
77 /// | 76 /// |
78 /// This is private so that [VirtualChannel] can inherit from [MultiChannel] | 77 /// This is private so that [VirtualChannel] can inherit from [MultiChannel] |
79 /// without having to implement all the private members. | 78 /// without having to implement all the private members. |
80 class _MultiChannel extends StreamChannelMixin implements MultiChannel { | 79 class _MultiChannel extends StreamChannelMixin implements MultiChannel { |
81 /// The inner stream over which all communication is received. | 80 /// The inner channel over which all communication is conducted. |
82 /// | 81 /// |
83 /// This will be `null` if the underlying communication channel is closed. | 82 /// This will be `null` if the underlying communication channel is closed. |
84 Stream _innerStream; | 83 StreamChannel _inner; |
85 | 84 |
86 /// The inner sink over which all communication is sent. | 85 /// The subscription to [_inner.stream]. |
87 /// | |
88 /// This will be `null` if the underlying communication channel is closed. | |
89 StreamSink _innerSink; | |
90 | |
91 /// The subscription to [_innerStream]. | |
92 StreamSubscription _innerStreamSubscription; | 86 StreamSubscription _innerStreamSubscription; |
93 | 87 |
94 Stream get stream => _streamController.stream; | 88 Stream get stream => _streamController.stream; |
95 final _streamController = new StreamController(sync: true); | 89 final _streamController = new StreamController(sync: true); |
96 | 90 |
97 StreamSink get sink => _sinkController.sink; | 91 StreamSink get sink => _sinkController.sink; |
98 final _sinkController = new StreamController(sync: true); | 92 final _sinkController = new StreamController(sync: true); |
99 | 93 |
100 /// A map from virtual channel ids to [StreamController]s that should be used | 94 /// A map from virtual channel ids to [StreamController]s that should be used |
101 /// to write messages received from those channels. | 95 /// to write messages received from those channels. |
(...skipping 19 matching lines...) Expand all Loading... |
121 /// the remote endpoint or a reply on a channel that originated in the local | 115 /// the remote endpoint or a reply on a channel that originated in the local |
122 /// endpoint. | 116 /// endpoint. |
123 /// | 117 /// |
124 /// The trick is that each endpoint only uses odd ids for its own channels. | 118 /// The trick is that each endpoint only uses odd ids for its own channels. |
125 /// When sending a message over a channel that was created by the remote | 119 /// When sending a message over a channel that was created by the remote |
126 /// endpoint, the channel's id plus one is used. This way each [MultiChannel] | 120 /// endpoint, the channel's id plus one is used. This way each [MultiChannel] |
127 /// knows that if an incoming message has an odd id, it's using the local id | 121 /// knows that if an incoming message has an odd id, it's using the local id |
128 /// scheme, but if it has an even id, it's using the remote id scheme. | 122 /// scheme, but if it has an even id, it's using the remote id scheme. |
129 var _nextId = 1; | 123 var _nextId = 1; |
130 | 124 |
131 _MultiChannel(this._innerStream, this._innerSink) { | 125 _MultiChannel(this._inner) { |
132 // The default connection is a special case which has id 0 on both ends. | 126 // The default connection is a special case which has id 0 on both ends. |
133 // This allows it to begin connected without having to send over an id. | 127 // This allows it to begin connected without having to send over an id. |
134 _streamControllers[0] = _streamController; | 128 _streamControllers[0] = _streamController; |
135 _sinkControllers[0] = _sinkController; | 129 _sinkControllers[0] = _sinkController; |
136 _sinkController.stream.listen( | 130 _sinkController.stream.listen( |
137 (message) => _innerSink.add([0, message]), | 131 (message) => _inner.sink.add([0, message]), |
138 onDone: () => _closeChannel(0, 0)); | 132 onDone: () => _closeChannel(0, 0)); |
139 | 133 |
140 _innerStreamSubscription = _innerStream.listen((message) { | 134 _innerStreamSubscription = _inner.stream.listen((message) { |
141 var id = message[0]; | 135 var id = message[0]; |
142 var sink = _streamControllers[id]; | 136 var sink = _streamControllers[id]; |
143 | 137 |
144 // A sink might not exist if the channel was closed before an incoming | 138 // A sink might not exist if the channel was closed before an incoming |
145 // message was processed. | 139 // message was processed. |
146 if (sink == null) return; | 140 if (sink == null) return; |
147 if (message.length > 1) { | 141 if (message.length > 1) { |
148 sink.add(message[1]); | 142 sink.add(message[1]); |
149 return; | 143 return; |
150 } | 144 } |
151 | 145 |
152 // A message without data indicates that the channel has been closed. | 146 // A message without data indicates that the channel has been closed. |
153 _sinkControllers[id].close(); | 147 _sinkControllers[id].close(); |
154 }, onDone: _closeInnerChannel, | 148 }, onDone: _closeInnerChannel, |
155 onError: _streamController.addError); | 149 onError: _streamController.addError); |
156 } | 150 } |
157 | 151 |
158 VirtualChannel virtualChannel([id]) { | 152 VirtualChannel virtualChannel([id]) { |
159 if (_innerStream == null) { | 153 if (_inner == null) { |
160 throw new StateError("The underlying channel is closed."); | 154 throw new StateError("The underlying channel is closed."); |
161 } | 155 } |
162 | 156 |
163 var inputId; | 157 var inputId; |
164 var outputId; | 158 var outputId; |
165 if (id != null) { | 159 if (id != null) { |
166 // Since the user is passing in an id, we're connected to a remote | 160 // Since the user is passing in an id, we're connected to a remote |
167 // VirtualChannel. This means messages they send over this channel will | 161 // VirtualChannel. This means messages they send over this channel will |
168 // have the original odd id, but our replies will have an even id. | 162 // have the original odd id, but our replies will have an even id. |
169 inputId = id; | 163 inputId = id; |
170 outputId = (id as int) + 1; | 164 outputId = (id as int) + 1; |
171 } else { | 165 } else { |
172 // Since we're generating an id, we originated this VirtualChannel. This | 166 // Since we're generating an id, we originated this VirtualChannel. This |
173 // means messages we send over this channel will have the original odd id, | 167 // means messages we send over this channel will have the original odd id, |
174 // but the remote channel's replies will have an even id. | 168 // but the remote channel's replies will have an even id. |
175 inputId = _nextId + 1; | 169 inputId = _nextId + 1; |
176 outputId = _nextId; | 170 outputId = _nextId; |
177 _nextId += 2; | 171 _nextId += 2; |
178 } | 172 } |
179 | 173 |
180 if (_streamControllers.containsKey(inputId)) { | 174 if (_streamControllers.containsKey(inputId)) { |
181 throw new ArgumentError("A virtual channel with id $id already exists."); | 175 throw new ArgumentError("A virtual channel with id $id already exists."); |
182 } | 176 } |
183 | 177 |
184 var streamController = new StreamController(sync: true); | 178 var streamController = new StreamController(sync: true); |
185 var sinkController = new StreamController(sync: true); | 179 var sinkController = new StreamController(sync: true); |
186 _streamControllers[inputId] = streamController; | 180 _streamControllers[inputId] = streamController; |
187 _sinkControllers[inputId] = sinkController; | 181 _sinkControllers[inputId] = sinkController; |
188 sinkController.stream.listen( | 182 sinkController.stream.listen( |
189 (message) => _innerSink.add([outputId, message]), | 183 (message) => _inner.sink.add([outputId, message]), |
190 onDone: () => _closeChannel(inputId, outputId)); | 184 onDone: () => _closeChannel(inputId, outputId)); |
191 | 185 |
192 return new VirtualChannel._( | 186 return new VirtualChannel._( |
193 this, outputId, streamController.stream, sinkController.sink); | 187 this, outputId, streamController.stream, sinkController.sink); |
194 } | 188 } |
195 | 189 |
196 /// Closes the virtual channel for which incoming messages have [inputId] and | 190 /// Closes the virtual channel for which incoming messages have [inputId] and |
197 /// outgoing messages have [outputId]. | 191 /// outgoing messages have [outputId]. |
198 void _closeChannel(int inputId, int outputId) { | 192 void _closeChannel(int inputId, int outputId) { |
199 _streamControllers.remove(inputId).close(); | 193 _streamControllers.remove(inputId).close(); |
200 _sinkControllers.remove(inputId).close(); | 194 _sinkControllers.remove(inputId).close(); |
201 | 195 |
202 if (_innerSink == null) return; | 196 if (_inner == null) return; |
203 | 197 |
204 // A message without data indicates that the virtual channel has been | 198 // A message without data indicates that the virtual channel has been |
205 // closed. | 199 // closed. |
206 _innerSink.add([outputId]); | 200 _inner.sink.add([outputId]); |
207 if (_streamControllers.isEmpty) _closeInnerChannel(); | 201 if (_streamControllers.isEmpty) _closeInnerChannel(); |
208 } | 202 } |
209 | 203 |
210 /// Closes the underlying communication channel. | 204 /// Closes the underlying communication channel. |
211 void _closeInnerChannel() { | 205 void _closeInnerChannel() { |
212 _innerSink.close(); | 206 _inner.sink.close(); |
213 _innerStreamSubscription.cancel(); | 207 _innerStreamSubscription.cancel(); |
214 _innerStream = null; | 208 _inner = null; |
215 _innerSink = null; | |
216 for (var controller in _sinkControllers.values.toList()) { | 209 for (var controller in _sinkControllers.values.toList()) { |
217 controller.close(); | 210 controller.close(); |
218 } | 211 } |
219 } | 212 } |
220 } | 213 } |
221 | 214 |
222 /// A virtual channel created by [MultiChannel]. | 215 /// A virtual channel created by [MultiChannel]. |
223 /// | 216 /// |
224 /// This implements [MultiChannel] for convenience. | 217 /// This implements [MultiChannel] for convenience. |
225 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's | 218 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's |
226 /// [MultiChannel.virtualChannel]. | 219 /// [MultiChannel.virtualChannel]. |
227 class VirtualChannel extends StreamChannelMixin implements MultiChannel { | 220 class VirtualChannel extends StreamChannelMixin implements MultiChannel { |
228 /// The [MultiChannel] that created this. | 221 /// The [MultiChannel] that created this. |
229 final MultiChannel _parent; | 222 final MultiChannel _parent; |
230 | 223 |
231 /// The identifier for this channel. | 224 /// The identifier for this channel. |
232 /// | 225 /// |
233 /// This can be sent across the [MultiChannel] to provide the remote endpoint | 226 /// This can be sent across the [MultiChannel] to provide the remote endpoint |
234 /// a means to connect to this channel. Nothing about this is guaranteed | 227 /// a means to connect to this channel. Nothing about this is guaranteed |
235 /// except that it will be JSON-serializable. | 228 /// except that it will be JSON-serializable. |
236 final id; | 229 final id; |
237 | 230 |
238 final Stream stream; | 231 final Stream stream; |
239 final StreamSink sink; | 232 final StreamSink sink; |
240 | 233 |
241 VirtualChannel._(this._parent, this.id, this.stream, this.sink); | 234 VirtualChannel._(this._parent, this.id, this.stream, this.sink); |
242 | 235 |
243 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); | 236 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); |
244 } | 237 } |
OLD | NEW |