OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library test.multi_channel; | 5 library test.multi_channel; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 | 8 |
9 import 'stream_channel.dart'; | 9 import 'stream_channel.dart'; |
10 | 10 |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
86 Stream _innerStream; | 86 Stream _innerStream; |
87 | 87 |
88 /// The inner sink over which all communication is sent. | 88 /// The inner sink over which all communication is sent. |
89 /// | 89 /// |
90 /// This will be `null` if the underlying communication channel is closed. | 90 /// This will be `null` if the underlying communication channel is closed. |
91 StreamSink _innerSink; | 91 StreamSink _innerSink; |
92 | 92 |
93 /// The subscription to [_innerStream]. | 93 /// The subscription to [_innerStream]. |
94 StreamSubscription _innerStreamSubscription; | 94 StreamSubscription _innerStreamSubscription; |
95 | 95 |
| 96 /// Whether this channel has been closed. |
| 97 bool _closed = false; |
| 98 |
96 Stream get stream => _streamController.stream; | 99 Stream get stream => _streamController.stream; |
97 final _streamController = new StreamController(sync: true); | 100 final _streamController = new StreamController(sync: true); |
98 | 101 |
99 StreamSink get sink => _sinkController.sink; | 102 StreamSink get sink => _sinkController.sink; |
100 final _sinkController = new StreamController(sync: true); | 103 final _sinkController = new StreamController(sync: true); |
101 | 104 |
102 /// A map from virtual channel ids to [StreamController]s that should be used | 105 /// A map from virtual channel ids to [StreamController]s that should be used |
103 /// to write messages received from those channels. | 106 /// to write messages received from those channels. |
104 final _streamControllers = new Map<int, StreamController>(); | 107 final _streamControllers = new Map<int, StreamController>(); |
105 | 108 |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
191 (message) => _innerSink.add([outputId, message]), | 194 (message) => _innerSink.add([outputId, message]), |
192 onDone: () => _closeChannel(inputId, outputId)); | 195 onDone: () => _closeChannel(inputId, outputId)); |
193 | 196 |
194 return new VirtualChannel._( | 197 return new VirtualChannel._( |
195 this, outputId, streamController.stream, sinkController.sink); | 198 this, outputId, streamController.stream, sinkController.sink); |
196 } | 199 } |
197 | 200 |
198 /// Closes the virtual channel for which incoming messages have [inputId] and | 201 /// Closes the virtual channel for which incoming messages have [inputId] and |
199 /// outgoing messages have [outputId]. | 202 /// outgoing messages have [outputId]. |
200 void _closeChannel(int inputId, int outputId) { | 203 void _closeChannel(int inputId, int outputId) { |
| 204 if (_closed) return; |
| 205 _closed = true; |
| 206 |
201 // A message without data indicates that the virtual channel has been | 207 // A message without data indicates that the virtual channel has been |
202 // closed. | 208 // closed. |
203 _streamControllers.remove(inputId).close(); | 209 _streamControllers.remove(inputId).close(); |
204 _sinkControllers.remove(inputId).close(); | 210 _sinkControllers.remove(inputId).close(); |
205 | 211 |
206 if (_innerSink == null) return; | 212 if (_innerSink == null) return; |
207 _innerSink.add([outputId]); | 213 _innerSink.add([outputId]); |
208 if (_streamControllers.isEmpty) _closeInnerChannel(); | 214 if (_streamControllers.isEmpty) _closeInnerChannel(); |
209 } | 215 } |
210 | 216 |
(...skipping 25 matching lines...) Expand all Loading... |
236 /// except that it will be JSON-serializable. | 242 /// except that it will be JSON-serializable. |
237 final id; | 243 final id; |
238 | 244 |
239 final Stream stream; | 245 final Stream stream; |
240 final StreamSink sink; | 246 final StreamSink sink; |
241 | 247 |
242 VirtualChannel._(this._parent, this.id, this.stream, this.sink); | 248 VirtualChannel._(this._parent, this.id, this.stream, this.sink); |
243 | 249 |
244 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); | 250 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); |
245 } | 251 } |
OLD | NEW |