Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 import 'dart:async'; | |
| 6 | |
| 7 import '../stream_channel.dart'; | |
| 8 | |
| 9 /// Allows the caller to force a channel to disconnect. | |
| 10 /// | |
| 11 /// When [disconnect] is called, the channel (or channels) transformed by this | |
| 12 /// transformer will act as though the remote end had disconnected—the stream | |
| 13 /// will emit a done event, and the sink will ignore future inputs. The inner | |
| 14 /// sink will also be closed to notify the remote end of the disconnection. | |
| 15 /// | |
| 16 /// If a channel is transformed after the [disconnect] has been called, it will | |
| 17 /// be disconnected immediately. | |
| 18 class Disconnector<T> implements StreamChannelTransformer<T, T> { | |
| 19 /// Whether [disconnect] has been called. | |
| 20 bool get isDisconnected => _isDisconnected; | |
| 21 var _isDisconnected = false; | |
| 22 | |
| 23 /// The sinks for transformed channels. | |
| 24 /// | |
| 25 /// Note that we assume that transformed channels provide the stream channel | |
| 26 /// guarantees. This allows us to only track sinks, because we know closing | |
| 27 /// the underlying sink will cause the stream to emit a done event. | |
| 28 final _sinks = <_DisconnectorSink<T>>[]; | |
| 29 | |
| 30 /// Disconnects all channels that have been transformed. | |
| 31 void disconnect() { | |
| 32 _isDisconnected = true; | |
| 33 for (var sink in _sinks) { | |
| 34 sink._disconnect(); | |
| 35 } | |
| 36 _sinks.clear(); | |
| 37 } | |
| 38 | |
| 39 StreamChannel<T> bind(StreamChannel<T> channel) { | |
| 40 return channel.changeSink((innerSink) { | |
| 41 var sink = new _DisconnectorSink(innerSink); | |
| 42 | |
| 43 if (_isDisconnected) { | |
| 44 sink._disconnect(); | |
| 45 } else { | |
| 46 _sinks.add(sink); | |
| 47 } | |
| 48 | |
| 49 return sink; | |
| 50 }); | |
| 51 } | |
| 52 } | |
| 53 | |
| 54 /// A sink wrapper that can force a disconnection. | |
| 55 class _DisconnectorSink<T> implements StreamSink<T> { | |
| 56 /// The inner sink. | |
| 57 final StreamSink<T> _inner; | |
| 58 | |
| 59 Future get done => _inner.done; | |
| 60 | |
| 61 /// Whether [Disconnector.disconnect] has been called. | |
| 62 var _isDisconnected = false; | |
|
tjblasi
2016/02/09 16:45:37
Out of curiosity, what is your rule for typing pro
nweiz
2016/02/09 19:10:53
In general, I don't type a field if its type would
| |
| 63 | |
| 64 /// Whether the user has called [close]. | |
| 65 var _closed = false; | |
| 66 | |
| 67 /// The subscription to the stream passed to [addStream], if a stream is | |
| 68 /// currently being added. | |
| 69 StreamSubscription<T> _addStreamSubscription; | |
| 70 | |
| 71 /// The completer for the future returned by [addStream], if a stream is | |
| 72 /// currently being added. | |
| 73 Completer _addStreamCompleter; | |
| 74 | |
| 75 /// Whether we're currently adding a stream with [addStream]. | |
| 76 bool get _inAddStream => _addStreamSubscription != null; | |
| 77 | |
| 78 _DisconnectorSink(this._inner); | |
| 79 | |
| 80 void add(T data) { | |
| 81 if (_closed) throw new StateError("Cannot add event after closing."); | |
| 82 if (_inAddStream) { | |
| 83 throw new StateError("Cannot add event while adding stream."); | |
| 84 } | |
| 85 if (_isDisconnected) return; | |
| 86 | |
| 87 _inner.add(data); | |
| 88 } | |
| 89 | |
| 90 void addError(error, [StackTrace stackTrace]) { | |
| 91 if (_closed) throw new StateError("Cannot add event after closing."); | |
| 92 if (_inAddStream) { | |
| 93 throw new StateError("Cannot add event while adding stream."); | |
| 94 } | |
| 95 if (_isDisconnected) return; | |
| 96 | |
| 97 _inner.addError(error, stackTrace); | |
| 98 } | |
| 99 | |
| 100 Future addStream(Stream<T> stream) { | |
| 101 if (_closed) throw new StateError("Cannot add stream after closing."); | |
| 102 if (_inAddStream) { | |
| 103 throw new StateError("Cannot add stream while adding stream."); | |
| 104 } | |
| 105 if (_isDisconnected) return new Future.value(); | |
| 106 | |
| 107 _addStreamCompleter = new Completer.sync(); | |
| 108 _addStreamSubscription = stream.listen( | |
| 109 _inner.add, | |
| 110 onError: _inner.addError, | |
| 111 onDone: _addStreamCompleter.complete); | |
| 112 return _addStreamCompleter.future.then((_) { | |
| 113 _addStreamCompleter = null; | |
| 114 _addStreamSubscription = null; | |
| 115 }); | |
| 116 } | |
| 117 | |
| 118 Future close() { | |
| 119 if (_inAddStream) { | |
| 120 throw new StateError("Cannot close sink while adding stream."); | |
| 121 } | |
| 122 | |
| 123 _closed = true; | |
| 124 return _inner.close(); | |
| 125 } | |
| 126 | |
| 127 /// Disconnects this sink. | |
| 128 /// | |
| 129 /// This closes the underlying sink and stops forwarding events. | |
| 130 void _disconnect() { | |
| 131 _isDisconnected = true; | |
| 132 _inner.close(); | |
| 133 | |
| 134 if (!_inAddStream) return; | |
| 135 _addStreamCompleter.complete(_addStreamSubscription.cancel()); | |
| 136 _addStreamCompleter = null; | |
| 137 _addStreamSubscription = null; | |
| 138 } | |
| 139 } | |
| OLD | NEW |