OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
9 void _add(T data); | 9 void _add(T data); |
10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
99 | 99 |
100 // TODO(floitsch): reuse another field | 100 // TODO(floitsch): reuse another field |
101 /** The future [_onCancel] may return. */ | 101 /** The future [_onCancel] may return. */ |
102 Future _cancelFuture; | 102 Future _cancelFuture; |
103 | 103 |
104 /** | 104 /** |
105 * Queue of pending events. | 105 * Queue of pending events. |
106 * | 106 * |
107 * Is created when necessary, or set in constructor for preconfigured events. | 107 * Is created when necessary, or set in constructor for preconfigured events. |
108 */ | 108 */ |
109 _PendingEvents _pending; | 109 _PendingEvents<T> _pending; |
110 | 110 |
111 _BufferingStreamSubscription(void onData(T data), | 111 _BufferingStreamSubscription(void onData(T data), |
112 Function onError, | 112 Function onError, |
113 void onDone(), | 113 void onDone(), |
114 bool cancelOnError) | 114 bool cancelOnError) |
115 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | 115 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
116 this.onData(onData); | 116 this.onData(onData); |
117 this.onError(onError); | 117 this.onError(onError); |
118 this.onDone(onDone); | 118 this.onDone(onDone); |
119 } | 119 } |
120 | 120 |
121 /** | 121 /** |
122 * Sets the subscription's pending events object. | 122 * Sets the subscription's pending events object. |
123 * | 123 * |
124 * This can only be done once. The pending events object is used for the | 124 * This can only be done once. The pending events object is used for the |
125 * rest of the subscription's life cycle. | 125 * rest of the subscription's life cycle. |
126 */ | 126 */ |
127 void _setPendingEvents(_PendingEvents pendingEvents) { | 127 void _setPendingEvents(_PendingEvents<T> pendingEvents) { |
128 assert(_pending == null); | 128 assert(_pending == null); |
129 if (pendingEvents == null) return; | 129 if (pendingEvents == null) return; |
130 _pending = pendingEvents; | 130 _pending = pendingEvents; |
131 if (!pendingEvents.isEmpty) { | 131 if (!pendingEvents.isEmpty) { |
132 _state |= _STATE_HAS_PENDING; | 132 _state |= _STATE_HAS_PENDING; |
133 _pending.schedule(this); | 133 _pending.schedule(this); |
134 } | 134 } |
135 } | 135 } |
136 | 136 |
137 /** | |
138 * Extracts the pending events from a canceled stream. | |
139 * | |
140 * This can only be done during the [_onCancel] method call. After that, | |
141 * any remaining pending events will be cleared. | |
142 */ | |
143 _PendingEvents _extractPending() { | |
144 assert(_isCanceled); | |
145 _PendingEvents events = _pending; | |
146 _pending = null; | |
147 return events; | |
148 } | |
149 | |
150 // StreamSubscription interface. | 137 // StreamSubscription interface. |
151 | 138 |
152 void onData(void handleData(T event)) { | 139 void onData(void handleData(T event)) { |
153 if (handleData == null) handleData = _nullDataHandler; | 140 if (handleData == null) handleData = _nullDataHandler; |
154 _onData = _zone.registerUnaryCallback(handleData); | 141 // TODO(floitsch): the return type should be 'void', and the type |
| 142 // should be inferred. |
| 143 _onData = _zone.registerUnaryCallback/*<dynamic, T>*/(handleData); |
155 } | 144 } |
156 | 145 |
157 void onError(Function handleError) { | 146 void onError(Function handleError) { |
158 if (handleError == null) handleError = _nullErrorHandler; | 147 if (handleError == null) handleError = _nullErrorHandler; |
159 _onError = _registerErrorHandler(handleError, _zone); | 148 _onError = _registerErrorHandler/*<T>*/(handleError, _zone); |
160 } | 149 } |
161 | 150 |
162 void onDone(void handleDone()) { | 151 void onDone(void handleDone()) { |
163 if (handleDone == null) handleDone = _nullDoneHandler; | 152 if (handleDone == null) handleDone = _nullDoneHandler; |
164 _onDone = _zone.registerCallback(handleDone); | 153 _onDone = _zone.registerCallback(handleDone); |
165 } | 154 } |
166 | 155 |
167 void pause([Future resumeSignal]) { | 156 void pause([Future resumeSignal]) { |
168 if (_isCanceled) return; | 157 if (_isCanceled) return; |
169 bool wasPaused = _isPaused; | 158 bool wasPaused = _isPaused; |
(...skipping 25 matching lines...) Expand all Loading... |
195 Future cancel() { | 184 Future cancel() { |
196 // The user doesn't want to receive any further events. If there is an | 185 // The user doesn't want to receive any further events. If there is an |
197 // error or done event pending (waiting for the cancel to be done) discard | 186 // error or done event pending (waiting for the cancel to be done) discard |
198 // that event. | 187 // that event. |
199 _state &= ~_STATE_WAIT_FOR_CANCEL; | 188 _state &= ~_STATE_WAIT_FOR_CANCEL; |
200 if (_isCanceled) return _cancelFuture; | 189 if (_isCanceled) return _cancelFuture; |
201 _cancel(); | 190 _cancel(); |
202 return _cancelFuture; | 191 return _cancelFuture; |
203 } | 192 } |
204 | 193 |
205 Future asFuture([var futureValue]) { | 194 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
206 _Future<T> result = new _Future<T>(); | 195 _Future/*<E>*/ result = new _Future/*<E>*/(); |
207 | 196 |
208 // Overwrite the onDone and onError handlers. | 197 // Overwrite the onDone and onError handlers. |
209 _onDone = () { result._complete(futureValue); }; | 198 _onDone = () { result._complete(futureValue); }; |
210 _onError = (error, stackTrace) { | 199 _onError = (error, stackTrace) { |
211 cancel(); | 200 cancel(); |
212 result._completeError(error, stackTrace); | 201 result._completeError(error, stackTrace); |
213 }; | 202 }; |
214 | 203 |
215 return result; | 204 return result; |
216 } | 205 } |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
262 } | 251 } |
263 | 252 |
264 // _EventSink interface. | 253 // _EventSink interface. |
265 | 254 |
266 void _add(T data) { | 255 void _add(T data) { |
267 assert(!_isClosed); | 256 assert(!_isClosed); |
268 if (_isCanceled) return; | 257 if (_isCanceled) return; |
269 if (_canFire) { | 258 if (_canFire) { |
270 _sendData(data); | 259 _sendData(data); |
271 } else { | 260 } else { |
272 _addPending(new _DelayedData(data)); | 261 _addPending(new _DelayedData<dynamic /*=T*/>(data)); |
273 } | 262 } |
274 } | 263 } |
275 | 264 |
276 void _addError(Object error, StackTrace stackTrace) { | 265 void _addError(Object error, StackTrace stackTrace) { |
277 if (_isCanceled) return; | 266 if (_isCanceled) return; |
278 if (_canFire) { | 267 if (_canFire) { |
279 _sendError(error, stackTrace); // Reports cancel after sending. | 268 _sendError(error, stackTrace); // Reports cancel after sending. |
280 } else { | 269 } else { |
281 _addPending(new _DelayedError(error, stackTrace)); | 270 _addPending(new _DelayedError(error, stackTrace)); |
282 } | 271 } |
(...skipping 29 matching lines...) Expand all Loading... |
312 | 301 |
313 // Handle pending events. | 302 // Handle pending events. |
314 | 303 |
315 /** | 304 /** |
316 * Add a pending event. | 305 * Add a pending event. |
317 * | 306 * |
318 * If the subscription is not paused, this also schedules a firing | 307 * If the subscription is not paused, this also schedules a firing |
319 * of pending events later (if necessary). | 308 * of pending events later (if necessary). |
320 */ | 309 */ |
321 void _addPending(_DelayedEvent event) { | 310 void _addPending(_DelayedEvent event) { |
322 _StreamImplEvents pending = _pending; | 311 _StreamImplEvents<T> pending = _pending; |
323 if (_pending == null) pending = _pending = new _StreamImplEvents(); | 312 if (_pending == null) { |
| 313 pending = _pending = new _StreamImplEvents<dynamic /*=T*/>(); |
| 314 } |
324 pending.add(event); | 315 pending.add(event); |
325 if (!_hasPending) { | 316 if (!_hasPending) { |
326 _state |= _STATE_HAS_PENDING; | 317 _state |= _STATE_HAS_PENDING; |
327 if (!_isPaused) { | 318 if (!_isPaused) { |
328 _pending.schedule(this); | 319 _pending.schedule(this); |
329 } | 320 } |
330 } | 321 } |
331 } | 322 } |
332 | 323 |
333 /* _EventDispatch interface. */ | 324 /* _EventDispatch interface. */ |
334 | 325 |
335 void _sendData(T data) { | 326 void _sendData(T data) { |
336 assert(!_isCanceled); | 327 assert(!_isCanceled); |
337 assert(!_isPaused); | 328 assert(!_isPaused); |
338 assert(!_inCallback); | 329 assert(!_inCallback); |
339 bool wasInputPaused = _isInputPaused; | 330 bool wasInputPaused = _isInputPaused; |
340 _state |= _STATE_IN_CALLBACK; | 331 _state |= _STATE_IN_CALLBACK; |
341 _zone.runUnaryGuarded(_onData, data); | 332 _zone.runUnaryGuarded(_onData, data); |
342 _state &= ~_STATE_IN_CALLBACK; | 333 _state &= ~_STATE_IN_CALLBACK; |
343 _checkState(wasInputPaused); | 334 _checkState(wasInputPaused); |
344 } | 335 } |
345 | 336 |
346 void _sendError(Object error, StackTrace stackTrace) { | 337 void _sendError(var error, StackTrace stackTrace) { |
347 assert(!_isCanceled); | 338 assert(!_isCanceled); |
348 assert(!_isPaused); | 339 assert(!_isPaused); |
349 assert(!_inCallback); | 340 assert(!_inCallback); |
350 bool wasInputPaused = _isInputPaused; | 341 bool wasInputPaused = _isInputPaused; |
351 | 342 |
352 void sendError() { | 343 void sendError() { |
353 // If the subscription has been canceled while waiting for the cancel | 344 // If the subscription has been canceled while waiting for the cancel |
354 // future to finish we must not report the error. | 345 // future to finish we must not report the error. |
355 if (_isCanceled && !_waitsForCancel) return; | 346 if (_isCanceled && !_waitsForCancel) return; |
356 _state |= _STATE_IN_CALLBACK; | 347 _state |= _STATE_IN_CALLBACK; |
357 if (_onError is ZoneBinaryCallback) { | 348 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { |
358 _zone.runBinaryGuarded(_onError, error, stackTrace); | 349 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError |
| 350 as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; |
| 351 _zone.runBinaryGuarded(errorCallback, error, stackTrace); |
359 } else { | 352 } else { |
360 _zone.runUnaryGuarded(_onError, error); | 353 _zone.runUnaryGuarded/*<dynamic, dynamic>*/( |
| 354 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error); |
361 } | 355 } |
362 _state &= ~_STATE_IN_CALLBACK; | 356 _state &= ~_STATE_IN_CALLBACK; |
363 } | 357 } |
364 | 358 |
365 if (_cancelOnError) { | 359 if (_cancelOnError) { |
366 _state |= _STATE_WAIT_FOR_CANCEL; | 360 _state |= _STATE_WAIT_FOR_CANCEL; |
367 _cancel(); | 361 _cancel(); |
368 if (_cancelFuture is Future) { | 362 if (_cancelFuture is Future) { |
369 _cancelFuture.whenComplete(sendError); | 363 _cancelFuture.whenComplete(sendError); |
370 } else { | 364 } else { |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
463 // ------------------------------------------------------------------- | 457 // ------------------------------------------------------------------- |
464 abstract class _StreamImpl<T> extends Stream<T> { | 458 abstract class _StreamImpl<T> extends Stream<T> { |
465 // ------------------------------------------------------------------ | 459 // ------------------------------------------------------------------ |
466 // Stream interface. | 460 // Stream interface. |
467 | 461 |
468 StreamSubscription<T> listen(void onData(T data), | 462 StreamSubscription<T> listen(void onData(T data), |
469 { Function onError, | 463 { Function onError, |
470 void onDone(), | 464 void onDone(), |
471 bool cancelOnError }) { | 465 bool cancelOnError }) { |
472 cancelOnError = identical(true, cancelOnError); | 466 cancelOnError = identical(true, cancelOnError); |
473 StreamSubscription subscription = | 467 StreamSubscription<T> subscription = |
474 _createSubscription(onData, onError, onDone, cancelOnError); | 468 _createSubscription(onData, onError, onDone, cancelOnError); |
475 _onListen(subscription); | 469 _onListen(subscription); |
476 return subscription; | 470 return subscription; |
477 } | 471 } |
478 | 472 |
479 // ------------------------------------------------------------------- | 473 // ------------------------------------------------------------------- |
480 /** Create a subscription object. Called by [subcribe]. */ | 474 /** Create a subscription object. Called by [subcribe]. */ |
481 StreamSubscription<T> _createSubscription( | 475 StreamSubscription<T> _createSubscription( |
482 void onData(T data), | 476 void onData(T data), |
483 Function onError, | 477 Function onError, |
484 void onDone(), | 478 void onDone(), |
485 bool cancelOnError) { | 479 bool cancelOnError) { |
486 return new _BufferingStreamSubscription<T>(onData, onError, onDone, | 480 return new _BufferingStreamSubscription<T>(onData, onError, onDone, |
487 cancelOnError); | 481 cancelOnError); |
488 } | 482 } |
489 | 483 |
490 /** Hook called when the subscription has been created. */ | 484 /** Hook called when the subscription has been created. */ |
491 void _onListen(StreamSubscription subscription) {} | 485 void _onListen(StreamSubscription subscription) {} |
492 } | 486 } |
493 | 487 |
494 typedef _PendingEvents _EventGenerator(); | 488 typedef _PendingEvents<T> _EventGenerator<T>(); |
495 | 489 |
496 /** Stream that generates its own events. */ | 490 /** Stream that generates its own events. */ |
497 class _GeneratedStreamImpl<T> extends _StreamImpl<T> { | 491 class _GeneratedStreamImpl<T> extends _StreamImpl<T> { |
498 final _EventGenerator _pending; | 492 final _EventGenerator<T> _pending; |
499 bool _isUsed = false; | 493 bool _isUsed = false; |
500 /** | 494 /** |
501 * Initializes the stream to have only the events provided by a | 495 * Initializes the stream to have only the events provided by a |
502 * [_PendingEvents]. | 496 * [_PendingEvents]. |
503 * | 497 * |
504 * A new [_PendingEvents] must be generated for each listen. | 498 * A new [_PendingEvents] must be generated for each listen. |
505 */ | 499 */ |
506 _GeneratedStreamImpl(this._pending); | 500 _GeneratedStreamImpl(this._pending); |
507 | 501 |
508 StreamSubscription<T> _createSubscription( | 502 StreamSubscription<T> _createSubscription( |
509 void onData(T data), | 503 void onData(T data), |
510 Function onError, | 504 Function onError, |
511 void onDone(), | 505 void onDone(), |
512 bool cancelOnError) { | 506 bool cancelOnError) { |
513 if (_isUsed) throw new StateError("Stream has already been listened to."); | 507 if (_isUsed) throw new StateError("Stream has already been listened to."); |
514 _isUsed = true; | 508 _isUsed = true; |
515 return new _BufferingStreamSubscription( | 509 return new _BufferingStreamSubscription<T>( |
516 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending()); | 510 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending()); |
517 } | 511 } |
518 } | 512 } |
519 | 513 |
520 | 514 |
521 /** Pending events object that gets its events from an [Iterable]. */ | 515 /** Pending events object that gets its events from an [Iterable]. */ |
522 class _IterablePendingEvents<T> extends _PendingEvents { | 516 class _IterablePendingEvents<T> extends _PendingEvents<T> { |
523 // The iterator providing data for data events. | 517 // The iterator providing data for data events. |
524 // Set to null when iteration has completed. | 518 // Set to null when iteration has completed. |
525 Iterator<T> _iterator; | 519 Iterator<T> _iterator; |
526 | 520 |
527 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; | 521 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; |
528 | 522 |
529 bool get isEmpty => _iterator == null; | 523 bool get isEmpty => _iterator == null; |
530 | 524 |
531 void handleNext(_EventDispatch dispatch) { | 525 void handleNext(_EventDispatch<T> dispatch) { |
532 if (_iterator == null) { | 526 if (_iterator == null) { |
533 throw new StateError("No events pending."); | 527 throw new StateError("No events pending."); |
534 } | 528 } |
535 // Send one event per call to moveNext. | 529 // Send one event per call to moveNext. |
536 // If moveNext returns true, send the current element as data. | 530 // If moveNext returns true, send the current element as data. |
537 // If moveNext returns false, send a done event and clear the _iterator. | 531 // If moveNext returns false, send a done event and clear the _iterator. |
538 // If moveNext throws an error, send an error and clear the _iterator. | 532 // If moveNext throws an error, send an error and clear the _iterator. |
539 // After an error, no further events will be sent. | 533 // After an error, no further events will be sent. |
540 bool isDone; | 534 bool isDone; |
541 try { | 535 try { |
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
615 } | 609 } |
616 | 610 |
617 _DelayedEvent get next => null; | 611 _DelayedEvent get next => null; |
618 | 612 |
619 void set next(_DelayedEvent _) { | 613 void set next(_DelayedEvent _) { |
620 throw new StateError("No events after a done."); | 614 throw new StateError("No events after a done."); |
621 } | 615 } |
622 } | 616 } |
623 | 617 |
624 /** Superclass for provider of pending events. */ | 618 /** Superclass for provider of pending events. */ |
625 abstract class _PendingEvents { | 619 abstract class _PendingEvents<T> { |
626 // No async event has been scheduled. | 620 // No async event has been scheduled. |
627 static const int _STATE_UNSCHEDULED = 0; | 621 static const int _STATE_UNSCHEDULED = 0; |
628 // An async event has been scheduled to run a function. | 622 // An async event has been scheduled to run a function. |
629 static const int _STATE_SCHEDULED = 1; | 623 static const int _STATE_SCHEDULED = 1; |
630 // An async event has been scheduled, but it will do nothing when it runs. | 624 // An async event has been scheduled, but it will do nothing when it runs. |
631 // Async events can't be preempted. | 625 // Async events can't be preempted. |
632 static const int _STATE_CANCELED = 3; | 626 static const int _STATE_CANCELED = 3; |
633 | 627 |
634 /** | 628 /** |
635 * State of being scheduled. | 629 * State of being scheduled. |
(...skipping 13 matching lines...) Expand all Loading... |
649 | 643 |
650 bool get isScheduled => _state == _STATE_SCHEDULED; | 644 bool get isScheduled => _state == _STATE_SCHEDULED; |
651 bool get _eventScheduled => _state >= _STATE_SCHEDULED; | 645 bool get _eventScheduled => _state >= _STATE_SCHEDULED; |
652 | 646 |
653 /** | 647 /** |
654 * Schedule an event to run later. | 648 * Schedule an event to run later. |
655 * | 649 * |
656 * If called more than once, it should be called with the same dispatch as | 650 * If called more than once, it should be called with the same dispatch as |
657 * argument each time. It may reuse an earlier argument in some cases. | 651 * argument each time. It may reuse an earlier argument in some cases. |
658 */ | 652 */ |
659 void schedule(_EventDispatch dispatch) { | 653 void schedule(_EventDispatch<T> dispatch) { |
660 if (isScheduled) return; | 654 if (isScheduled) return; |
661 assert(!isEmpty); | 655 assert(!isEmpty); |
662 if (_eventScheduled) { | 656 if (_eventScheduled) { |
663 assert(_state == _STATE_CANCELED); | 657 assert(_state == _STATE_CANCELED); |
664 _state = _STATE_SCHEDULED; | 658 _state = _STATE_SCHEDULED; |
665 return; | 659 return; |
666 } | 660 } |
667 scheduleMicrotask(() { | 661 scheduleMicrotask(() { |
668 int oldState = _state; | 662 int oldState = _state; |
669 _state = _STATE_UNSCHEDULED; | 663 _state = _STATE_UNSCHEDULED; |
670 if (oldState == _STATE_CANCELED) return; | 664 if (oldState == _STATE_CANCELED) return; |
671 handleNext(dispatch); | 665 handleNext(dispatch); |
672 }); | 666 }); |
673 _state = _STATE_SCHEDULED; | 667 _state = _STATE_SCHEDULED; |
674 } | 668 } |
675 | 669 |
676 void cancelSchedule() { | 670 void cancelSchedule() { |
677 if (isScheduled) _state = _STATE_CANCELED; | 671 if (isScheduled) _state = _STATE_CANCELED; |
678 } | 672 } |
679 | 673 |
680 void handleNext(_EventDispatch dispatch); | 674 void handleNext(_EventDispatch<T> dispatch); |
681 | 675 |
682 /** Throw away any pending events and cancel scheduled events. */ | 676 /** Throw away any pending events and cancel scheduled events. */ |
683 void clear(); | 677 void clear(); |
684 } | 678 } |
685 | 679 |
686 | 680 |
687 /** Class holding pending events for a [_StreamImpl]. */ | 681 /** Class holding pending events for a [_StreamImpl]. */ |
688 class _StreamImplEvents extends _PendingEvents { | 682 class _StreamImplEvents<T> extends _PendingEvents<T> { |
689 /// Single linked list of [_DelayedEvent] objects. | 683 /// Single linked list of [_DelayedEvent] objects. |
690 _DelayedEvent firstPendingEvent = null; | 684 _DelayedEvent firstPendingEvent = null; |
691 /// Last element in the list of pending events. New events are added after it. | 685 /// Last element in the list of pending events. New events are added after it. |
692 _DelayedEvent lastPendingEvent = null; | 686 _DelayedEvent lastPendingEvent = null; |
693 | 687 |
694 bool get isEmpty => lastPendingEvent == null; | 688 bool get isEmpty => lastPendingEvent == null; |
695 | 689 |
696 void add(_DelayedEvent event) { | 690 void add(_DelayedEvent event) { |
697 if (lastPendingEvent == null) { | 691 if (lastPendingEvent == null) { |
698 firstPendingEvent = lastPendingEvent = event; | 692 firstPendingEvent = lastPendingEvent = event; |
699 } else { | 693 } else { |
700 lastPendingEvent = lastPendingEvent.next = event; | 694 lastPendingEvent = lastPendingEvent.next = event; |
701 } | 695 } |
702 } | 696 } |
703 | 697 |
704 void handleNext(_EventDispatch dispatch) { | 698 void handleNext(_EventDispatch<T> dispatch) { |
705 assert(!isScheduled); | 699 assert(!isScheduled); |
706 _DelayedEvent event = firstPendingEvent; | 700 _DelayedEvent event = firstPendingEvent; |
707 firstPendingEvent = event.next; | 701 firstPendingEvent = event.next; |
708 if (firstPendingEvent == null) { | 702 if (firstPendingEvent == null) { |
709 lastPendingEvent = null; | 703 lastPendingEvent = null; |
710 } | 704 } |
711 event.perform(dispatch); | 705 event.perform(dispatch); |
712 } | 706 } |
713 | 707 |
714 void clear() { | 708 void clear() { |
(...skipping 14 matching lines...) Expand all Loading... |
729 | 723 |
730 void _insertBefore(_BroadcastLinkedList newNext) { | 724 void _insertBefore(_BroadcastLinkedList newNext) { |
731 _BroadcastLinkedList newPrevious = newNext._previous; | 725 _BroadcastLinkedList newPrevious = newNext._previous; |
732 newPrevious._next = this; | 726 newPrevious._next = this; |
733 newNext._previous = _previous; | 727 newNext._previous = _previous; |
734 _previous._next = newNext; | 728 _previous._next = newNext; |
735 _previous = newPrevious; | 729 _previous = newPrevious; |
736 } | 730 } |
737 } | 731 } |
738 | 732 |
739 typedef void _broadcastCallback(StreamSubscription subscription); | 733 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); |
740 | 734 |
741 /** | 735 /** |
742 * Done subscription that will send one done event as soon as possible. | 736 * Done subscription that will send one done event as soon as possible. |
743 */ | 737 */ |
744 class _DoneStreamSubscription<T> implements StreamSubscription<T> { | 738 class _DoneStreamSubscription<T> implements StreamSubscription<T> { |
745 static const int _DONE_SENT = 1; | 739 static const int _DONE_SENT = 1; |
746 static const int _SCHEDULED = 2; | 740 static const int _SCHEDULED = 2; |
747 static const int _PAUSED = 4; | 741 static const int _PAUSED = 4; |
748 | 742 |
749 final Zone _zone; | 743 final Zone _zone; |
(...skipping 27 matching lines...) Expand all Loading... |
777 if (isPaused) { | 771 if (isPaused) { |
778 _state -= _PAUSED; | 772 _state -= _PAUSED; |
779 if (!isPaused && !_isSent) { | 773 if (!isPaused && !_isSent) { |
780 _schedule(); | 774 _schedule(); |
781 } | 775 } |
782 } | 776 } |
783 } | 777 } |
784 | 778 |
785 Future cancel() => null; | 779 Future cancel() => null; |
786 | 780 |
787 Future asFuture([futureValue]) { | 781 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
788 _Future result = new _Future(); | 782 _Future/*<E>*/ result = new _Future/*<E>*/(); |
789 _onDone = () { result._completeWithValue(null); }; | 783 _onDone = () { result._completeWithValue(null); }; |
790 return result; | 784 return result; |
791 } | 785 } |
792 | 786 |
793 void _sendDone() { | 787 void _sendDone() { |
794 _state &= ~_SCHEDULED; | 788 _state &= ~_SCHEDULED; |
795 if (isPaused) return; | 789 if (isPaused) return; |
796 _state |= _DONE_SENT; | 790 _state |= _DONE_SENT; |
797 if (_onDone != null) _zone.runGuarded(_onDone); | 791 if (_onDone != null) _zone.runGuarded(_onDone); |
798 } | 792 } |
799 } | 793 } |
800 | 794 |
801 class _AsBroadcastStream<T> extends Stream<T> { | 795 class _AsBroadcastStream<T> extends Stream<T> { |
802 final Stream<T> _source; | 796 final Stream<T> _source; |
803 final _broadcastCallback _onListenHandler; | 797 final _BroadcastCallback<T> _onListenHandler; |
804 final _broadcastCallback _onCancelHandler; | 798 final _BroadcastCallback<T> _onCancelHandler; |
805 final Zone _zone; | 799 final Zone _zone; |
806 | 800 |
807 _AsBroadcastStreamController<T> _controller; | 801 _AsBroadcastStreamController<T> _controller; |
808 StreamSubscription<T> _subscription; | 802 StreamSubscription<T> _subscription; |
809 | 803 |
810 _AsBroadcastStream(this._source, | 804 _AsBroadcastStream(this._source, |
811 void onListenHandler(StreamSubscription subscription), | 805 void onListenHandler(StreamSubscription<T> subscription), |
812 void onCancelHandler(StreamSubscription subscription)) | 806 void onCancelHandler(StreamSubscription<T> subscription)) |
813 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler), | 807 // TODO(floitsch): the return type should be void and should be |
814 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler), | 808 // inferred. |
| 809 : _onListenHandler = Zone.current.registerUnaryCallback |
| 810 /*<dynamic, StreamSubscription<T>>*/(onListenHandler), |
| 811 _onCancelHandler = Zone.current.registerUnaryCallback |
| 812 /*<dynamic, StreamSubscription<T>>*/(onCancelHandler), |
815 _zone = Zone.current { | 813 _zone = Zone.current { |
816 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | 814 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
817 } | 815 } |
818 | 816 |
819 bool get isBroadcast => true; | 817 bool get isBroadcast => true; |
820 | 818 |
821 StreamSubscription<T> listen(void onData(T data), | 819 StreamSubscription<T> listen(void onData(T data), |
822 { Function onError, | 820 { Function onError, |
823 void onDone(), | 821 void onDone(), |
824 bool cancelOnError}) { | 822 bool cancelOnError}) { |
825 if (_controller == null || _controller.isClosed) { | 823 if (_controller == null || _controller.isClosed) { |
826 // Return a dummy subscription backed by nothing, since | 824 // Return a dummy subscription backed by nothing, since |
827 // it will only ever send one done event. | 825 // it will only ever send one done event. |
828 return new _DoneStreamSubscription<T>(onDone); | 826 return new _DoneStreamSubscription<T>(onDone); |
829 } | 827 } |
830 if (_subscription == null) { | 828 if (_subscription == null) { |
831 _subscription = _source.listen(_controller.add, | 829 _subscription = _source.listen(_controller.add, |
832 onError: _controller.addError, | 830 onError: _controller.addError, |
833 onDone: _controller.close); | 831 onDone: _controller.close); |
834 } | 832 } |
835 cancelOnError = identical(true, cancelOnError); | 833 cancelOnError = identical(true, cancelOnError); |
836 return _controller._subscribe(onData, onError, onDone, cancelOnError); | 834 return _controller._subscribe(onData, onError, onDone, cancelOnError); |
837 } | 835 } |
838 | 836 |
839 void _onCancel() { | 837 void _onCancel() { |
840 bool shutdown = (_controller == null) || _controller.isClosed; | 838 bool shutdown = (_controller == null) || _controller.isClosed; |
841 if (_onCancelHandler != null) { | 839 if (_onCancelHandler != null) { |
842 _zone.runUnary(_onCancelHandler, new _BroadcastSubscriptionWrapper(this)); | 840 _zone.runUnary( |
| 841 _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this)); |
843 } | 842 } |
844 if (shutdown) { | 843 if (shutdown) { |
845 if (_subscription != null) { | 844 if (_subscription != null) { |
846 _subscription.cancel(); | 845 _subscription.cancel(); |
847 _subscription = null; | 846 _subscription = null; |
848 } | 847 } |
849 } | 848 } |
850 } | 849 } |
851 | 850 |
852 void _onListen() { | 851 void _onListen() { |
853 if (_onListenHandler != null) { | 852 if (_onListenHandler != null) { |
854 _zone.runUnary(_onListenHandler, new _BroadcastSubscriptionWrapper(this)); | 853 _zone.runUnary( |
| 854 _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this)); |
855 } | 855 } |
856 } | 856 } |
857 | 857 |
858 // Methods called from _BroadcastSubscriptionWrapper. | 858 // Methods called from _BroadcastSubscriptionWrapper. |
859 void _cancelSubscription() { | 859 void _cancelSubscription() { |
860 if (_subscription == null) return; | 860 if (_subscription == null) return; |
861 // Called by [_controller] when it has no subscribers left. | 861 // Called by [_controller] when it has no subscribers left. |
862 StreamSubscription subscription = _subscription; | 862 StreamSubscription subscription = _subscription; |
863 _subscription = null; | 863 _subscription = null; |
864 _controller = null; // Marks the stream as no longer listenable. | 864 _controller = null; // Marks the stream as no longer listenable. |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
914 | 914 |
915 Future cancel() { | 915 Future cancel() { |
916 _stream._cancelSubscription(); | 916 _stream._cancelSubscription(); |
917 return null; | 917 return null; |
918 } | 918 } |
919 | 919 |
920 bool get isPaused { | 920 bool get isPaused { |
921 return _stream._isSubscriptionPaused; | 921 return _stream._isSubscriptionPaused; |
922 } | 922 } |
923 | 923 |
924 Future asFuture([var futureValue]) { | 924 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
925 throw new UnsupportedError( | 925 throw new UnsupportedError( |
926 "Cannot change handlers of asBroadcastStream source subscription."); | 926 "Cannot change handlers of asBroadcastStream source subscription."); |
927 } | 927 } |
928 } | 928 } |
929 | 929 |
930 | 930 |
931 /** | 931 /** |
932 * Simple implementation of [StreamIterator]. | 932 * Simple implementation of [StreamIterator]. |
933 */ | 933 */ |
934 class _StreamIteratorImpl<T> implements StreamIterator<T> { | 934 class _StreamIteratorImpl<T> implements StreamIterator<T> { |
(...skipping 30 matching lines...) Expand all Loading... |
965 /// The current element represented by the most recent call to moveNext. | 965 /// The current element represented by the most recent call to moveNext. |
966 /// | 966 /// |
967 /// Is null between the time moveNext is called and its future completes. | 967 /// Is null between the time moveNext is called and its future completes. |
968 T _current = null; | 968 T _current = null; |
969 | 969 |
970 /// The future returned by the most recent call to [moveNext]. | 970 /// The future returned by the most recent call to [moveNext]. |
971 /// | 971 /// |
972 /// Also used to store the next value/error in case the stream provides an | 972 /// Also used to store the next value/error in case the stream provides an |
973 /// event before [moveNext] is called again. In that case, the stream will | 973 /// event before [moveNext] is called again. In that case, the stream will |
974 /// be paused to prevent further events. | 974 /// be paused to prevent further events. |
975 var _futureOrPrefetch = null; | 975 var/*Future<bool> or T*/ _futureOrPrefetch = null; |
976 | 976 |
977 /// The current state. | 977 /// The current state. |
978 int _state = _STATE_FOUND; | 978 int _state = _STATE_FOUND; |
979 | 979 |
980 _StreamIteratorImpl(final Stream<T> stream) { | 980 _StreamIteratorImpl(final Stream<T> stream) { |
981 _subscription = stream.listen(_onData, | 981 _subscription = stream.listen(_onData, |
982 onError: _onError, | 982 onError: _onError, |
983 onDone: _onDone, | 983 onDone: _onDone, |
984 cancelOnError: true); | 984 cancelOnError: true); |
985 } | 985 } |
986 | 986 |
987 T get current => _current; | 987 T get current => _current; |
988 | 988 |
989 Future<bool> moveNext() { | 989 Future<bool> moveNext() { |
990 if (_state == _STATE_DONE) { | 990 if (_state == _STATE_DONE) { |
991 return new _Future<bool>.immediate(false); | 991 return new _Future<bool>.immediate(false); |
992 } | 992 } |
993 if (_state == _STATE_MOVING) { | 993 if (_state == _STATE_MOVING) { |
994 throw new StateError("Already waiting for next."); | 994 throw new StateError("Already waiting for next."); |
995 } | 995 } |
996 if (_state == _STATE_FOUND) { | 996 if (_state == _STATE_FOUND) { |
997 _state = _STATE_MOVING; | 997 _state = _STATE_MOVING; |
998 _current = null; | 998 _current = null; |
999 _futureOrPrefetch = new _Future<bool>(); | 999 var result = new _Future<bool>(); |
1000 return _futureOrPrefetch; | 1000 _futureOrPrefetch = result; |
| 1001 return result; |
1001 } else { | 1002 } else { |
1002 assert(_state >= _STATE_EXTRA_DATA); | 1003 assert(_state >= _STATE_EXTRA_DATA); |
1003 switch (_state) { | 1004 switch (_state) { |
1004 case _STATE_EXTRA_DATA: | 1005 case _STATE_EXTRA_DATA: |
1005 _state = _STATE_FOUND; | 1006 _state = _STATE_FOUND; |
1006 _current = _futureOrPrefetch; | 1007 _current = _futureOrPrefetch as Object /*=T*/; |
1007 _futureOrPrefetch = null; | 1008 _futureOrPrefetch = null; |
1008 _subscription.resume(); | 1009 _subscription.resume(); |
1009 return new _Future<bool>.immediate(true); | 1010 return new _Future<bool>.immediate(true); |
1010 case _STATE_EXTRA_ERROR: | 1011 case _STATE_EXTRA_ERROR: |
1011 AsyncError prefetch = _futureOrPrefetch; | 1012 AsyncError prefetch = _futureOrPrefetch; |
1012 _clear(); | 1013 _clear(); |
1013 return new _Future<bool>.immediateError(prefetch.error, | 1014 return new _Future<bool>.immediateError(prefetch.error, |
1014 prefetch.stackTrace); | 1015 prefetch.stackTrace); |
1015 case _STATE_EXTRA_DONE: | 1016 case _STATE_EXTRA_DONE: |
1016 _clear(); | 1017 _clear(); |
1017 return new _Future<bool>.immediate(false); | 1018 return new _Future<bool>.immediate(false); |
1018 } | 1019 } |
1019 } | 1020 } |
1020 } | 1021 } |
1021 | 1022 |
1022 /** Clears up the internal state when the iterator ends. */ | 1023 /** Clears up the internal state when the iterator ends. */ |
1023 void _clear() { | 1024 void _clear() { |
1024 _subscription = null; | 1025 _subscription = null; |
1025 _futureOrPrefetch = null; | 1026 _futureOrPrefetch = null; |
1026 _current = null; | 1027 _current = null; |
1027 _state = _STATE_DONE; | 1028 _state = _STATE_DONE; |
1028 } | 1029 } |
1029 | 1030 |
1030 Future cancel() { | 1031 Future cancel() { |
1031 StreamSubscription subscription = _subscription; | 1032 StreamSubscription subscription = _subscription; |
1032 // Cherry pick of: https://codereview.chromium.org//896793002 | |
1033 if (subscription == null) return null; | 1033 if (subscription == null) return null; |
1034 if (_state == _STATE_MOVING) { | 1034 if (_state == _STATE_MOVING) { |
1035 _Future<bool> hasNext = _futureOrPrefetch; | 1035 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
1036 _clear(); | 1036 _clear(); |
1037 hasNext._complete(false); | 1037 hasNext._complete(false); |
1038 } else { | 1038 } else { |
1039 _clear(); | 1039 _clear(); |
1040 } | 1040 } |
1041 return subscription.cancel(); | 1041 return subscription.cancel(); |
1042 } | 1042 } |
1043 | 1043 |
1044 void _onData(T data) { | 1044 void _onData(T data) { |
1045 if (_state == _STATE_MOVING) { | 1045 if (_state == _STATE_MOVING) { |
1046 _current = data; | 1046 _current = data; |
1047 _Future<bool> hasNext = _futureOrPrefetch; | 1047 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
1048 _futureOrPrefetch = null; | 1048 _futureOrPrefetch = null; |
1049 _state = _STATE_FOUND; | 1049 _state = _STATE_FOUND; |
1050 hasNext._complete(true); | 1050 hasNext._complete(true); |
1051 return; | 1051 return; |
1052 } | 1052 } |
1053 _subscription.pause(); | 1053 _subscription.pause(); |
1054 assert(_futureOrPrefetch == null); | 1054 assert(_futureOrPrefetch == null); |
1055 _futureOrPrefetch = data; | 1055 _futureOrPrefetch = data; |
1056 _state = _STATE_EXTRA_DATA; | 1056 _state = _STATE_EXTRA_DATA; |
1057 } | 1057 } |
1058 | 1058 |
1059 void _onError(Object error, [StackTrace stackTrace]) { | 1059 void _onError(Object error, [StackTrace stackTrace]) { |
1060 if (_state == _STATE_MOVING) { | 1060 if (_state == _STATE_MOVING) { |
1061 _Future<bool> hasNext = _futureOrPrefetch; | 1061 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
1062 // We have cancelOnError: true, so the subscription is canceled. | 1062 // We have cancelOnError: true, so the subscription is canceled. |
1063 _clear(); | 1063 _clear(); |
1064 hasNext._completeError(error, stackTrace); | 1064 hasNext._completeError(error, stackTrace); |
1065 return; | 1065 return; |
1066 } | 1066 } |
1067 _subscription.pause(); | 1067 _subscription.pause(); |
1068 assert(_futureOrPrefetch == null); | 1068 assert(_futureOrPrefetch == null); |
1069 _futureOrPrefetch = new AsyncError(error, stackTrace); | 1069 _futureOrPrefetch = new AsyncError(error, stackTrace); |
1070 _state = _STATE_EXTRA_ERROR; | 1070 _state = _STATE_EXTRA_ERROR; |
1071 } | 1071 } |
1072 | 1072 |
1073 void _onDone() { | 1073 void _onDone() { |
1074 if (_state == _STATE_MOVING) { | 1074 if (_state == _STATE_MOVING) { |
1075 _Future<bool> hasNext = _futureOrPrefetch; | 1075 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
1076 _clear(); | 1076 _clear(); |
1077 hasNext._complete(false); | 1077 hasNext._complete(false); |
1078 return; | 1078 return; |
1079 } | 1079 } |
1080 _subscription.pause(); | 1080 _subscription.pause(); |
1081 _futureOrPrefetch = null; | 1081 _futureOrPrefetch = null; |
1082 _state = _STATE_EXTRA_DONE; | 1082 _state = _STATE_EXTRA_DONE; |
1083 } | 1083 } |
1084 } | 1084 } |
| 1085 |
| 1086 /** An empty broadcast stream, sending a done event as soon as possible. */ |
| 1087 class _EmptyStream<T> extends Stream<T> { |
| 1088 const _EmptyStream() : super._internal(); |
| 1089 bool get isBroadcast => true; |
| 1090 StreamSubscription<T> listen(void onData(T data), |
| 1091 {Function onError, |
| 1092 void onDone(), |
| 1093 bool cancelOnError}) { |
| 1094 return new _DoneStreamSubscription<T>(onDone); |
| 1095 } |
| 1096 } |
OLD | NEW |