| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library test.multi_channel; | 5 library test.multi_channel; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 | 8 |
| 9 import 'stream_channel.dart'; | 9 import 'stream_channel.dart'; |
| 10 | 10 |
| (...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 86 Stream _innerStream; | 86 Stream _innerStream; |
| 87 | 87 |
| 88 /// The inner sink over which all communication is sent. | 88 /// The inner sink over which all communication is sent. |
| 89 /// | 89 /// |
| 90 /// This will be `null` if the underlying communication channel is closed. | 90 /// This will be `null` if the underlying communication channel is closed. |
| 91 StreamSink _innerSink; | 91 StreamSink _innerSink; |
| 92 | 92 |
| 93 /// The subscription to [_innerStream]. | 93 /// The subscription to [_innerStream]. |
| 94 StreamSubscription _innerStreamSubscription; | 94 StreamSubscription _innerStreamSubscription; |
| 95 | 95 |
| 96 /// Whether this channel has been closed. | |
| 97 bool _closed = false; | |
| 98 | |
| 99 Stream get stream => _streamController.stream; | 96 Stream get stream => _streamController.stream; |
| 100 final _streamController = new StreamController(sync: true); | 97 final _streamController = new StreamController(sync: true); |
| 101 | 98 |
| 102 StreamSink get sink => _sinkController.sink; | 99 StreamSink get sink => _sinkController.sink; |
| 103 final _sinkController = new StreamController(sync: true); | 100 final _sinkController = new StreamController(sync: true); |
| 104 | 101 |
| 105 /// A map from virtual channel ids to [StreamController]s that should be used | 102 /// A map from virtual channel ids to [StreamController]s that should be used |
| 106 /// to write messages received from those channels. | 103 /// to write messages received from those channels. |
| 107 final _streamControllers = new Map<int, StreamController>(); | 104 final _streamControllers = new Map<int, StreamController>(); |
| 108 | 105 |
| (...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 194 (message) => _innerSink.add([outputId, message]), | 191 (message) => _innerSink.add([outputId, message]), |
| 195 onDone: () => _closeChannel(inputId, outputId)); | 192 onDone: () => _closeChannel(inputId, outputId)); |
| 196 | 193 |
| 197 return new VirtualChannel._( | 194 return new VirtualChannel._( |
| 198 this, outputId, streamController.stream, sinkController.sink); | 195 this, outputId, streamController.stream, sinkController.sink); |
| 199 } | 196 } |
| 200 | 197 |
| 201 /// Closes the virtual channel for which incoming messages have [inputId] and | 198 /// Closes the virtual channel for which incoming messages have [inputId] and |
| 202 /// outgoing messages have [outputId]. | 199 /// outgoing messages have [outputId]. |
| 203 void _closeChannel(int inputId, int outputId) { | 200 void _closeChannel(int inputId, int outputId) { |
| 204 if (_closed) return; | |
| 205 _closed = inputId == 0; | |
| 206 | |
| 207 // A message without data indicates that the virtual channel has been | 201 // A message without data indicates that the virtual channel has been |
| 208 // closed. | 202 // closed. |
| 209 _streamControllers.remove(inputId).close(); | 203 _streamControllers.remove(inputId).close(); |
| 210 _sinkControllers.remove(inputId).close(); | 204 _sinkControllers.remove(inputId).close(); |
| 211 | 205 |
| 212 if (_innerSink == null) return; | 206 if (_innerSink == null) return; |
| 213 _innerSink.add([outputId]); | 207 _innerSink.add([outputId]); |
| 214 if (_streamControllers.isEmpty) _closeInnerChannel(); | 208 if (_streamControllers.isEmpty) _closeInnerChannel(); |
| 215 } | 209 } |
| 216 | 210 |
| (...skipping 25 matching lines...) Expand all Loading... |
| 242 /// except that it will be JSON-serializable. | 236 /// except that it will be JSON-serializable. |
| 243 final id; | 237 final id; |
| 244 | 238 |
| 245 final Stream stream; | 239 final Stream stream; |
| 246 final StreamSink sink; | 240 final StreamSink sink; |
| 247 | 241 |
| 248 VirtualChannel._(this._parent, this.id, this.stream, this.sink); | 242 VirtualChannel._(this._parent, this.id, this.stream, this.sink); |
| 249 | 243 |
| 250 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); | 244 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); |
| 251 } | 245 } |
| OLD | NEW |