OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library test.multi_channel; | 5 library test.multi_channel; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 | 8 |
9 import 'stream_channel.dart'; | 9 import 'stream_channel.dart'; |
10 | 10 |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
86 Stream _innerStream; | 86 Stream _innerStream; |
87 | 87 |
88 /// The inner sink over which all communication is sent. | 88 /// The inner sink over which all communication is sent. |
89 /// | 89 /// |
90 /// This will be `null` if the underlying communication channel is closed. | 90 /// This will be `null` if the underlying communication channel is closed. |
91 StreamSink _innerSink; | 91 StreamSink _innerSink; |
92 | 92 |
93 /// The subscription to [_innerStream]. | 93 /// The subscription to [_innerStream]. |
94 StreamSubscription _innerStreamSubscription; | 94 StreamSubscription _innerStreamSubscription; |
95 | 95 |
96 /// Whether this channel has been closed. | |
97 bool _closed = false; | |
98 | |
99 Stream get stream => _streamController.stream; | 96 Stream get stream => _streamController.stream; |
100 final _streamController = new StreamController(sync: true); | 97 final _streamController = new StreamController(sync: true); |
101 | 98 |
102 StreamSink get sink => _sinkController.sink; | 99 StreamSink get sink => _sinkController.sink; |
103 final _sinkController = new StreamController(sync: true); | 100 final _sinkController = new StreamController(sync: true); |
104 | 101 |
105 /// A map from virtual channel ids to [StreamController]s that should be used | 102 /// A map from virtual channel ids to [StreamController]s that should be used |
106 /// to write messages received from those channels. | 103 /// to write messages received from those channels. |
107 final _streamControllers = new Map<int, StreamController>(); | 104 final _streamControllers = new Map<int, StreamController>(); |
108 | 105 |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
194 (message) => _innerSink.add([outputId, message]), | 191 (message) => _innerSink.add([outputId, message]), |
195 onDone: () => _closeChannel(inputId, outputId)); | 192 onDone: () => _closeChannel(inputId, outputId)); |
196 | 193 |
197 return new VirtualChannel._( | 194 return new VirtualChannel._( |
198 this, outputId, streamController.stream, sinkController.sink); | 195 this, outputId, streamController.stream, sinkController.sink); |
199 } | 196 } |
200 | 197 |
201 /// Closes the virtual channel for which incoming messages have [inputId] and | 198 /// Closes the virtual channel for which incoming messages have [inputId] and |
202 /// outgoing messages have [outputId]. | 199 /// outgoing messages have [outputId]. |
203 void _closeChannel(int inputId, int outputId) { | 200 void _closeChannel(int inputId, int outputId) { |
204 if (_closed) return; | |
205 _closed = inputId == 0; | |
206 | |
207 // A message without data indicates that the virtual channel has been | 201 // A message without data indicates that the virtual channel has been |
208 // closed. | 202 // closed. |
209 _streamControllers.remove(inputId).close(); | 203 _streamControllers.remove(inputId).close(); |
210 _sinkControllers.remove(inputId).close(); | 204 _sinkControllers.remove(inputId).close(); |
211 | 205 |
212 if (_innerSink == null) return; | 206 if (_innerSink == null) return; |
213 _innerSink.add([outputId]); | 207 _innerSink.add([outputId]); |
214 if (_streamControllers.isEmpty) _closeInnerChannel(); | 208 if (_streamControllers.isEmpty) _closeInnerChannel(); |
215 } | 209 } |
216 | 210 |
(...skipping 25 matching lines...) Expand all Loading... |
242 /// except that it will be JSON-serializable. | 236 /// except that it will be JSON-serializable. |
243 final id; | 237 final id; |
244 | 238 |
245 final Stream stream; | 239 final Stream stream; |
246 final StreamSink sink; | 240 final StreamSink sink; |
247 | 241 |
248 VirtualChannel._(this._parent, this.id, this.stream, this.sink); | 242 VirtualChannel._(this._parent, this.id, this.stream, this.sink); |
249 | 243 |
250 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); | 244 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); |
251 } | 245 } |
OLD | NEW |