Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library test.multi_channel; | 5 library test.multi_channel; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 | 8 |
| 9 import 'stream_channel.dart'; | 9 import 'stream_channel.dart'; |
| 10 | 10 |
| (...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 86 Stream _innerStream; | 86 Stream _innerStream; |
| 87 | 87 |
| 88 /// The inner sink over which all communication is sent. | 88 /// The inner sink over which all communication is sent. |
| 89 /// | 89 /// |
| 90 /// This will be `null` if the underlying communication channel is closed. | 90 /// This will be `null` if the underlying communication channel is closed. |
| 91 StreamSink _innerSink; | 91 StreamSink _innerSink; |
| 92 | 92 |
| 93 /// The subscription to [_innerStream]. | 93 /// The subscription to [_innerStream]. |
| 94 StreamSubscription _innerStreamSubscription; | 94 StreamSubscription _innerStreamSubscription; |
| 95 | 95 |
| 96 /// Whether this channel has been closed. | |
| 97 var _closed = false; | |
|
kevmoo
2015/04/13 21:37:27
type this as bool
nweiz
2015/04/13 22:30:13
Done.
| |
| 98 | |
| 96 Stream get stream => _streamController.stream; | 99 Stream get stream => _streamController.stream; |
| 97 final _streamController = new StreamController(sync: true); | 100 final _streamController = new StreamController(sync: true); |
| 98 | 101 |
| 99 StreamSink get sink => _sinkController.sink; | 102 StreamSink get sink => _sinkController.sink; |
| 100 final _sinkController = new StreamController(sync: true); | 103 final _sinkController = new StreamController(sync: true); |
| 101 | 104 |
| 102 /// A map from virtual channel ids to [StreamController]s that should be used | 105 /// A map from virtual channel ids to [StreamController]s that should be used |
| 103 /// to write messages received from those channels. | 106 /// to write messages received from those channels. |
| 104 final _streamControllers = new Map<int, StreamController>(); | 107 final _streamControllers = new Map<int, StreamController>(); |
| 105 | 108 |
| (...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 191 (message) => _innerSink.add([outputId, message]), | 194 (message) => _innerSink.add([outputId, message]), |
| 192 onDone: () => _closeChannel(inputId, outputId)); | 195 onDone: () => _closeChannel(inputId, outputId)); |
| 193 | 196 |
| 194 return new VirtualChannel._( | 197 return new VirtualChannel._( |
| 195 this, outputId, streamController.stream, sinkController.sink); | 198 this, outputId, streamController.stream, sinkController.sink); |
| 196 } | 199 } |
| 197 | 200 |
| 198 /// Closes the virtual channel for which incoming messages have [inputId] and | 201 /// Closes the virtual channel for which incoming messages have [inputId] and |
| 199 /// outgoing messages have [outputId]. | 202 /// outgoing messages have [outputId]. |
| 200 void _closeChannel(int inputId, int outputId) { | 203 void _closeChannel(int inputId, int outputId) { |
| 204 if (_closed) return; | |
| 205 _closed = true; | |
| 206 | |
| 201 // A message without data indicates that the virtual channel has been | 207 // A message without data indicates that the virtual channel has been |
| 202 // closed. | 208 // closed. |
| 203 _streamControllers.remove(inputId).close(); | 209 _streamControllers.remove(inputId).close(); |
| 204 _sinkControllers.remove(inputId).close(); | 210 _sinkControllers.remove(inputId).close(); |
| 205 | 211 |
| 206 if (_innerSink == null) return; | 212 if (_innerSink == null) return; |
| 207 _innerSink.add([outputId]); | 213 _innerSink.add([outputId]); |
| 208 if (_streamControllers.isEmpty) _closeInnerChannel(); | 214 if (_streamControllers.isEmpty) _closeInnerChannel(); |
| 209 } | 215 } |
| 210 | 216 |
| (...skipping 25 matching lines...) Expand all Loading... | |
| 236 /// except that it will be JSON-serializable. | 242 /// except that it will be JSON-serializable. |
| 237 final id; | 243 final id; |
| 238 | 244 |
| 239 final Stream stream; | 245 final Stream stream; |
| 240 final StreamSink sink; | 246 final StreamSink sink; |
| 241 | 247 |
| 242 VirtualChannel._(this._parent, this.id, this.stream, this.sink); | 248 VirtualChannel._(this._parent, this.id, this.stream, this.sink); |
| 243 | 249 |
| 244 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); | 250 VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id); |
| 245 } | 251 } |
| OLD | NEW |