| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 part of dart.async; | |
| 6 | |
| 7 /** Abstract and private interface for a place to put events. */ | |
| 8 abstract class _EventSink<T> { | |
| 9 void _add(T data); | |
| 10 void _addError(Object error, StackTrace stackTrace); | |
| 11 void _close(); | |
| 12 } | |
| 13 | |
| 14 /** | |
| 15 * Abstract and private interface for a place to send events. | |
| 16 * | |
| 17 * Used by event buffering to finally dispatch the pending event, where | |
| 18 * [_EventSink] is where the event first enters the stream subscription, | |
| 19 * and may yet be buffered. | |
| 20 */ | |
| 21 abstract class _EventDispatch<T> { | |
| 22 void _sendData(T data); | |
| 23 void _sendError(Object error, StackTrace stackTrace); | |
| 24 void _sendDone(); | |
| 25 } | |
| 26 | |
| 27 /** | |
| 28 * Default implementation of stream subscription of buffering events. | |
| 29 * | |
| 30 * The only public methods are those of [StreamSubscription], so instances of | |
| 31 * [_BufferingStreamSubscription] can be returned directly as a | |
| 32 * [StreamSubscription] without exposing internal functionality. | |
| 33 * | |
| 34 * The [StreamController] is a public facing version of [Stream] and this class, | |
| 35 * with some methods made public. | |
| 36 * | |
| 37 * The user interface of [_BufferingStreamSubscription] are the following | |
| 38 * methods: | |
| 39 * | |
| 40 * * [_add]: Add a data event to the stream. | |
| 41 * * [_addError]: Add an error event to the stream. | |
| 42 * * [_close]: Request to close the stream. | |
| 43 * * [_onCancel]: Called when the subscription will provide no more events, | |
| 44 * either due to being actively canceled, or after sending a done event. | |
| 45 * * [_onPause]: Called when the subscription wants the event source to pause. | |
| 46 * * [_onResume]: Called when allowing new events after a pause. | |
| 47 * | |
| 48 * The user should not add new events when the subscription requests a paused, | |
| 49 * but if it happens anyway, the subscription will enqueue the events just as | |
| 50 * when new events arrive while still firing an old event. | |
| 51 */ | |
| 52 class _BufferingStreamSubscription<T> implements StreamSubscription<T>, | |
| 53 _EventSink<T>, | |
| 54 _EventDispatch<T> { | |
| 55 /** The `cancelOnError` flag from the `listen` call. */ | |
| 56 static const int _STATE_CANCEL_ON_ERROR = 1; | |
| 57 /** | |
| 58 * Whether the "done" event has been received. | |
| 59 * No further events are accepted after this. | |
| 60 */ | |
| 61 static const int _STATE_CLOSED = 2; | |
| 62 /** | |
| 63 * Set if the input has been asked not to send events. | |
| 64 * | |
| 65 * This is not the same as being paused, since the input will remain paused | |
| 66 * after a call to [resume] if there are pending events. | |
| 67 */ | |
| 68 static const int _STATE_INPUT_PAUSED = 4; | |
| 69 /** | |
| 70 * Whether the subscription has been canceled. | |
| 71 * | |
| 72 * Set by calling [cancel], or by handling a "done" event, or an "error" event | |
| 73 * when `cancelOnError` is true. | |
| 74 */ | |
| 75 static const int _STATE_CANCELED = 8; | |
| 76 /** | |
| 77 * Set when either: | |
| 78 * | |
| 79 * * an error is sent, and [cancelOnError] is true, or | |
| 80 * * a done event is sent. | |
| 81 * | |
| 82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the | |
| 83 * state is unset, and no furher events must be delivered. | |
| 84 */ | |
| 85 static const int _STATE_WAIT_FOR_CANCEL = 16; | |
| 86 static const int _STATE_IN_CALLBACK = 32; | |
| 87 static const int _STATE_HAS_PENDING = 64; | |
| 88 static const int _STATE_PAUSE_COUNT = 128; | |
| 89 static const int _STATE_PAUSE_COUNT_SHIFT = 7; | |
| 90 | |
| 91 /* Event handlers provided in constructor. */ | |
| 92 _DataHandler<T> _onData; | |
| 93 Function _onError; | |
| 94 _DoneHandler _onDone; | |
| 95 final Zone _zone = Zone.current; | |
| 96 | |
| 97 /** Bit vector based on state-constants above. */ | |
| 98 int _state; | |
| 99 | |
| 100 // TODO(floitsch): reuse another field | |
| 101 /** The future [_onCancel] may return. */ | |
| 102 Future _cancelFuture; | |
| 103 | |
| 104 /** | |
| 105 * Queue of pending events. | |
| 106 * | |
| 107 * Is created when necessary, or set in constructor for preconfigured events. | |
| 108 */ | |
| 109 _PendingEvents<T> _pending; | |
| 110 | |
| 111 _BufferingStreamSubscription(void onData(T data), | |
| 112 Function onError, | |
| 113 void onDone(), | |
| 114 bool cancelOnError) | |
| 115 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | |
| 116 this.onData(onData); | |
| 117 this.onError(onError); | |
| 118 this.onDone(onDone); | |
| 119 } | |
| 120 | |
| 121 /** | |
| 122 * Sets the subscription's pending events object. | |
| 123 * | |
| 124 * This can only be done once. The pending events object is used for the | |
| 125 * rest of the subscription's life cycle. | |
| 126 */ | |
| 127 void _setPendingEvents(_PendingEvents<T> pendingEvents) { | |
| 128 assert(_pending == null); | |
| 129 if (pendingEvents == null) return; | |
| 130 _pending = pendingEvents; | |
| 131 if (!pendingEvents.isEmpty) { | |
| 132 _state |= _STATE_HAS_PENDING; | |
| 133 _pending.schedule(this); | |
| 134 } | |
| 135 } | |
| 136 | |
| 137 // StreamSubscription interface. | |
| 138 | |
| 139 void onData(void handleData(T event)) { | |
| 140 if (handleData == null) handleData = _nullDataHandler; | |
| 141 // TODO(floitsch): the return type should be 'void', and the type | |
| 142 // should be inferred. | |
| 143 _onData = _zone.registerUnaryCallback/*<dynamic, T>*/(handleData); | |
| 144 } | |
| 145 | |
| 146 void onError(Function handleError) { | |
| 147 if (handleError == null) handleError = _nullErrorHandler; | |
| 148 _onError = _registerErrorHandler/*<T>*/(handleError, _zone); | |
| 149 } | |
| 150 | |
| 151 void onDone(void handleDone()) { | |
| 152 if (handleDone == null) handleDone = _nullDoneHandler; | |
| 153 _onDone = _zone.registerCallback(handleDone); | |
| 154 } | |
| 155 | |
| 156 void pause([Future resumeSignal]) { | |
| 157 if (_isCanceled) return; | |
| 158 bool wasPaused = _isPaused; | |
| 159 bool wasInputPaused = _isInputPaused; | |
| 160 // Increment pause count and mark input paused (if it isn't already). | |
| 161 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | |
| 162 if (resumeSignal != null) resumeSignal.whenComplete(resume); | |
| 163 if (!wasPaused && _pending != null) _pending.cancelSchedule(); | |
| 164 if (!wasInputPaused && !_inCallback) _guardCallback(_onPause); | |
| 165 } | |
| 166 | |
| 167 void resume() { | |
| 168 if (_isCanceled) return; | |
| 169 if (_isPaused) { | |
| 170 _decrementPauseCount(); | |
| 171 if (!_isPaused) { | |
| 172 if (_hasPending && !_pending.isEmpty) { | |
| 173 // Input is still paused. | |
| 174 _pending.schedule(this); | |
| 175 } else { | |
| 176 assert(_mayResumeInput); | |
| 177 _state &= ~_STATE_INPUT_PAUSED; | |
| 178 if (!_inCallback) _guardCallback(_onResume); | |
| 179 } | |
| 180 } | |
| 181 } | |
| 182 } | |
| 183 | |
| 184 Future cancel() { | |
| 185 // The user doesn't want to receive any further events. If there is an | |
| 186 // error or done event pending (waiting for the cancel to be done) discard | |
| 187 // that event. | |
| 188 _state &= ~_STATE_WAIT_FOR_CANCEL; | |
| 189 if (_isCanceled) return _cancelFuture; | |
| 190 _cancel(); | |
| 191 return _cancelFuture; | |
| 192 } | |
| 193 | |
| 194 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { | |
| 195 _Future/*<E>*/ result = new _Future/*<E>*/(); | |
| 196 | |
| 197 // Overwrite the onDone and onError handlers. | |
| 198 _onDone = () { result._complete(futureValue); }; | |
| 199 _onError = (error, stackTrace) { | |
| 200 cancel(); | |
| 201 result._completeError(error, stackTrace); | |
| 202 }; | |
| 203 | |
| 204 return result; | |
| 205 } | |
| 206 | |
| 207 // State management. | |
| 208 | |
| 209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | |
| 210 bool get _isClosed => (_state & _STATE_CLOSED) != 0; | |
| 211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | |
| 212 bool get _waitsForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0; | |
| 213 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; | |
| 214 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; | |
| 215 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; | |
| 216 bool get _canFire => _state < _STATE_IN_CALLBACK; | |
| 217 bool get _mayResumeInput => | |
| 218 !_isPaused && (_pending == null || _pending.isEmpty); | |
| 219 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; | |
| 220 | |
| 221 bool get isPaused => _isPaused; | |
| 222 | |
| 223 void _cancel() { | |
| 224 _state |= _STATE_CANCELED; | |
| 225 if (_hasPending) { | |
| 226 _pending.cancelSchedule(); | |
| 227 } | |
| 228 if (!_inCallback) _pending = null; | |
| 229 _cancelFuture = _onCancel(); | |
| 230 } | |
| 231 | |
| 232 /** | |
| 233 * Increment the pause count. | |
| 234 * | |
| 235 * Also marks input as paused. | |
| 236 */ | |
| 237 void _incrementPauseCount() { | |
| 238 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | |
| 239 } | |
| 240 | |
| 241 /** | |
| 242 * Decrements the pause count. | |
| 243 * | |
| 244 * Does not automatically unpause the input (call [_onResume]) when | |
| 245 * the pause count reaches zero. This is handled elsewhere, and only | |
| 246 * if there are no pending events buffered. | |
| 247 */ | |
| 248 void _decrementPauseCount() { | |
| 249 assert(_isPaused); | |
| 250 _state -= _STATE_PAUSE_COUNT; | |
| 251 } | |
| 252 | |
| 253 // _EventSink interface. | |
| 254 | |
| 255 void _add(T data) { | |
| 256 assert(!_isClosed); | |
| 257 if (_isCanceled) return; | |
| 258 if (_canFire) { | |
| 259 _sendData(data); | |
| 260 } else { | |
| 261 _addPending(new _DelayedData<dynamic /*=T*/>(data)); | |
| 262 } | |
| 263 } | |
| 264 | |
| 265 void _addError(Object error, StackTrace stackTrace) { | |
| 266 if (_isCanceled) return; | |
| 267 if (_canFire) { | |
| 268 _sendError(error, stackTrace); // Reports cancel after sending. | |
| 269 } else { | |
| 270 _addPending(new _DelayedError(error, stackTrace)); | |
| 271 } | |
| 272 } | |
| 273 | |
| 274 void _close() { | |
| 275 assert(!_isClosed); | |
| 276 if (_isCanceled) return; | |
| 277 _state |= _STATE_CLOSED; | |
| 278 if (_canFire) { | |
| 279 _sendDone(); | |
| 280 } else { | |
| 281 _addPending(const _DelayedDone()); | |
| 282 } | |
| 283 } | |
| 284 | |
| 285 // Hooks called when the input is paused, unpaused or canceled. | |
| 286 // These must not throw. If overwritten to call user code, include suitable | |
| 287 // try/catch wrapping and send any errors to | |
| 288 // [_Zone.current.handleUncaughtError]. | |
| 289 void _onPause() { | |
| 290 assert(_isInputPaused); | |
| 291 } | |
| 292 | |
| 293 void _onResume() { | |
| 294 assert(!_isInputPaused); | |
| 295 } | |
| 296 | |
| 297 Future _onCancel() { | |
| 298 assert(_isCanceled); | |
| 299 return null; | |
| 300 } | |
| 301 | |
| 302 // Handle pending events. | |
| 303 | |
| 304 /** | |
| 305 * Add a pending event. | |
| 306 * | |
| 307 * If the subscription is not paused, this also schedules a firing | |
| 308 * of pending events later (if necessary). | |
| 309 */ | |
| 310 void _addPending(_DelayedEvent event) { | |
| 311 _StreamImplEvents<T> pending = _pending; | |
| 312 if (_pending == null) { | |
| 313 pending = _pending = new _StreamImplEvents<dynamic /*=T*/>(); | |
| 314 } | |
| 315 pending.add(event); | |
| 316 if (!_hasPending) { | |
| 317 _state |= _STATE_HAS_PENDING; | |
| 318 if (!_isPaused) { | |
| 319 _pending.schedule(this); | |
| 320 } | |
| 321 } | |
| 322 } | |
| 323 | |
| 324 /* _EventDispatch interface. */ | |
| 325 | |
| 326 void _sendData(T data) { | |
| 327 assert(!_isCanceled); | |
| 328 assert(!_isPaused); | |
| 329 assert(!_inCallback); | |
| 330 bool wasInputPaused = _isInputPaused; | |
| 331 _state |= _STATE_IN_CALLBACK; | |
| 332 _zone.runUnaryGuarded(_onData, data); | |
| 333 _state &= ~_STATE_IN_CALLBACK; | |
| 334 _checkState(wasInputPaused); | |
| 335 } | |
| 336 | |
| 337 void _sendError(var error, StackTrace stackTrace) { | |
| 338 assert(!_isCanceled); | |
| 339 assert(!_isPaused); | |
| 340 assert(!_inCallback); | |
| 341 bool wasInputPaused = _isInputPaused; | |
| 342 | |
| 343 void sendError() { | |
| 344 // If the subscription has been canceled while waiting for the cancel | |
| 345 // future to finish we must not report the error. | |
| 346 if (_isCanceled && !_waitsForCancel) return; | |
| 347 _state |= _STATE_IN_CALLBACK; | |
| 348 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { | |
| 349 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError | |
| 350 as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; | |
| 351 _zone.runBinaryGuarded(errorCallback, error, stackTrace); | |
| 352 } else { | |
| 353 _zone.runUnaryGuarded/*<dynamic, dynamic>*/( | |
| 354 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error); | |
| 355 } | |
| 356 _state &= ~_STATE_IN_CALLBACK; | |
| 357 } | |
| 358 | |
| 359 if (_cancelOnError) { | |
| 360 _state |= _STATE_WAIT_FOR_CANCEL; | |
| 361 _cancel(); | |
| 362 if (_cancelFuture is Future) { | |
| 363 _cancelFuture.whenComplete(sendError); | |
| 364 } else { | |
| 365 sendError(); | |
| 366 } | |
| 367 } else { | |
| 368 sendError(); | |
| 369 // Only check state if not cancelOnError. | |
| 370 _checkState(wasInputPaused); | |
| 371 } | |
| 372 } | |
| 373 | |
| 374 void _sendDone() { | |
| 375 assert(!_isCanceled); | |
| 376 assert(!_isPaused); | |
| 377 assert(!_inCallback); | |
| 378 | |
| 379 void sendDone() { | |
| 380 // If the subscription has been canceled while waiting for the cancel | |
| 381 // future to finish we must not report the done event. | |
| 382 if (!_waitsForCancel) return; | |
| 383 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); | |
| 384 _zone.runGuarded(_onDone); | |
| 385 _state &= ~_STATE_IN_CALLBACK; | |
| 386 } | |
| 387 | |
| 388 _cancel(); | |
| 389 _state |= _STATE_WAIT_FOR_CANCEL; | |
| 390 if (_cancelFuture is Future) { | |
| 391 _cancelFuture.whenComplete(sendDone); | |
| 392 } else { | |
| 393 sendDone(); | |
| 394 } | |
| 395 } | |
| 396 | |
| 397 /** | |
| 398 * Call a hook function. | |
| 399 * | |
| 400 * The call is properly wrapped in code to avoid other callbacks | |
| 401 * during the call, and it checks for state changes after the call | |
| 402 * that should cause further callbacks. | |
| 403 */ | |
| 404 void _guardCallback(void callback()) { | |
| 405 assert(!_inCallback); | |
| 406 bool wasInputPaused = _isInputPaused; | |
| 407 _state |= _STATE_IN_CALLBACK; | |
| 408 callback(); | |
| 409 _state &= ~_STATE_IN_CALLBACK; | |
| 410 _checkState(wasInputPaused); | |
| 411 } | |
| 412 | |
| 413 /** | |
| 414 * Check if the input needs to be informed of state changes. | |
| 415 * | |
| 416 * State changes are pausing, resuming and canceling. | |
| 417 * | |
| 418 * After canceling, no further callbacks will happen. | |
| 419 * | |
| 420 * The cancel callback is called after a user cancel, or after | |
| 421 * the final done event is sent. | |
| 422 */ | |
| 423 void _checkState(bool wasInputPaused) { | |
| 424 assert(!_inCallback); | |
| 425 if (_hasPending && _pending.isEmpty) { | |
| 426 _state &= ~_STATE_HAS_PENDING; | |
| 427 if (_isInputPaused && _mayResumeInput) { | |
| 428 _state &= ~_STATE_INPUT_PAUSED; | |
| 429 } | |
| 430 } | |
| 431 // If the state changes during a callback, we immediately | |
| 432 // make a new state-change callback. Loop until the state didn't change. | |
| 433 while (true) { | |
| 434 if (_isCanceled) { | |
| 435 _pending = null; | |
| 436 return; | |
| 437 } | |
| 438 bool isInputPaused = _isInputPaused; | |
| 439 if (wasInputPaused == isInputPaused) break; | |
| 440 _state ^= _STATE_IN_CALLBACK; | |
| 441 if (isInputPaused) { | |
| 442 _onPause(); | |
| 443 } else { | |
| 444 _onResume(); | |
| 445 } | |
| 446 _state &= ~_STATE_IN_CALLBACK; | |
| 447 wasInputPaused = isInputPaused; | |
| 448 } | |
| 449 if (_hasPending && !_isPaused) { | |
| 450 _pending.schedule(this); | |
| 451 } | |
| 452 } | |
| 453 } | |
| 454 | |
| 455 // ------------------------------------------------------------------- | |
| 456 // Common base class for single and multi-subscription streams. | |
| 457 // ------------------------------------------------------------------- | |
| 458 abstract class _StreamImpl<T> extends Stream<T> { | |
| 459 // ------------------------------------------------------------------ | |
| 460 // Stream interface. | |
| 461 | |
| 462 StreamSubscription<T> listen(void onData(T data), | |
| 463 { Function onError, | |
| 464 void onDone(), | |
| 465 bool cancelOnError }) { | |
| 466 cancelOnError = identical(true, cancelOnError); | |
| 467 StreamSubscription<T> subscription = | |
| 468 _createSubscription(onData, onError, onDone, cancelOnError); | |
| 469 _onListen(subscription); | |
| 470 return subscription; | |
| 471 } | |
| 472 | |
| 473 // ------------------------------------------------------------------- | |
| 474 /** Create a subscription object. Called by [subcribe]. */ | |
| 475 StreamSubscription<T> _createSubscription( | |
| 476 void onData(T data), | |
| 477 Function onError, | |
| 478 void onDone(), | |
| 479 bool cancelOnError) { | |
| 480 return new _BufferingStreamSubscription<T>(onData, onError, onDone, | |
| 481 cancelOnError); | |
| 482 } | |
| 483 | |
| 484 /** Hook called when the subscription has been created. */ | |
| 485 void _onListen(StreamSubscription subscription) {} | |
| 486 } | |
| 487 | |
| 488 typedef _PendingEvents<T> _EventGenerator<T>(); | |
| 489 | |
| 490 /** Stream that generates its own events. */ | |
| 491 class _GeneratedStreamImpl<T> extends _StreamImpl<T> { | |
| 492 final _EventGenerator<T> _pending; | |
| 493 bool _isUsed = false; | |
| 494 /** | |
| 495 * Initializes the stream to have only the events provided by a | |
| 496 * [_PendingEvents]. | |
| 497 * | |
| 498 * A new [_PendingEvents] must be generated for each listen. | |
| 499 */ | |
| 500 _GeneratedStreamImpl(this._pending); | |
| 501 | |
| 502 StreamSubscription<T> _createSubscription( | |
| 503 void onData(T data), | |
| 504 Function onError, | |
| 505 void onDone(), | |
| 506 bool cancelOnError) { | |
| 507 if (_isUsed) throw new StateError("Stream has already been listened to."); | |
| 508 _isUsed = true; | |
| 509 return new _BufferingStreamSubscription<T>( | |
| 510 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending()); | |
| 511 } | |
| 512 } | |
| 513 | |
| 514 | |
| 515 /** Pending events object that gets its events from an [Iterable]. */ | |
| 516 class _IterablePendingEvents<T> extends _PendingEvents<T> { | |
| 517 // The iterator providing data for data events. | |
| 518 // Set to null when iteration has completed. | |
| 519 Iterator<T> _iterator; | |
| 520 | |
| 521 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; | |
| 522 | |
| 523 bool get isEmpty => _iterator == null; | |
| 524 | |
| 525 void handleNext(_EventDispatch<T> dispatch) { | |
| 526 if (_iterator == null) { | |
| 527 throw new StateError("No events pending."); | |
| 528 } | |
| 529 // Send one event per call to moveNext. | |
| 530 // If moveNext returns true, send the current element as data. | |
| 531 // If moveNext returns false, send a done event and clear the _iterator. | |
| 532 // If moveNext throws an error, send an error and clear the _iterator. | |
| 533 // After an error, no further events will be sent. | |
| 534 bool isDone; | |
| 535 try { | |
| 536 isDone = !_iterator.moveNext(); | |
| 537 } catch (e, s) { | |
| 538 _iterator = null; | |
| 539 dispatch._sendError(e, s); | |
| 540 return; | |
| 541 } | |
| 542 if (!isDone) { | |
| 543 dispatch._sendData(_iterator.current); | |
| 544 } else { | |
| 545 _iterator = null; | |
| 546 dispatch._sendDone(); | |
| 547 } | |
| 548 } | |
| 549 | |
| 550 void clear() { | |
| 551 if (isScheduled) cancelSchedule(); | |
| 552 _iterator = null; | |
| 553 } | |
| 554 } | |
| 555 | |
| 556 | |
| 557 // Internal helpers. | |
| 558 | |
| 559 // Types of the different handlers on a stream. Types used to type fields. | |
| 560 typedef void _DataHandler<T>(T value); | |
| 561 typedef void _DoneHandler(); | |
| 562 | |
| 563 | |
| 564 /** Default data handler, does nothing. */ | |
| 565 void _nullDataHandler(var value) {} | |
| 566 | |
| 567 /** Default error handler, reports the error to the current zone's handler. */ | |
| 568 void _nullErrorHandler(error, [StackTrace stackTrace]) { | |
| 569 Zone.current.handleUncaughtError(error, stackTrace); | |
| 570 } | |
| 571 | |
| 572 /** Default done handler, does nothing. */ | |
| 573 void _nullDoneHandler() {} | |
| 574 | |
| 575 | |
| 576 /** A delayed event on a buffering stream subscription. */ | |
| 577 abstract class _DelayedEvent<T> { | |
| 578 /** Added as a linked list on the [StreamController]. */ | |
| 579 _DelayedEvent next; | |
| 580 /** Execute the delayed event on the [StreamController]. */ | |
| 581 void perform(_EventDispatch<T> dispatch); | |
| 582 } | |
| 583 | |
| 584 /** A delayed data event. */ | |
| 585 class _DelayedData<T> extends _DelayedEvent<T> { | |
| 586 final T value; | |
| 587 _DelayedData(this.value); | |
| 588 void perform(_EventDispatch<T> dispatch) { | |
| 589 dispatch._sendData(value); | |
| 590 } | |
| 591 } | |
| 592 | |
| 593 /** A delayed error event. */ | |
| 594 class _DelayedError extends _DelayedEvent { | |
| 595 final error; | |
| 596 final StackTrace stackTrace; | |
| 597 | |
| 598 _DelayedError(this.error, this.stackTrace); | |
| 599 void perform(_EventDispatch dispatch) { | |
| 600 dispatch._sendError(error, stackTrace); | |
| 601 } | |
| 602 } | |
| 603 | |
| 604 /** A delayed done event. */ | |
| 605 class _DelayedDone implements _DelayedEvent { | |
| 606 const _DelayedDone(); | |
| 607 void perform(_EventDispatch dispatch) { | |
| 608 dispatch._sendDone(); | |
| 609 } | |
| 610 | |
| 611 _DelayedEvent get next => null; | |
| 612 | |
| 613 void set next(_DelayedEvent _) { | |
| 614 throw new StateError("No events after a done."); | |
| 615 } | |
| 616 } | |
| 617 | |
| 618 /** Superclass for provider of pending events. */ | |
| 619 abstract class _PendingEvents<T> { | |
| 620 // No async event has been scheduled. | |
| 621 static const int _STATE_UNSCHEDULED = 0; | |
| 622 // An async event has been scheduled to run a function. | |
| 623 static const int _STATE_SCHEDULED = 1; | |
| 624 // An async event has been scheduled, but it will do nothing when it runs. | |
| 625 // Async events can't be preempted. | |
| 626 static const int _STATE_CANCELED = 3; | |
| 627 | |
| 628 /** | |
| 629 * State of being scheduled. | |
| 630 * | |
| 631 * Set to [_STATE_SCHEDULED] when pending events are scheduled for | |
| 632 * async dispatch. Since we can't cancel a [scheduleMicrotask] call, if | |
| 633 * scheduling is "canceled", the _state is simply set to [_STATE_CANCELED] | |
| 634 * which will make the async code do nothing except resetting [_state]. | |
| 635 * | |
| 636 * If events are scheduled while the state is [_STATE_CANCELED], it is | |
| 637 * merely switched back to [_STATE_SCHEDULED], but no new call to | |
| 638 * [scheduleMicrotask] is performed. | |
| 639 */ | |
| 640 int _state = _STATE_UNSCHEDULED; | |
| 641 | |
| 642 bool get isEmpty; | |
| 643 | |
| 644 bool get isScheduled => _state == _STATE_SCHEDULED; | |
| 645 bool get _eventScheduled => _state >= _STATE_SCHEDULED; | |
| 646 | |
| 647 /** | |
| 648 * Schedule an event to run later. | |
| 649 * | |
| 650 * If called more than once, it should be called with the same dispatch as | |
| 651 * argument each time. It may reuse an earlier argument in some cases. | |
| 652 */ | |
| 653 void schedule(_EventDispatch<T> dispatch) { | |
| 654 if (isScheduled) return; | |
| 655 assert(!isEmpty); | |
| 656 if (_eventScheduled) { | |
| 657 assert(_state == _STATE_CANCELED); | |
| 658 _state = _STATE_SCHEDULED; | |
| 659 return; | |
| 660 } | |
| 661 scheduleMicrotask(() { | |
| 662 int oldState = _state; | |
| 663 _state = _STATE_UNSCHEDULED; | |
| 664 if (oldState == _STATE_CANCELED) return; | |
| 665 handleNext(dispatch); | |
| 666 }); | |
| 667 _state = _STATE_SCHEDULED; | |
| 668 } | |
| 669 | |
| 670 void cancelSchedule() { | |
| 671 if (isScheduled) _state = _STATE_CANCELED; | |
| 672 } | |
| 673 | |
| 674 void handleNext(_EventDispatch<T> dispatch); | |
| 675 | |
| 676 /** Throw away any pending events and cancel scheduled events. */ | |
| 677 void clear(); | |
| 678 } | |
| 679 | |
| 680 | |
| 681 /** Class holding pending events for a [_StreamImpl]. */ | |
| 682 class _StreamImplEvents<T> extends _PendingEvents<T> { | |
| 683 /// Single linked list of [_DelayedEvent] objects. | |
| 684 _DelayedEvent firstPendingEvent = null; | |
| 685 /// Last element in the list of pending events. New events are added after it. | |
| 686 _DelayedEvent lastPendingEvent = null; | |
| 687 | |
| 688 bool get isEmpty => lastPendingEvent == null; | |
| 689 | |
| 690 void add(_DelayedEvent event) { | |
| 691 if (lastPendingEvent == null) { | |
| 692 firstPendingEvent = lastPendingEvent = event; | |
| 693 } else { | |
| 694 lastPendingEvent = lastPendingEvent.next = event; | |
| 695 } | |
| 696 } | |
| 697 | |
| 698 void handleNext(_EventDispatch<T> dispatch) { | |
| 699 assert(!isScheduled); | |
| 700 _DelayedEvent event = firstPendingEvent; | |
| 701 firstPendingEvent = event.next; | |
| 702 if (firstPendingEvent == null) { | |
| 703 lastPendingEvent = null; | |
| 704 } | |
| 705 event.perform(dispatch); | |
| 706 } | |
| 707 | |
| 708 void clear() { | |
| 709 if (isScheduled) cancelSchedule(); | |
| 710 firstPendingEvent = lastPendingEvent = null; | |
| 711 } | |
| 712 } | |
| 713 | |
| 714 class _BroadcastLinkedList { | |
| 715 _BroadcastLinkedList _next; | |
| 716 _BroadcastLinkedList _previous; | |
| 717 | |
| 718 void _unlink() { | |
| 719 _previous._next = _next; | |
| 720 _next._previous = _previous; | |
| 721 _next = _previous = this; | |
| 722 } | |
| 723 | |
| 724 void _insertBefore(_BroadcastLinkedList newNext) { | |
| 725 _BroadcastLinkedList newPrevious = newNext._previous; | |
| 726 newPrevious._next = this; | |
| 727 newNext._previous = _previous; | |
| 728 _previous._next = newNext; | |
| 729 _previous = newPrevious; | |
| 730 } | |
| 731 } | |
| 732 | |
| 733 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); | |
| 734 | |
| 735 /** | |
| 736 * Done subscription that will send one done event as soon as possible. | |
| 737 */ | |
| 738 class _DoneStreamSubscription<T> implements StreamSubscription<T> { | |
| 739 static const int _DONE_SENT = 1; | |
| 740 static const int _SCHEDULED = 2; | |
| 741 static const int _PAUSED = 4; | |
| 742 | |
| 743 final Zone _zone; | |
| 744 int _state = 0; | |
| 745 _DoneHandler _onDone; | |
| 746 | |
| 747 _DoneStreamSubscription(this._onDone) : _zone = Zone.current { | |
| 748 _schedule(); | |
| 749 } | |
| 750 | |
| 751 bool get _isSent => (_state & _DONE_SENT) != 0; | |
| 752 bool get _isScheduled => (_state & _SCHEDULED) != 0; | |
| 753 bool get isPaused => _state >= _PAUSED; | |
| 754 | |
| 755 void _schedule() { | |
| 756 if (_isScheduled) return; | |
| 757 _zone.scheduleMicrotask(_sendDone); | |
| 758 _state |= _SCHEDULED; | |
| 759 } | |
| 760 | |
| 761 void onData(void handleData(T data)) {} | |
| 762 void onError(Function handleError) {} | |
| 763 void onDone(void handleDone()) { _onDone = handleDone; } | |
| 764 | |
| 765 void pause([Future resumeSignal]) { | |
| 766 _state += _PAUSED; | |
| 767 if (resumeSignal != null) resumeSignal.whenComplete(resume); | |
| 768 } | |
| 769 | |
| 770 void resume() { | |
| 771 if (isPaused) { | |
| 772 _state -= _PAUSED; | |
| 773 if (!isPaused && !_isSent) { | |
| 774 _schedule(); | |
| 775 } | |
| 776 } | |
| 777 } | |
| 778 | |
| 779 Future cancel() => null; | |
| 780 | |
| 781 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { | |
| 782 _Future/*<E>*/ result = new _Future/*<E>*/(); | |
| 783 _onDone = () { result._completeWithValue(null); }; | |
| 784 return result; | |
| 785 } | |
| 786 | |
| 787 void _sendDone() { | |
| 788 _state &= ~_SCHEDULED; | |
| 789 if (isPaused) return; | |
| 790 _state |= _DONE_SENT; | |
| 791 if (_onDone != null) _zone.runGuarded(_onDone); | |
| 792 } | |
| 793 } | |
| 794 | |
| 795 class _AsBroadcastStream<T> extends Stream<T> { | |
| 796 final Stream<T> _source; | |
| 797 final _BroadcastCallback<T> _onListenHandler; | |
| 798 final _BroadcastCallback<T> _onCancelHandler; | |
| 799 final Zone _zone; | |
| 800 | |
| 801 _AsBroadcastStreamController<T> _controller; | |
| 802 StreamSubscription<T> _subscription; | |
| 803 | |
| 804 _AsBroadcastStream(this._source, | |
| 805 void onListenHandler(StreamSubscription<T> subscription), | |
| 806 void onCancelHandler(StreamSubscription<T> subscription)) | |
| 807 // TODO(floitsch): the return type should be void and should be | |
| 808 // inferred. | |
| 809 : _onListenHandler = Zone.current.registerUnaryCallback | |
| 810 /*<dynamic, StreamSubscription<T>>*/(onListenHandler), | |
| 811 _onCancelHandler = Zone.current.registerUnaryCallback | |
| 812 /*<dynamic, StreamSubscription<T>>*/(onCancelHandler), | |
| 813 _zone = Zone.current { | |
| 814 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | |
| 815 } | |
| 816 | |
| 817 bool get isBroadcast => true; | |
| 818 | |
| 819 StreamSubscription<T> listen(void onData(T data), | |
| 820 { Function onError, | |
| 821 void onDone(), | |
| 822 bool cancelOnError}) { | |
| 823 if (_controller == null || _controller.isClosed) { | |
| 824 // Return a dummy subscription backed by nothing, since | |
| 825 // it will only ever send one done event. | |
| 826 return new _DoneStreamSubscription<T>(onDone); | |
| 827 } | |
| 828 if (_subscription == null) { | |
| 829 _subscription = _source.listen(_controller.add, | |
| 830 onError: _controller.addError, | |
| 831 onDone: _controller.close); | |
| 832 } | |
| 833 cancelOnError = identical(true, cancelOnError); | |
| 834 return _controller._subscribe(onData, onError, onDone, cancelOnError); | |
| 835 } | |
| 836 | |
| 837 void _onCancel() { | |
| 838 bool shutdown = (_controller == null) || _controller.isClosed; | |
| 839 if (_onCancelHandler != null) { | |
| 840 _zone.runUnary( | |
| 841 _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this)); | |
| 842 } | |
| 843 if (shutdown) { | |
| 844 if (_subscription != null) { | |
| 845 _subscription.cancel(); | |
| 846 _subscription = null; | |
| 847 } | |
| 848 } | |
| 849 } | |
| 850 | |
| 851 void _onListen() { | |
| 852 if (_onListenHandler != null) { | |
| 853 _zone.runUnary( | |
| 854 _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this)); | |
| 855 } | |
| 856 } | |
| 857 | |
| 858 // Methods called from _BroadcastSubscriptionWrapper. | |
| 859 void _cancelSubscription() { | |
| 860 if (_subscription == null) return; | |
| 861 // Called by [_controller] when it has no subscribers left. | |
| 862 StreamSubscription subscription = _subscription; | |
| 863 _subscription = null; | |
| 864 _controller = null; // Marks the stream as no longer listenable. | |
| 865 subscription.cancel(); | |
| 866 } | |
| 867 | |
| 868 void _pauseSubscription(Future resumeSignal) { | |
| 869 if (_subscription == null) return; | |
| 870 _subscription.pause(resumeSignal); | |
| 871 } | |
| 872 | |
| 873 void _resumeSubscription() { | |
| 874 if (_subscription == null) return; | |
| 875 _subscription.resume(); | |
| 876 } | |
| 877 | |
| 878 bool get _isSubscriptionPaused { | |
| 879 if (_subscription == null) return false; | |
| 880 return _subscription.isPaused; | |
| 881 } | |
| 882 } | |
| 883 | |
| 884 /** | |
| 885 * Wrapper for subscription that disallows changing handlers. | |
| 886 */ | |
| 887 class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { | |
| 888 final _AsBroadcastStream _stream; | |
| 889 | |
| 890 _BroadcastSubscriptionWrapper(this._stream); | |
| 891 | |
| 892 void onData(void handleData(T data)) { | |
| 893 throw new UnsupportedError( | |
| 894 "Cannot change handlers of asBroadcastStream source subscription."); | |
| 895 } | |
| 896 | |
| 897 void onError(Function handleError) { | |
| 898 throw new UnsupportedError( | |
| 899 "Cannot change handlers of asBroadcastStream source subscription."); | |
| 900 } | |
| 901 | |
| 902 void onDone(void handleDone()) { | |
| 903 throw new UnsupportedError( | |
| 904 "Cannot change handlers of asBroadcastStream source subscription."); | |
| 905 } | |
| 906 | |
| 907 void pause([Future resumeSignal]) { | |
| 908 _stream._pauseSubscription(resumeSignal); | |
| 909 } | |
| 910 | |
| 911 void resume() { | |
| 912 _stream._resumeSubscription(); | |
| 913 } | |
| 914 | |
| 915 Future cancel() { | |
| 916 _stream._cancelSubscription(); | |
| 917 return null; | |
| 918 } | |
| 919 | |
| 920 bool get isPaused { | |
| 921 return _stream._isSubscriptionPaused; | |
| 922 } | |
| 923 | |
| 924 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { | |
| 925 throw new UnsupportedError( | |
| 926 "Cannot change handlers of asBroadcastStream source subscription."); | |
| 927 } | |
| 928 } | |
| 929 | |
| 930 | |
| 931 /** | |
| 932 * Simple implementation of [StreamIterator]. | |
| 933 */ | |
| 934 class _StreamIteratorImpl<T> implements StreamIterator<T> { | |
| 935 // Internal state of the stream iterator. | |
| 936 // At any time, it is in one of these states. | |
| 937 // The interpretation of the [_futureOrPrefecth] field depends on the state. | |
| 938 // In _STATE_MOVING, the _data field holds the most recently returned | |
| 939 // future. | |
| 940 // When in one of the _STATE_EXTRA_* states, the it may hold the | |
| 941 // next data/error object, and the subscription is paused. | |
| 942 | |
| 943 /// The simple state where [_data] holds the data to return, and [moveNext] | |
| 944 /// is allowed. The subscription is actively listening. | |
| 945 static const int _STATE_FOUND = 0; | |
| 946 /// State set after [moveNext] has returned false or an error, | |
| 947 /// or after calling [cancel]. The subscription is always canceled. | |
| 948 static const int _STATE_DONE = 1; | |
| 949 /// State set after calling [moveNext], but before its returned future has | |
| 950 /// completed. Calling [moveNext] again is not allowed in this state. | |
| 951 /// The subscription is actively listening. | |
| 952 static const int _STATE_MOVING = 2; | |
| 953 /// States set when another event occurs while in _STATE_FOUND. | |
| 954 /// This extra overflow event is cached until the next call to [moveNext], | |
| 955 /// which will complete as if it received the event normally. | |
| 956 /// The subscription is paused in these states, so we only ever get one | |
| 957 /// event too many. | |
| 958 static const int _STATE_EXTRA_DATA = 3; | |
| 959 static const int _STATE_EXTRA_ERROR = 4; | |
| 960 static const int _STATE_EXTRA_DONE = 5; | |
| 961 | |
| 962 /// Subscription being listened to. | |
| 963 StreamSubscription _subscription; | |
| 964 | |
| 965 /// The current element represented by the most recent call to moveNext. | |
| 966 /// | |
| 967 /// Is null between the time moveNext is called and its future completes. | |
| 968 T _current = null; | |
| 969 | |
| 970 /// The future returned by the most recent call to [moveNext]. | |
| 971 /// | |
| 972 /// Also used to store the next value/error in case the stream provides an | |
| 973 /// event before [moveNext] is called again. In that case, the stream will | |
| 974 /// be paused to prevent further events. | |
| 975 var/*Future<bool> or T*/ _futureOrPrefetch = null; | |
| 976 | |
| 977 /// The current state. | |
| 978 int _state = _STATE_FOUND; | |
| 979 | |
| 980 _StreamIteratorImpl(final Stream<T> stream) { | |
| 981 _subscription = stream.listen(_onData, | |
| 982 onError: _onError, | |
| 983 onDone: _onDone, | |
| 984 cancelOnError: true); | |
| 985 } | |
| 986 | |
| 987 T get current => _current; | |
| 988 | |
| 989 Future<bool> moveNext() { | |
| 990 if (_state == _STATE_DONE) { | |
| 991 return new _Future<bool>.immediate(false); | |
| 992 } | |
| 993 if (_state == _STATE_MOVING) { | |
| 994 throw new StateError("Already waiting for next."); | |
| 995 } | |
| 996 if (_state == _STATE_FOUND) { | |
| 997 _state = _STATE_MOVING; | |
| 998 _current = null; | |
| 999 var result = new _Future<bool>(); | |
| 1000 _futureOrPrefetch = result; | |
| 1001 return result; | |
| 1002 } else { | |
| 1003 assert(_state >= _STATE_EXTRA_DATA); | |
| 1004 switch (_state) { | |
| 1005 case _STATE_EXTRA_DATA: | |
| 1006 _state = _STATE_FOUND; | |
| 1007 _current = _futureOrPrefetch as Object /*=T*/; | |
| 1008 _futureOrPrefetch = null; | |
| 1009 _subscription.resume(); | |
| 1010 return new _Future<bool>.immediate(true); | |
| 1011 case _STATE_EXTRA_ERROR: | |
| 1012 AsyncError prefetch = _futureOrPrefetch; | |
| 1013 _clear(); | |
| 1014 return new _Future<bool>.immediateError(prefetch.error, | |
| 1015 prefetch.stackTrace); | |
| 1016 case _STATE_EXTRA_DONE: | |
| 1017 _clear(); | |
| 1018 return new _Future<bool>.immediate(false); | |
| 1019 } | |
| 1020 } | |
| 1021 } | |
| 1022 | |
| 1023 /** Clears up the internal state when the iterator ends. */ | |
| 1024 void _clear() { | |
| 1025 _subscription = null; | |
| 1026 _futureOrPrefetch = null; | |
| 1027 _current = null; | |
| 1028 _state = _STATE_DONE; | |
| 1029 } | |
| 1030 | |
| 1031 Future cancel() { | |
| 1032 StreamSubscription subscription = _subscription; | |
| 1033 if (subscription == null) return null; | |
| 1034 if (_state == _STATE_MOVING) { | |
| 1035 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | |
| 1036 _clear(); | |
| 1037 hasNext._complete(false); | |
| 1038 } else { | |
| 1039 _clear(); | |
| 1040 } | |
| 1041 return subscription.cancel(); | |
| 1042 } | |
| 1043 | |
| 1044 void _onData(T data) { | |
| 1045 if (_state == _STATE_MOVING) { | |
| 1046 _current = data; | |
| 1047 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | |
| 1048 _futureOrPrefetch = null; | |
| 1049 _state = _STATE_FOUND; | |
| 1050 hasNext._complete(true); | |
| 1051 return; | |
| 1052 } | |
| 1053 _subscription.pause(); | |
| 1054 assert(_futureOrPrefetch == null); | |
| 1055 _futureOrPrefetch = data; | |
| 1056 _state = _STATE_EXTRA_DATA; | |
| 1057 } | |
| 1058 | |
| 1059 void _onError(Object error, [StackTrace stackTrace]) { | |
| 1060 if (_state == _STATE_MOVING) { | |
| 1061 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | |
| 1062 // We have cancelOnError: true, so the subscription is canceled. | |
| 1063 _clear(); | |
| 1064 hasNext._completeError(error, stackTrace); | |
| 1065 return; | |
| 1066 } | |
| 1067 _subscription.pause(); | |
| 1068 assert(_futureOrPrefetch == null); | |
| 1069 _futureOrPrefetch = new AsyncError(error, stackTrace); | |
| 1070 _state = _STATE_EXTRA_ERROR; | |
| 1071 } | |
| 1072 | |
| 1073 void _onDone() { | |
| 1074 if (_state == _STATE_MOVING) { | |
| 1075 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; | |
| 1076 _clear(); | |
| 1077 hasNext._complete(false); | |
| 1078 return; | |
| 1079 } | |
| 1080 _subscription.pause(); | |
| 1081 _futureOrPrefetch = null; | |
| 1082 _state = _STATE_EXTRA_DONE; | |
| 1083 } | |
| 1084 } | |
| 1085 | |
| 1086 /** An empty broadcast stream, sending a done event as soon as possible. */ | |
| 1087 class _EmptyStream<T> extends Stream<T> { | |
| 1088 const _EmptyStream() : super._internal(); | |
| 1089 bool get isBroadcast => true; | |
| 1090 StreamSubscription<T> listen(void onData(T data), | |
| 1091 {Function onError, | |
| 1092 void onDone(), | |
| 1093 bool cancelOnError}) { | |
| 1094 return new _DoneStreamSubscription<T>(onDone); | |
| 1095 } | |
| 1096 } | |
| OLD | NEW |