Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: tool/input_sdk/lib/async/stream_impl.dart

Issue 1953153002: Update dart:async to match the Dart repo. (Closed) Base URL: https://github.com/dart-lang/dev_compiler.git@master
Patch Set: Remove unneeded calls. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error, StackTrace stackTrace); 10 void _addError(Object error, StackTrace stackTrace);
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
99 99
100 // TODO(floitsch): reuse another field 100 // TODO(floitsch): reuse another field
101 /** The future [_onCancel] may return. */ 101 /** The future [_onCancel] may return. */
102 Future _cancelFuture; 102 Future _cancelFuture;
103 103
104 /** 104 /**
105 * Queue of pending events. 105 * Queue of pending events.
106 * 106 *
107 * Is created when necessary, or set in constructor for preconfigured events. 107 * Is created when necessary, or set in constructor for preconfigured events.
108 */ 108 */
109 _PendingEvents _pending; 109 _PendingEvents<T> _pending;
110 110
111 _BufferingStreamSubscription(void onData(T data), 111 _BufferingStreamSubscription(void onData(T data),
112 Function onError, 112 Function onError,
113 void onDone(), 113 void onDone(),
114 bool cancelOnError) 114 bool cancelOnError)
115 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { 115 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
116 this.onData(onData); 116 this.onData(onData);
117 this.onError(onError); 117 this.onError(onError);
118 this.onDone(onDone); 118 this.onDone(onDone);
119 } 119 }
120 120
121 /** 121 /**
122 * Sets the subscription's pending events object. 122 * Sets the subscription's pending events object.
123 * 123 *
124 * This can only be done once. The pending events object is used for the 124 * This can only be done once. The pending events object is used for the
125 * rest of the subscription's life cycle. 125 * rest of the subscription's life cycle.
126 */ 126 */
127 void _setPendingEvents(_PendingEvents pendingEvents) { 127 void _setPendingEvents(_PendingEvents<T> pendingEvents) {
128 assert(_pending == null); 128 assert(_pending == null);
129 if (pendingEvents == null) return; 129 if (pendingEvents == null) return;
130 _pending = pendingEvents; 130 _pending = pendingEvents;
131 if (!pendingEvents.isEmpty) { 131 if (!pendingEvents.isEmpty) {
132 _state |= _STATE_HAS_PENDING; 132 _state |= _STATE_HAS_PENDING;
133 _pending.schedule(this); 133 _pending.schedule(this);
134 } 134 }
135 } 135 }
136 136
137 /**
138 * Extracts the pending events from a canceled stream.
139 *
140 * This can only be done during the [_onCancel] method call. After that,
141 * any remaining pending events will be cleared.
142 */
143 _PendingEvents _extractPending() {
144 assert(_isCanceled);
145 _PendingEvents events = _pending;
146 _pending = null;
147 return events;
148 }
149
150 // StreamSubscription interface. 137 // StreamSubscription interface.
151 138
152 void onData(void handleData(T event)) { 139 void onData(void handleData(T event)) {
153 if (handleData == null) handleData = _nullDataHandler; 140 if (handleData == null) handleData = _nullDataHandler;
154 _onData = _zone.registerUnaryCallback(handleData); 141 // TODO(floitsch): the return type should be 'void', and the type
142 // should be inferred.
143 _onData = _zone.registerUnaryCallback/*<dynamic, T>*/(handleData);
155 } 144 }
156 145
157 void onError(Function handleError) { 146 void onError(Function handleError) {
158 if (handleError == null) handleError = _nullErrorHandler; 147 if (handleError == null) handleError = _nullErrorHandler;
159 _onError = _registerErrorHandler(handleError, _zone); 148 _onError = _registerErrorHandler/*<T>*/(handleError, _zone);
160 } 149 }
161 150
162 void onDone(void handleDone()) { 151 void onDone(void handleDone()) {
163 if (handleDone == null) handleDone = _nullDoneHandler; 152 if (handleDone == null) handleDone = _nullDoneHandler;
164 _onDone = _zone.registerCallback(handleDone); 153 _onDone = _zone.registerCallback(handleDone);
165 } 154 }
166 155
167 void pause([Future resumeSignal]) { 156 void pause([Future resumeSignal]) {
168 if (_isCanceled) return; 157 if (_isCanceled) return;
169 bool wasPaused = _isPaused; 158 bool wasPaused = _isPaused;
(...skipping 25 matching lines...) Expand all
195 Future cancel() { 184 Future cancel() {
196 // The user doesn't want to receive any further events. If there is an 185 // The user doesn't want to receive any further events. If there is an
197 // error or done event pending (waiting for the cancel to be done) discard 186 // error or done event pending (waiting for the cancel to be done) discard
198 // that event. 187 // that event.
199 _state &= ~_STATE_WAIT_FOR_CANCEL; 188 _state &= ~_STATE_WAIT_FOR_CANCEL;
200 if (_isCanceled) return _cancelFuture; 189 if (_isCanceled) return _cancelFuture;
201 _cancel(); 190 _cancel();
202 return _cancelFuture; 191 return _cancelFuture;
203 } 192 }
204 193
205 Future asFuture([var futureValue]) { 194 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
206 _Future<T> result = new _Future<T>(); 195 _Future/*<E>*/ result = new _Future/*<E>*/();
207 196
208 // Overwrite the onDone and onError handlers. 197 // Overwrite the onDone and onError handlers.
209 _onDone = () { result._complete(futureValue); }; 198 _onDone = () { result._complete(futureValue); };
210 _onError = (error, stackTrace) { 199 _onError = (error, stackTrace) {
211 cancel(); 200 cancel();
212 result._completeError(error, stackTrace); 201 result._completeError(error, stackTrace);
213 }; 202 };
214 203
215 return result; 204 return result;
216 } 205 }
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
262 } 251 }
263 252
264 // _EventSink interface. 253 // _EventSink interface.
265 254
266 void _add(T data) { 255 void _add(T data) {
267 assert(!_isClosed); 256 assert(!_isClosed);
268 if (_isCanceled) return; 257 if (_isCanceled) return;
269 if (_canFire) { 258 if (_canFire) {
270 _sendData(data); 259 _sendData(data);
271 } else { 260 } else {
272 _addPending(new _DelayedData(data)); 261 _addPending(new _DelayedData<dynamic /*=T*/>(data));
273 } 262 }
274 } 263 }
275 264
276 void _addError(Object error, StackTrace stackTrace) { 265 void _addError(Object error, StackTrace stackTrace) {
277 if (_isCanceled) return; 266 if (_isCanceled) return;
278 if (_canFire) { 267 if (_canFire) {
279 _sendError(error, stackTrace); // Reports cancel after sending. 268 _sendError(error, stackTrace); // Reports cancel after sending.
280 } else { 269 } else {
281 _addPending(new _DelayedError(error, stackTrace)); 270 _addPending(new _DelayedError(error, stackTrace));
282 } 271 }
(...skipping 29 matching lines...) Expand all
312 301
313 // Handle pending events. 302 // Handle pending events.
314 303
315 /** 304 /**
316 * Add a pending event. 305 * Add a pending event.
317 * 306 *
318 * If the subscription is not paused, this also schedules a firing 307 * If the subscription is not paused, this also schedules a firing
319 * of pending events later (if necessary). 308 * of pending events later (if necessary).
320 */ 309 */
321 void _addPending(_DelayedEvent event) { 310 void _addPending(_DelayedEvent event) {
322 _StreamImplEvents pending = _pending; 311 _StreamImplEvents<T> pending = _pending;
323 if (_pending == null) pending = _pending = new _StreamImplEvents(); 312 if (_pending == null) {
313 pending = _pending = new _StreamImplEvents<dynamic /*=T*/>();
314 }
324 pending.add(event); 315 pending.add(event);
325 if (!_hasPending) { 316 if (!_hasPending) {
326 _state |= _STATE_HAS_PENDING; 317 _state |= _STATE_HAS_PENDING;
327 if (!_isPaused) { 318 if (!_isPaused) {
328 _pending.schedule(this); 319 _pending.schedule(this);
329 } 320 }
330 } 321 }
331 } 322 }
332 323
333 /* _EventDispatch interface. */ 324 /* _EventDispatch interface. */
334 325
335 void _sendData(T data) { 326 void _sendData(T data) {
336 assert(!_isCanceled); 327 assert(!_isCanceled);
337 assert(!_isPaused); 328 assert(!_isPaused);
338 assert(!_inCallback); 329 assert(!_inCallback);
339 bool wasInputPaused = _isInputPaused; 330 bool wasInputPaused = _isInputPaused;
340 _state |= _STATE_IN_CALLBACK; 331 _state |= _STATE_IN_CALLBACK;
341 _zone.runUnaryGuarded(_onData, data); 332 _zone.runUnaryGuarded(_onData, data);
342 _state &= ~_STATE_IN_CALLBACK; 333 _state &= ~_STATE_IN_CALLBACK;
343 _checkState(wasInputPaused); 334 _checkState(wasInputPaused);
344 } 335 }
345 336
346 void _sendError(Object error, StackTrace stackTrace) { 337 void _sendError(var error, StackTrace stackTrace) {
347 assert(!_isCanceled); 338 assert(!_isCanceled);
348 assert(!_isPaused); 339 assert(!_isPaused);
349 assert(!_inCallback); 340 assert(!_inCallback);
350 bool wasInputPaused = _isInputPaused; 341 bool wasInputPaused = _isInputPaused;
351 342
352 void sendError() { 343 void sendError() {
353 // If the subscription has been canceled while waiting for the cancel 344 // If the subscription has been canceled while waiting for the cancel
354 // future to finish we must not report the error. 345 // future to finish we must not report the error.
355 if (_isCanceled && !_waitsForCancel) return; 346 if (_isCanceled && !_waitsForCancel) return;
356 _state |= _STATE_IN_CALLBACK; 347 _state |= _STATE_IN_CALLBACK;
357 if (_onError is ZoneBinaryCallback) { 348 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) {
358 _zone.runBinaryGuarded(_onError, error, stackTrace); 349 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError
350 as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/;
351 _zone.runBinaryGuarded(errorCallback, error, stackTrace);
359 } else { 352 } else {
360 _zone.runUnaryGuarded(_onError, error); 353 _zone.runUnaryGuarded/*<dynamic, dynamic>*/(
354 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error);
361 } 355 }
362 _state &= ~_STATE_IN_CALLBACK; 356 _state &= ~_STATE_IN_CALLBACK;
363 } 357 }
364 358
365 if (_cancelOnError) { 359 if (_cancelOnError) {
366 _state |= _STATE_WAIT_FOR_CANCEL; 360 _state |= _STATE_WAIT_FOR_CANCEL;
367 _cancel(); 361 _cancel();
368 if (_cancelFuture is Future) { 362 if (_cancelFuture is Future) {
369 _cancelFuture.whenComplete(sendError); 363 _cancelFuture.whenComplete(sendError);
370 } else { 364 } else {
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
463 // ------------------------------------------------------------------- 457 // -------------------------------------------------------------------
464 abstract class _StreamImpl<T> extends Stream<T> { 458 abstract class _StreamImpl<T> extends Stream<T> {
465 // ------------------------------------------------------------------ 459 // ------------------------------------------------------------------
466 // Stream interface. 460 // Stream interface.
467 461
468 StreamSubscription<T> listen(void onData(T data), 462 StreamSubscription<T> listen(void onData(T data),
469 { Function onError, 463 { Function onError,
470 void onDone(), 464 void onDone(),
471 bool cancelOnError }) { 465 bool cancelOnError }) {
472 cancelOnError = identical(true, cancelOnError); 466 cancelOnError = identical(true, cancelOnError);
473 StreamSubscription subscription = 467 StreamSubscription<T> subscription =
474 _createSubscription(onData, onError, onDone, cancelOnError); 468 _createSubscription(onData, onError, onDone, cancelOnError);
475 _onListen(subscription); 469 _onListen(subscription);
476 return subscription; 470 return subscription;
477 } 471 }
478 472
479 // ------------------------------------------------------------------- 473 // -------------------------------------------------------------------
480 /** Create a subscription object. Called by [subcribe]. */ 474 /** Create a subscription object. Called by [subcribe]. */
481 StreamSubscription<T> _createSubscription( 475 StreamSubscription<T> _createSubscription(
482 void onData(T data), 476 void onData(T data),
483 Function onError, 477 Function onError,
484 void onDone(), 478 void onDone(),
485 bool cancelOnError) { 479 bool cancelOnError) {
486 return new _BufferingStreamSubscription<T>(onData, onError, onDone, 480 return new _BufferingStreamSubscription<T>(onData, onError, onDone,
487 cancelOnError); 481 cancelOnError);
488 } 482 }
489 483
490 /** Hook called when the subscription has been created. */ 484 /** Hook called when the subscription has been created. */
491 void _onListen(StreamSubscription subscription) {} 485 void _onListen(StreamSubscription subscription) {}
492 } 486 }
493 487
494 typedef _PendingEvents _EventGenerator(); 488 typedef _PendingEvents<T> _EventGenerator<T>();
495 489
496 /** Stream that generates its own events. */ 490 /** Stream that generates its own events. */
497 class _GeneratedStreamImpl<T> extends _StreamImpl<T> { 491 class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
498 final _EventGenerator _pending; 492 final _EventGenerator<T> _pending;
499 bool _isUsed = false; 493 bool _isUsed = false;
500 /** 494 /**
501 * Initializes the stream to have only the events provided by a 495 * Initializes the stream to have only the events provided by a
502 * [_PendingEvents]. 496 * [_PendingEvents].
503 * 497 *
504 * A new [_PendingEvents] must be generated for each listen. 498 * A new [_PendingEvents] must be generated for each listen.
505 */ 499 */
506 _GeneratedStreamImpl(this._pending); 500 _GeneratedStreamImpl(this._pending);
507 501
508 StreamSubscription<T> _createSubscription( 502 StreamSubscription<T> _createSubscription(
509 void onData(T data), 503 void onData(T data),
510 Function onError, 504 Function onError,
511 void onDone(), 505 void onDone(),
512 bool cancelOnError) { 506 bool cancelOnError) {
513 if (_isUsed) throw new StateError("Stream has already been listened to."); 507 if (_isUsed) throw new StateError("Stream has already been listened to.");
514 _isUsed = true; 508 _isUsed = true;
515 return new _BufferingStreamSubscription( 509 return new _BufferingStreamSubscription<T>(
516 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending()); 510 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending());
517 } 511 }
518 } 512 }
519 513
520 514
521 /** Pending events object that gets its events from an [Iterable]. */ 515 /** Pending events object that gets its events from an [Iterable]. */
522 class _IterablePendingEvents<T> extends _PendingEvents { 516 class _IterablePendingEvents<T> extends _PendingEvents<T> {
523 // The iterator providing data for data events. 517 // The iterator providing data for data events.
524 // Set to null when iteration has completed. 518 // Set to null when iteration has completed.
525 Iterator<T> _iterator; 519 Iterator<T> _iterator;
526 520
527 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; 521 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator;
528 522
529 bool get isEmpty => _iterator == null; 523 bool get isEmpty => _iterator == null;
530 524
531 void handleNext(_EventDispatch dispatch) { 525 void handleNext(_EventDispatch<T> dispatch) {
532 if (_iterator == null) { 526 if (_iterator == null) {
533 throw new StateError("No events pending."); 527 throw new StateError("No events pending.");
534 } 528 }
535 // Send one event per call to moveNext. 529 // Send one event per call to moveNext.
536 // If moveNext returns true, send the current element as data. 530 // If moveNext returns true, send the current element as data.
537 // If moveNext returns false, send a done event and clear the _iterator. 531 // If moveNext returns false, send a done event and clear the _iterator.
538 // If moveNext throws an error, send an error and clear the _iterator. 532 // If moveNext throws an error, send an error and clear the _iterator.
539 // After an error, no further events will be sent. 533 // After an error, no further events will be sent.
540 bool isDone; 534 bool isDone;
541 try { 535 try {
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after
615 } 609 }
616 610
617 _DelayedEvent get next => null; 611 _DelayedEvent get next => null;
618 612
619 void set next(_DelayedEvent _) { 613 void set next(_DelayedEvent _) {
620 throw new StateError("No events after a done."); 614 throw new StateError("No events after a done.");
621 } 615 }
622 } 616 }
623 617
624 /** Superclass for provider of pending events. */ 618 /** Superclass for provider of pending events. */
625 abstract class _PendingEvents { 619 abstract class _PendingEvents<T> {
626 // No async event has been scheduled. 620 // No async event has been scheduled.
627 static const int _STATE_UNSCHEDULED = 0; 621 static const int _STATE_UNSCHEDULED = 0;
628 // An async event has been scheduled to run a function. 622 // An async event has been scheduled to run a function.
629 static const int _STATE_SCHEDULED = 1; 623 static const int _STATE_SCHEDULED = 1;
630 // An async event has been scheduled, but it will do nothing when it runs. 624 // An async event has been scheduled, but it will do nothing when it runs.
631 // Async events can't be preempted. 625 // Async events can't be preempted.
632 static const int _STATE_CANCELED = 3; 626 static const int _STATE_CANCELED = 3;
633 627
634 /** 628 /**
635 * State of being scheduled. 629 * State of being scheduled.
(...skipping 13 matching lines...) Expand all
649 643
650 bool get isScheduled => _state == _STATE_SCHEDULED; 644 bool get isScheduled => _state == _STATE_SCHEDULED;
651 bool get _eventScheduled => _state >= _STATE_SCHEDULED; 645 bool get _eventScheduled => _state >= _STATE_SCHEDULED;
652 646
653 /** 647 /**
654 * Schedule an event to run later. 648 * Schedule an event to run later.
655 * 649 *
656 * If called more than once, it should be called with the same dispatch as 650 * If called more than once, it should be called with the same dispatch as
657 * argument each time. It may reuse an earlier argument in some cases. 651 * argument each time. It may reuse an earlier argument in some cases.
658 */ 652 */
659 void schedule(_EventDispatch dispatch) { 653 void schedule(_EventDispatch<T> dispatch) {
660 if (isScheduled) return; 654 if (isScheduled) return;
661 assert(!isEmpty); 655 assert(!isEmpty);
662 if (_eventScheduled) { 656 if (_eventScheduled) {
663 assert(_state == _STATE_CANCELED); 657 assert(_state == _STATE_CANCELED);
664 _state = _STATE_SCHEDULED; 658 _state = _STATE_SCHEDULED;
665 return; 659 return;
666 } 660 }
667 scheduleMicrotask(() { 661 scheduleMicrotask(() {
668 int oldState = _state; 662 int oldState = _state;
669 _state = _STATE_UNSCHEDULED; 663 _state = _STATE_UNSCHEDULED;
670 if (oldState == _STATE_CANCELED) return; 664 if (oldState == _STATE_CANCELED) return;
671 handleNext(dispatch); 665 handleNext(dispatch);
672 }); 666 });
673 _state = _STATE_SCHEDULED; 667 _state = _STATE_SCHEDULED;
674 } 668 }
675 669
676 void cancelSchedule() { 670 void cancelSchedule() {
677 if (isScheduled) _state = _STATE_CANCELED; 671 if (isScheduled) _state = _STATE_CANCELED;
678 } 672 }
679 673
680 void handleNext(_EventDispatch dispatch); 674 void handleNext(_EventDispatch<T> dispatch);
681 675
682 /** Throw away any pending events and cancel scheduled events. */ 676 /** Throw away any pending events and cancel scheduled events. */
683 void clear(); 677 void clear();
684 } 678 }
685 679
686 680
687 /** Class holding pending events for a [_StreamImpl]. */ 681 /** Class holding pending events for a [_StreamImpl]. */
688 class _StreamImplEvents extends _PendingEvents { 682 class _StreamImplEvents<T> extends _PendingEvents<T> {
689 /// Single linked list of [_DelayedEvent] objects. 683 /// Single linked list of [_DelayedEvent] objects.
690 _DelayedEvent firstPendingEvent = null; 684 _DelayedEvent firstPendingEvent = null;
691 /// Last element in the list of pending events. New events are added after it. 685 /// Last element in the list of pending events. New events are added after it.
692 _DelayedEvent lastPendingEvent = null; 686 _DelayedEvent lastPendingEvent = null;
693 687
694 bool get isEmpty => lastPendingEvent == null; 688 bool get isEmpty => lastPendingEvent == null;
695 689
696 void add(_DelayedEvent event) { 690 void add(_DelayedEvent event) {
697 if (lastPendingEvent == null) { 691 if (lastPendingEvent == null) {
698 firstPendingEvent = lastPendingEvent = event; 692 firstPendingEvent = lastPendingEvent = event;
699 } else { 693 } else {
700 lastPendingEvent = lastPendingEvent.next = event; 694 lastPendingEvent = lastPendingEvent.next = event;
701 } 695 }
702 } 696 }
703 697
704 void handleNext(_EventDispatch dispatch) { 698 void handleNext(_EventDispatch<T> dispatch) {
705 assert(!isScheduled); 699 assert(!isScheduled);
706 _DelayedEvent event = firstPendingEvent; 700 _DelayedEvent event = firstPendingEvent;
707 firstPendingEvent = event.next; 701 firstPendingEvent = event.next;
708 if (firstPendingEvent == null) { 702 if (firstPendingEvent == null) {
709 lastPendingEvent = null; 703 lastPendingEvent = null;
710 } 704 }
711 event.perform(dispatch); 705 event.perform(dispatch);
712 } 706 }
713 707
714 void clear() { 708 void clear() {
(...skipping 14 matching lines...) Expand all
729 723
730 void _insertBefore(_BroadcastLinkedList newNext) { 724 void _insertBefore(_BroadcastLinkedList newNext) {
731 _BroadcastLinkedList newPrevious = newNext._previous; 725 _BroadcastLinkedList newPrevious = newNext._previous;
732 newPrevious._next = this; 726 newPrevious._next = this;
733 newNext._previous = _previous; 727 newNext._previous = _previous;
734 _previous._next = newNext; 728 _previous._next = newNext;
735 _previous = newPrevious; 729 _previous = newPrevious;
736 } 730 }
737 } 731 }
738 732
739 typedef void _broadcastCallback(StreamSubscription subscription); 733 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription);
740 734
741 /** 735 /**
742 * Done subscription that will send one done event as soon as possible. 736 * Done subscription that will send one done event as soon as possible.
743 */ 737 */
744 class _DoneStreamSubscription<T> implements StreamSubscription<T> { 738 class _DoneStreamSubscription<T> implements StreamSubscription<T> {
745 static const int _DONE_SENT = 1; 739 static const int _DONE_SENT = 1;
746 static const int _SCHEDULED = 2; 740 static const int _SCHEDULED = 2;
747 static const int _PAUSED = 4; 741 static const int _PAUSED = 4;
748 742
749 final Zone _zone; 743 final Zone _zone;
(...skipping 27 matching lines...) Expand all
777 if (isPaused) { 771 if (isPaused) {
778 _state -= _PAUSED; 772 _state -= _PAUSED;
779 if (!isPaused && !_isSent) { 773 if (!isPaused && !_isSent) {
780 _schedule(); 774 _schedule();
781 } 775 }
782 } 776 }
783 } 777 }
784 778
785 Future cancel() => null; 779 Future cancel() => null;
786 780
787 Future asFuture([futureValue]) { 781 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
788 _Future result = new _Future(); 782 _Future/*<E>*/ result = new _Future/*<E>*/();
789 _onDone = () { result._completeWithValue(null); }; 783 _onDone = () { result._completeWithValue(null); };
790 return result; 784 return result;
791 } 785 }
792 786
793 void _sendDone() { 787 void _sendDone() {
794 _state &= ~_SCHEDULED; 788 _state &= ~_SCHEDULED;
795 if (isPaused) return; 789 if (isPaused) return;
796 _state |= _DONE_SENT; 790 _state |= _DONE_SENT;
797 if (_onDone != null) _zone.runGuarded(_onDone); 791 if (_onDone != null) _zone.runGuarded(_onDone);
798 } 792 }
799 } 793 }
800 794
801 class _AsBroadcastStream<T> extends Stream<T> { 795 class _AsBroadcastStream<T> extends Stream<T> {
802 final Stream<T> _source; 796 final Stream<T> _source;
803 final _broadcastCallback _onListenHandler; 797 final _BroadcastCallback<T> _onListenHandler;
804 final _broadcastCallback _onCancelHandler; 798 final _BroadcastCallback<T> _onCancelHandler;
805 final Zone _zone; 799 final Zone _zone;
806 800
807 _AsBroadcastStreamController<T> _controller; 801 _AsBroadcastStreamController<T> _controller;
808 StreamSubscription<T> _subscription; 802 StreamSubscription<T> _subscription;
809 803
810 _AsBroadcastStream(this._source, 804 _AsBroadcastStream(this._source,
811 void onListenHandler(StreamSubscription subscription), 805 void onListenHandler(StreamSubscription<T> subscription),
812 void onCancelHandler(StreamSubscription subscription)) 806 void onCancelHandler(StreamSubscription<T> subscription))
813 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler), 807 // TODO(floitsch): the return type should be void and should be
814 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler), 808 // inferred.
809 : _onListenHandler = Zone.current.registerUnaryCallback
810 /*<dynamic, StreamSubscription<T>>*/(onListenHandler),
811 _onCancelHandler = Zone.current.registerUnaryCallback
812 /*<dynamic, StreamSubscription<T>>*/(onCancelHandler),
815 _zone = Zone.current { 813 _zone = Zone.current {
816 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); 814 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
817 } 815 }
818 816
819 bool get isBroadcast => true; 817 bool get isBroadcast => true;
820 818
821 StreamSubscription<T> listen(void onData(T data), 819 StreamSubscription<T> listen(void onData(T data),
822 { Function onError, 820 { Function onError,
823 void onDone(), 821 void onDone(),
824 bool cancelOnError}) { 822 bool cancelOnError}) {
825 if (_controller == null || _controller.isClosed) { 823 if (_controller == null || _controller.isClosed) {
826 // Return a dummy subscription backed by nothing, since 824 // Return a dummy subscription backed by nothing, since
827 // it will only ever send one done event. 825 // it will only ever send one done event.
828 return new _DoneStreamSubscription<T>(onDone); 826 return new _DoneStreamSubscription<T>(onDone);
829 } 827 }
830 if (_subscription == null) { 828 if (_subscription == null) {
831 _subscription = _source.listen(_controller.add, 829 _subscription = _source.listen(_controller.add,
832 onError: _controller.addError, 830 onError: _controller.addError,
833 onDone: _controller.close); 831 onDone: _controller.close);
834 } 832 }
835 cancelOnError = identical(true, cancelOnError); 833 cancelOnError = identical(true, cancelOnError);
836 return _controller._subscribe(onData, onError, onDone, cancelOnError); 834 return _controller._subscribe(onData, onError, onDone, cancelOnError);
837 } 835 }
838 836
839 void _onCancel() { 837 void _onCancel() {
840 bool shutdown = (_controller == null) || _controller.isClosed; 838 bool shutdown = (_controller == null) || _controller.isClosed;
841 if (_onCancelHandler != null) { 839 if (_onCancelHandler != null) {
842 _zone.runUnary(_onCancelHandler, new _BroadcastSubscriptionWrapper(this)); 840 _zone.runUnary(
841 _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this));
843 } 842 }
844 if (shutdown) { 843 if (shutdown) {
845 if (_subscription != null) { 844 if (_subscription != null) {
846 _subscription.cancel(); 845 _subscription.cancel();
847 _subscription = null; 846 _subscription = null;
848 } 847 }
849 } 848 }
850 } 849 }
851 850
852 void _onListen() { 851 void _onListen() {
853 if (_onListenHandler != null) { 852 if (_onListenHandler != null) {
854 _zone.runUnary(_onListenHandler, new _BroadcastSubscriptionWrapper(this)); 853 _zone.runUnary(
854 _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this));
855 } 855 }
856 } 856 }
857 857
858 // Methods called from _BroadcastSubscriptionWrapper. 858 // Methods called from _BroadcastSubscriptionWrapper.
859 void _cancelSubscription() { 859 void _cancelSubscription() {
860 if (_subscription == null) return; 860 if (_subscription == null) return;
861 // Called by [_controller] when it has no subscribers left. 861 // Called by [_controller] when it has no subscribers left.
862 StreamSubscription subscription = _subscription; 862 StreamSubscription subscription = _subscription;
863 _subscription = null; 863 _subscription = null;
864 _controller = null; // Marks the stream as no longer listenable. 864 _controller = null; // Marks the stream as no longer listenable.
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
914 914
915 Future cancel() { 915 Future cancel() {
916 _stream._cancelSubscription(); 916 _stream._cancelSubscription();
917 return null; 917 return null;
918 } 918 }
919 919
920 bool get isPaused { 920 bool get isPaused {
921 return _stream._isSubscriptionPaused; 921 return _stream._isSubscriptionPaused;
922 } 922 }
923 923
924 Future asFuture([var futureValue]) { 924 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
925 throw new UnsupportedError( 925 throw new UnsupportedError(
926 "Cannot change handlers of asBroadcastStream source subscription."); 926 "Cannot change handlers of asBroadcastStream source subscription.");
927 } 927 }
928 } 928 }
929 929
930 930
931 /** 931 /**
932 * Simple implementation of [StreamIterator]. 932 * Simple implementation of [StreamIterator].
933 */ 933 */
934 class _StreamIteratorImpl<T> implements StreamIterator<T> { 934 class _StreamIteratorImpl<T> implements StreamIterator<T> {
(...skipping 30 matching lines...) Expand all
965 /// The current element represented by the most recent call to moveNext. 965 /// The current element represented by the most recent call to moveNext.
966 /// 966 ///
967 /// Is null between the time moveNext is called and its future completes. 967 /// Is null between the time moveNext is called and its future completes.
968 T _current = null; 968 T _current = null;
969 969
970 /// The future returned by the most recent call to [moveNext]. 970 /// The future returned by the most recent call to [moveNext].
971 /// 971 ///
972 /// Also used to store the next value/error in case the stream provides an 972 /// Also used to store the next value/error in case the stream provides an
973 /// event before [moveNext] is called again. In that case, the stream will 973 /// event before [moveNext] is called again. In that case, the stream will
974 /// be paused to prevent further events. 974 /// be paused to prevent further events.
975 var _futureOrPrefetch = null; 975 var/*Future<bool> or T*/ _futureOrPrefetch = null;
976 976
977 /// The current state. 977 /// The current state.
978 int _state = _STATE_FOUND; 978 int _state = _STATE_FOUND;
979 979
980 _StreamIteratorImpl(final Stream<T> stream) { 980 _StreamIteratorImpl(final Stream<T> stream) {
981 _subscription = stream.listen(_onData, 981 _subscription = stream.listen(_onData,
982 onError: _onError, 982 onError: _onError,
983 onDone: _onDone, 983 onDone: _onDone,
984 cancelOnError: true); 984 cancelOnError: true);
985 } 985 }
986 986
987 T get current => _current; 987 T get current => _current;
988 988
989 Future<bool> moveNext() { 989 Future<bool> moveNext() {
990 if (_state == _STATE_DONE) { 990 if (_state == _STATE_DONE) {
991 return new _Future<bool>.immediate(false); 991 return new _Future<bool>.immediate(false);
992 } 992 }
993 if (_state == _STATE_MOVING) { 993 if (_state == _STATE_MOVING) {
994 throw new StateError("Already waiting for next."); 994 throw new StateError("Already waiting for next.");
995 } 995 }
996 if (_state == _STATE_FOUND) { 996 if (_state == _STATE_FOUND) {
997 _state = _STATE_MOVING; 997 _state = _STATE_MOVING;
998 _current = null; 998 _current = null;
999 _futureOrPrefetch = new _Future<bool>(); 999 var result = new _Future<bool>();
1000 return _futureOrPrefetch; 1000 _futureOrPrefetch = result;
1001 return result;
1001 } else { 1002 } else {
1002 assert(_state >= _STATE_EXTRA_DATA); 1003 assert(_state >= _STATE_EXTRA_DATA);
1003 switch (_state) { 1004 switch (_state) {
1004 case _STATE_EXTRA_DATA: 1005 case _STATE_EXTRA_DATA:
1005 _state = _STATE_FOUND; 1006 _state = _STATE_FOUND;
1006 _current = _futureOrPrefetch; 1007 _current = _futureOrPrefetch as Object /*=T*/;
1007 _futureOrPrefetch = null; 1008 _futureOrPrefetch = null;
1008 _subscription.resume(); 1009 _subscription.resume();
1009 return new _Future<bool>.immediate(true); 1010 return new _Future<bool>.immediate(true);
1010 case _STATE_EXTRA_ERROR: 1011 case _STATE_EXTRA_ERROR:
1011 AsyncError prefetch = _futureOrPrefetch; 1012 AsyncError prefetch = _futureOrPrefetch;
1012 _clear(); 1013 _clear();
1013 return new _Future<bool>.immediateError(prefetch.error, 1014 return new _Future<bool>.immediateError(prefetch.error,
1014 prefetch.stackTrace); 1015 prefetch.stackTrace);
1015 case _STATE_EXTRA_DONE: 1016 case _STATE_EXTRA_DONE:
1016 _clear(); 1017 _clear();
1017 return new _Future<bool>.immediate(false); 1018 return new _Future<bool>.immediate(false);
1018 } 1019 }
1019 } 1020 }
1020 } 1021 }
1021 1022
1022 /** Clears up the internal state when the iterator ends. */ 1023 /** Clears up the internal state when the iterator ends. */
1023 void _clear() { 1024 void _clear() {
1024 _subscription = null; 1025 _subscription = null;
1025 _futureOrPrefetch = null; 1026 _futureOrPrefetch = null;
1026 _current = null; 1027 _current = null;
1027 _state = _STATE_DONE; 1028 _state = _STATE_DONE;
1028 } 1029 }
1029 1030
1030 Future cancel() { 1031 Future cancel() {
1031 StreamSubscription subscription = _subscription; 1032 StreamSubscription subscription = _subscription;
1032 // Cherry pick of: https://codereview.chromium.org//896793002
1033 if (subscription == null) return null; 1033 if (subscription == null) return null;
1034 if (_state == _STATE_MOVING) { 1034 if (_state == _STATE_MOVING) {
1035 _Future<bool> hasNext = _futureOrPrefetch; 1035 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
1036 _clear(); 1036 _clear();
1037 hasNext._complete(false); 1037 hasNext._complete(false);
1038 } else { 1038 } else {
1039 _clear(); 1039 _clear();
1040 } 1040 }
1041 return subscription.cancel(); 1041 return subscription.cancel();
1042 } 1042 }
1043 1043
1044 void _onData(T data) { 1044 void _onData(T data) {
1045 if (_state == _STATE_MOVING) { 1045 if (_state == _STATE_MOVING) {
1046 _current = data; 1046 _current = data;
1047 _Future<bool> hasNext = _futureOrPrefetch; 1047 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
1048 _futureOrPrefetch = null; 1048 _futureOrPrefetch = null;
1049 _state = _STATE_FOUND; 1049 _state = _STATE_FOUND;
1050 hasNext._complete(true); 1050 hasNext._complete(true);
1051 return; 1051 return;
1052 } 1052 }
1053 _subscription.pause(); 1053 _subscription.pause();
1054 assert(_futureOrPrefetch == null); 1054 assert(_futureOrPrefetch == null);
1055 _futureOrPrefetch = data; 1055 _futureOrPrefetch = data;
1056 _state = _STATE_EXTRA_DATA; 1056 _state = _STATE_EXTRA_DATA;
1057 } 1057 }
1058 1058
1059 void _onError(Object error, [StackTrace stackTrace]) { 1059 void _onError(Object error, [StackTrace stackTrace]) {
1060 if (_state == _STATE_MOVING) { 1060 if (_state == _STATE_MOVING) {
1061 _Future<bool> hasNext = _futureOrPrefetch; 1061 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
1062 // We have cancelOnError: true, so the subscription is canceled. 1062 // We have cancelOnError: true, so the subscription is canceled.
1063 _clear(); 1063 _clear();
1064 hasNext._completeError(error, stackTrace); 1064 hasNext._completeError(error, stackTrace);
1065 return; 1065 return;
1066 } 1066 }
1067 _subscription.pause(); 1067 _subscription.pause();
1068 assert(_futureOrPrefetch == null); 1068 assert(_futureOrPrefetch == null);
1069 _futureOrPrefetch = new AsyncError(error, stackTrace); 1069 _futureOrPrefetch = new AsyncError(error, stackTrace);
1070 _state = _STATE_EXTRA_ERROR; 1070 _state = _STATE_EXTRA_ERROR;
1071 } 1071 }
1072 1072
1073 void _onDone() { 1073 void _onDone() {
1074 if (_state == _STATE_MOVING) { 1074 if (_state == _STATE_MOVING) {
1075 _Future<bool> hasNext = _futureOrPrefetch; 1075 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
1076 _clear(); 1076 _clear();
1077 hasNext._complete(false); 1077 hasNext._complete(false);
1078 return; 1078 return;
1079 } 1079 }
1080 _subscription.pause(); 1080 _subscription.pause();
1081 _futureOrPrefetch = null; 1081 _futureOrPrefetch = null;
1082 _state = _STATE_EXTRA_DONE; 1082 _state = _STATE_EXTRA_DONE;
1083 } 1083 }
1084 } 1084 }
1085
1086 /** An empty broadcast stream, sending a done event as soon as possible. */
1087 class _EmptyStream<T> extends Stream<T> {
1088 const _EmptyStream() : super._internal();
1089 bool get isBroadcast => true;
1090 StreamSubscription<T> listen(void onData(T data),
1091 {Function onError,
1092 void onDone(),
1093 bool cancelOnError}) {
1094 return new _DoneStreamSubscription<T>(onDone);
1095 }
1096 }
OLDNEW
« no previous file with comments | « tool/input_sdk/lib/async/stream_controller.dart ('k') | tool/input_sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698