OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library async.streams.stream_completer; |
| 6 |
| 7 import 'dart:async'; |
| 8 |
| 9 /// A [stream] where the contents aren't known at creation time. |
| 10 /// |
| 11 /// It is generally recommended that you never create a `Future<Stream>` |
| 12 /// because you can just use directly create a stream that doesn't do anything |
| 13 /// until it's ready to do so. |
| 14 /// This class can be used to create such a stream. |
| 15 /// |
| 16 /// The [stream] is a normal stream that you can listen to immediately, |
| 17 /// but until [setSourceStream] is called, the stream won't produce |
| 18 /// any events. |
| 19 /// |
| 20 /// The same effect can be achieved by using a [StreamController] |
| 21 /// and adding the stream using `addStream` when both |
| 22 /// the controller's stream is listened to and the source stream is ready. |
| 23 /// This class attempts to shortcut some of the overhead when possible. |
| 24 /// For example, if the [stream] is only listened to |
| 25 /// after the source stream has been set, |
| 26 /// the listen is performed directly on the source stream. |
| 27 class StreamCompleter<T> { |
| 28 final Stream<T> stream = new _PromiseStream<T>(); |
| 29 |
| 30 /// Set a stream as the source of events for the [StreamCompleter]. |
| 31 /// |
| 32 /// There is no guarantee that the stream will ever be listened to. |
| 33 void setSourceStream(Stream<T> stream) { |
| 34 _PromiseStream promiseStream = this.stream; |
| 35 promiseStream._linkStream(stream); |
| 36 } |
| 37 |
| 38 /// As setting an empty stream using [setSourceStream]. |
| 39 void setEmpty() { |
| 40 // TODO(lrn): Optimize this to not actually create the empty stream. |
| 41 _PromiseStream promiseStream = this.stream; |
| 42 promiseStream._linkStream(new Stream.fromIterable(const [])); |
| 43 } |
| 44 } |
| 45 |
| 46 /// Stream that acts as a source stream it is (eventually) linked with. |
| 47 /// |
| 48 /// The linked source stream can be set after a user has started listening on |
| 49 /// this stream. No events occur before the source stream is provided. |
| 50 /// |
| 51 /// If a user listens before events are available, the state of the |
| 52 /// subscription is maintained, and the subscription is then linked |
| 53 /// to the source stream when that becomes available. |
| 54 class _PromiseStream<T> extends Stream<T> { |
| 55 static const int _UNINITIALIZED = 0; |
| 56 static const int _STREAM_SET = 1; |
| 57 static const int _LISTENED = 2; |
| 58 |
| 59 int _state = _UNINITIALIZED; |
| 60 var _streamOrSubscription; |
| 61 |
| 62 void _linkStream(Stream stream) { |
| 63 if (_state == _UNINITIALIZED) { |
| 64 _streamOrSubscription = stream; |
| 65 _state |= _STREAM_SET; |
| 66 } else if (_state == _LISTENED) { |
| 67 _PromiseSubscription promiseSubscription = _streamOrSubscription; |
| 68 promiseSubscription._linkStream(stream); |
| 69 _state |= _STREAM_SET; |
| 70 } else { |
| 71 throw new StateError("Stream already linked."); |
| 72 } |
| 73 } |
| 74 |
| 75 StreamSubscription listen(void onData(T event), |
| 76 {Function onError, |
| 77 void onDone(), |
| 78 bool cancelOnError}) { |
| 79 int state = _state; |
| 80 _state = state | _LISTENED; |
| 81 if (state == _UNINITIALIZED) { |
| 82 _PromiseSubscription subscription = |
| 83 new _PromiseSubscription<T>(true == cancelOnError); |
| 84 subscription.onData(onData); |
| 85 subscription.onError(onError); |
| 86 subscription.onDone(onDone); |
| 87 _streamOrSubscription = subscription; |
| 88 return subscription; |
| 89 } |
| 90 if (state == _STREAM_SET) { |
| 91 Stream stream = _streamOrSubscription; |
| 92 return stream.listen(onData, onError: onError, onDone: onDone, |
| 93 cancelOnError: cancelOnError); |
| 94 } |
| 95 print(state); |
| 96 throw new StateError("Stream has already been listened to."); |
| 97 } |
| 98 } |
| 99 |
| 100 /// Subscription for a [_PromiseStream] that is listened to but has no data. |
| 101 /// |
| 102 /// Maintains the state of a stream subscription that hasn't received any |
| 103 /// events until events are available, then it starts forwarding to another |
| 104 /// subscription. |
| 105 class _PromiseSubscription<T> implements StreamSubscription<T> { |
| 106 // State values |
| 107 // The subscription is in one of three distinct states: |
| 108 // - Initial (remembers whether it's cancelOnError and paused). |
| 109 // - Cancelled (before being linked). |
| 110 // - Linked (before being cancelled). |
| 111 static const int _INIT = 0; |
| 112 static const int _INIT_CANCEL_ON_ERROR = 1; |
| 113 static const int _CANCELLED = 2; // Exclusive. |
| 114 static const int _LINKED = 3; // Exclusive |
| 115 static const int _PAUSE = 4; // Used with _INIT or _INIT_CANCEL_ON_ERROR. |
| 116 |
| 117 /// State represents the status of the subscription until the |
| 118 /// real subscription becomes available. |
| 119 /// |
| 120 /// While `_state` is not `_LINKED` or `_CANCELED`, `_stateData` contains |
| 121 /// a list of length three with the data, error and done handlers that |
| 122 /// have been set. |
| 123 /// |
| 124 /// While `_state` is [_LINKED], [_stateData] contains the real |
| 125 /// stream subscription. |
| 126 /// |
| 127 /// When `_state` is `_CANCELED`, `_stateData` is cleared since the |
| 128 /// event handlers won't be needed anyway. |
| 129 int _state; |
| 130 var _stateData = new List(3); |
| 131 |
| 132 _PromiseSubscription(bool cancelOnError) |
| 133 : _state = (cancelOnError ? _INIT_CANCEL_ON_ERROR : _INIT); |
| 134 |
| 135 bool get _isLinked => _state == _LINKED; |
| 136 bool get _isCancelled => _state == _CANCELLED; |
| 137 bool get _isInitial => (_state & (_PAUSE - 1)) <= _INIT_CANCEL_ON_ERROR; |
| 138 |
| 139 void _linkStream(Stream stream) { |
| 140 if (_isLinked) { |
| 141 throw new StateError("Already linked to a stream."); |
| 142 } |
| 143 if (_isCancelled) { |
| 144 return; |
| 145 } |
| 146 bool cancelOnError = (_state & _INIT_CANCEL_ON_ERROR) != 0; |
| 147 StreamSubscription subscription = |
| 148 stream.listen(null, cancelOnError: cancelOnError); |
| 149 List handlers = _stateData; |
| 150 subscription.onData(handlers[0]); |
| 151 subscription.onError(handlers[1]); |
| 152 subscription.onDone(handlers[2]); |
| 153 int state = _state; |
| 154 _subscription = subscription; |
| 155 while (state >= _PAUSE) { |
| 156 subscription.pause(); |
| 157 state -= _PAUSE; |
| 158 } |
| 159 } |
| 160 |
| 161 List get _handlers { |
| 162 assert(_isInitial); |
| 163 return _stateData; |
| 164 } |
| 165 |
| 166 StreamSubscription get _subscription { |
| 167 assert(_isLinked); |
| 168 return _stateData; |
| 169 } |
| 170 |
| 171 // Sets state to linked. |
| 172 void set _subscription(StreamSubscription subscription) { |
| 173 assert(_isInitial); |
| 174 _stateData = subscription; |
| 175 _state = _LINKED; |
| 176 } |
| 177 |
| 178 void onData(void handleData(T data)) { |
| 179 if (_isLinked) { |
| 180 _subscription.onData(handleData); |
| 181 } else { |
| 182 assert(_isInitial); |
| 183 _handlers[0] = handleData; |
| 184 } |
| 185 } |
| 186 |
| 187 void onError(void handleError(error, StackTrace stackTrace)) { |
| 188 if (_isLinked) { |
| 189 _subscription.onError(handleError); |
| 190 } else { |
| 191 assert(_isInitial); |
| 192 _handlers[1] = handleError; |
| 193 } |
| 194 } |
| 195 |
| 196 void onDone(void handleDone()) { |
| 197 if (_isLinked) { |
| 198 _subscription.onDone(handleDone); |
| 199 } else { |
| 200 assert(_isInitial); |
| 201 _handlers[2] = handleDone; |
| 202 } |
| 203 } |
| 204 |
| 205 void pause([Future resumeFuture]) { |
| 206 if (_isLinked) { |
| 207 _subscription.pause(resumeFuture); |
| 208 } else if (!_isCancelled) { |
| 209 _state += _PAUSE; |
| 210 if (resumeFuture != null) { |
| 211 resumeFuture.whenComplete(this.resume); |
| 212 } |
| 213 } |
| 214 } |
| 215 |
| 216 void resume() { |
| 217 if (_isLinked) { |
| 218 _subscription.resume(); |
| 219 } else if (_state >= _PAUSE) { |
| 220 _state -= _PAUSE; |
| 221 } |
| 222 } |
| 223 |
| 224 Future cancel() { |
| 225 if (_isLinked) { |
| 226 return _subscription.cancel(); |
| 227 } else { |
| 228 _stateData = null; |
| 229 _state = _CANCELLED; |
| 230 return new Future.value(); |
| 231 } |
| 232 } |
| 233 |
| 234 Future asFuture([futureValue]) { |
| 235 if (_isLinked) { |
| 236 return _subscription.asFuture(futureValue); |
| 237 } |
| 238 Completer completer = new Completer(); |
| 239 if (!_isCancelled) { |
| 240 // Asking for a future of a cancelled subscription gives a future |
| 241 // which never completes. |
| 242 _handlers[1] = _cancelBeforeError(completer.completeError); |
| 243 if (futureValue == null) { |
| 244 _handlers[2] = completer.complete; |
| 245 } else { |
| 246 _handlers[2] = () { completer.complete(futureValue); }; |
| 247 } |
| 248 } |
| 249 return completer.future; |
| 250 } |
| 251 |
| 252 bool get isPaused { |
| 253 if (_isLinked) { |
| 254 return _subscription.isPaused; |
| 255 } |
| 256 return _state >= _PAUSE; |
| 257 } |
| 258 |
| 259 /// Helper function used by [asFuture]. |
| 260 /// |
| 261 /// Returns an error handler which cancels the stream when it receives an |
| 262 /// error. |
| 263 Function _cancelBeforeError(Function handleError) { |
| 264 return (e, s) { |
| 265 cancel(); |
| 266 if (handleError is _BinaryCallback) { |
| 267 handleError(e, s); |
| 268 } else { |
| 269 handleError(e); |
| 270 } |
| 271 }; |
| 272 } |
| 273 } |
| 274 |
| 275 typedef _BinaryCallback(a, b); |
OLD | NEW |