Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(46)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 11740027: Rename unsubscribe to cancel. (Closed) Base URL: https://dart.googlecode.com/svn/experimental/lib_v2/dart
Patch Set: Fix error message. Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 // part of dart.async; 5 // part of dart.async;
6 6
7 // States shared by single/multi stream implementations. 7 // States shared by single/multi stream implementations.
8 8
9 /// Initial and default state where the stream can receive and send events. 9 /// Initial and default state where the stream can receive and send events.
10 const int _STREAM_OPEN = 0; 10 const int _STREAM_OPEN = 0;
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
55 * If events are added to the stream (using [_add], [_signalError] or [_done]) 55 * If events are added to the stream (using [_add], [_signalError] or [_done])
56 * while the stream is paused, or while another event is firing, events will 56 * while the stream is paused, or while another event is firing, events will
57 * stored here. 57 * stored here.
58 * Also supports scheduling the events for later execution. 58 * Also supports scheduling the events for later execution.
59 */ 59 */
60 _StreamImplEvents _pendingEvents; 60 _StreamImplEvents _pendingEvents;
61 61
62 // ------------------------------------------------------------------ 62 // ------------------------------------------------------------------
63 // Stream interface. 63 // Stream interface.
64 64
65 StreamSubscription subscribe({void onData(T data), 65 StreamSubscription listen(void onData(T data),
66 void onError(AsyncError error), 66 { void onError(AsyncError error),
67 void onDone(), 67 void onDone(),
68 bool unsubscribeOnError}) { 68 bool unsubscribeOnError}) {
69 if (_isComplete) { 69 if (_isComplete) {
70 return new _DoneSubscription(onDone); 70 return new _DoneSubscription(onDone);
71 } 71 }
72 if (onData == null) onData = _nullDataHandler; 72 if (onData == null) onData = _nullDataHandler;
73 if (onError == null) onError = _nullErrorHandler; 73 if (onError == null) onError = _nullErrorHandler;
74 if (onDone == null) onDone = _nullDoneHandler; 74 if (onDone == null) onDone = _nullDoneHandler;
75 unsubscribeOnError = identical(true, unsubscribeOnError); 75 unsubscribeOnError = identical(true, unsubscribeOnError);
76 _StreamListener subscription = 76 _StreamListener subscription =
77 _createSubscription(onData, onError, onDone, unsubscribeOnError); 77 _createSubscription(onData, onError, onDone, unsubscribeOnError);
78 _addListener(subscription); 78 _addListener(subscription);
(...skipping 180 matching lines...) Expand 10 before | Expand all | Expand 10 after
259 bool unsubscribeOnError); 259 bool unsubscribeOnError);
260 260
261 /** 261 /**
262 * Adds a listener to this stream. 262 * Adds a listener to this stream.
263 */ 263 */
264 void _addListener(_StreamSubscriptionImpl subscription); 264 void _addListener(_StreamSubscriptionImpl subscription);
265 265
266 /** 266 /**
267 * Handle an unsubscription requested from a [_StreamSubscriptionImpl]. 267 * Handle an unsubscription requested from a [_StreamSubscriptionImpl].
268 * 268 *
269 * This method is called from [_StreamSubscriptionImpl.unsubscribe]. 269 * This method is called from [_StreamSubscriptionImpl.cancel].
270 * 270 *
271 * If an event is currently firing, the subscription is delayed 271 * If an event is currently firing, the subscription is delayed
272 * until after the event has been sent to all subscribers. 272 * until after the event has been sent to all subscribers.
273 */ 273 */
274 void _unsubscribe(_StreamSubscriptionImpl subscriber); 274 void _cancel(_StreamSubscriptionImpl subscriber);
275 275
276 /** 276 /**
277 * Iterate over all current subscribers and perform an action on each. 277 * Iterate over all current subscribers and perform an action on each.
278 * 278 *
279 * Subscribers added during the iteration will not be visited. 279 * Subscribers added during the iteration will not be visited.
280 * Subscribers unsubscribed during the iteration will only be removed 280 * Subscribers unsubscribed during the iteration will only be removed
281 * after they have been acted on. 281 * after they have been acted on.
282 * 282 *
283 * Any change in the pause state is only reported after all subscribers have 283 * Any change in the pause state is only reported after all subscribers have
284 * received the event. 284 * received the event.
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
353 * Sends the "done" message directly to each subscriber. 353 * Sends the "done" message directly to each subscriber.
354 * This automatically stops further subscription and 354 * This automatically stops further subscription and
355 * unsubscribes all subscribers. 355 * unsubscribes all subscribers.
356 */ 356 */
357 void _sendDone() { 357 void _sendDone() {
358 assert(!_isPaused); 358 assert(!_isPaused);
359 assert(_isClosed); 359 assert(_isClosed);
360 _setComplete(); 360 _setComplete();
361 if (!_hasSubscribers) return; 361 if (!_hasSubscribers) return;
362 _forEachSubscriber((subscriber) { 362 _forEachSubscriber((subscriber) {
363 _unsubscribe(subscriber); 363 _cancel(subscriber);
364 try { 364 try {
365 subscriber._sendDone(); 365 subscriber._sendDone();
366 } catch (e, s) { 366 } catch (e, s) {
367 new AsyncError(e, s).throwDelayed(); 367 new AsyncError(e, s).throwDelayed();
368 } 368 }
369 }); 369 });
370 assert(!_hasSubscribers); 370 assert(!_hasSubscribers);
371 _onSubscriptionStateChange(); 371 _onSubscriptionStateChange();
372 } 372 }
373 } 373 }
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
431 throw new StateError("Stream has already subscriber."); 431 throw new StateError("Stream has already subscriber.");
432 } 432 }
433 _subscriber = subscription; 433 _subscriber = subscription;
434 subscription._setSubscribed(0); 434 subscription._setSubscribed(0);
435 _onSubscriptionStateChange(); 435 _onSubscriptionStateChange();
436 // TODO(floitsch): Should this be delayed? 436 // TODO(floitsch): Should this be delayed?
437 _handlePendingEvents(); 437 _handlePendingEvents();
438 } 438 }
439 439
440 /** 440 /**
441 * Handle an unsubscription requested from a [_StreamSubscriptionImpl]. 441 * Handle an unsubscription requested from a [_StreamSubscriptionImpl].
Lasse Reichstein Nielsen 2013/01/04 08:17:55 a cancel request.
floitsch 2013/01/04 15:51:36 Done.
442 * 442 *
443 * This method is called from [_StreamSubscriptionImpl.unsubscribe]. 443 * This method is called from [_StreamSubscriptionImpl.cancel].
444 * 444 *
445 * If an event is currently firing, the subscription is delayed 445 * If an event is currently firing, the subscription is delayed
Lasse Reichstein Nielsen 2013/01/04 08:17:55 subscription (sic) -> cancel. Perhaps also -> "...
floitsch 2013/01/04 15:51:36 Done.
446 * until after the event has been sent to all subscribers. 446 * until after the event has been sent to all subscribers.
447 */ 447 */
448 void _unsubscribe(_StreamSubscriptionImpl subscriber) { 448 void _cancel(_StreamSubscriptionImpl subscriber) {
449 assert(identical(subscriber._source, this)); 449 assert(identical(subscriber._source, this));
450 // We allow unsubscribing the currently firing subscription during 450 // We allow unsubscribing the currently firing subscription during
451 // the event firing, because it is indistinguishable from delaying it since 451 // the event firing, because it is indistinguishable from delaying it since
452 // that event has already received the event. 452 // that event has already received the event.
453 if (!identical(_subscriber, subscriber)) { 453 if (!identical(_subscriber, subscriber)) {
454 // You may unsubscribe more than once, only the first one counts. 454 // You may unsubscribe more than once, only the first one counts.
455 return; 455 return;
456 } 456 }
457 _subscriber = null; 457 _subscriber = null;
458 int timesPaused = subscriber._setUnsubscribed(); 458 int timesPaused = subscriber._setUnsubscribed();
(...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after
575 bool firstSubscriber = !_hasSubscribers; 575 bool firstSubscriber = !_hasSubscribers;
576 _InternalLinkList.add(this, listener); 576 _InternalLinkList.add(this, listener);
577 if (firstSubscriber) { 577 if (firstSubscriber) {
578 _onSubscriptionStateChange(); 578 _onSubscriptionStateChange();
579 } 579 }
580 } 580 }
581 581
582 /** 582 /**
583 * Handle an unsubscription requested from a [_StreamListener]. 583 * Handle an unsubscription requested from a [_StreamListener].
584 * 584 *
585 * This method is called from [_StreamListener.unsubscribe]. 585 * This method is called from [_StreamListener.cancel].
586 * 586 *
587 * If an event is currently firing, the unsubscription is delayed 587 * If an event is currently firing, the unsubscription is delayed
588 * until after the event has been sent to all subscribers. 588 * until after the event has been sent to all subscribers.
589 */ 589 */
590 void _unsubscribe(_StreamListener listener) { 590 void _cancel(_StreamListener listener) {
591 assert(identical(listener._source, this)); 591 assert(identical(listener._source, this));
592 if (_InternalLink.isUnlinked(listener)) { 592 if (_InternalLink.isUnlinked(listener)) {
593 // You may unsubscribe more than once, only the first one counts. 593 // You may unsubscribe more than once, only the first one counts.
594 return; 594 return;
595 } 595 }
596 if (_isFiring) { 596 if (_isFiring) {
597 if (listener._needsEvent(_currentEventIdBit)) { 597 if (listener._needsEvent(_currentEventIdBit)) {
598 assert(listener._isSubscribed); 598 assert(listener._isSubscribed);
599 listener._setPendingUnsubscribe(); 599 listener._setPendingUnsubscribe();
600 } else { 600 } else {
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
666 if (handleDone == null) handleDone = _nullDoneHandler; 666 if (handleDone == null) handleDone = _nullDoneHandler;
667 _onDone = handleDone; 667 _onDone = handleDone;
668 } 668 }
669 669
670 void _sendData(T data) { 670 void _sendData(T data) {
671 _onData(data); 671 _onData(data);
672 } 672 }
673 673
674 void _sendError(AsyncError error) { 674 void _sendError(AsyncError error) {
675 _onError(error); 675 _onError(error);
676 if (_unsubscribeOnError) _source._unsubscribe(this); 676 if (_unsubscribeOnError) _source._cancel(this);
677 } 677 }
678 678
679 void _sendDone() { 679 void _sendDone() {
680 _onDone(); 680 _onDone();
681 } 681 }
682 682
683 void unsubscribe() { 683 void cancel() {
684 _source._unsubscribe(this); 684 _source._cancel(this);
685 } 685 }
686 686
687 void pause([Signal resumeSignal]) { 687 void pause([Signal resumeSignal]) {
688 _source._pause(this, resumeSignal); 688 _source._pause(this, resumeSignal);
689 } 689 }
690 690
691 void resume() { 691 void resume() {
692 if (!isPaused) { 692 if (!isPaused) {
693 throw new StateError("Resuming unpaused subscription"); 693 throw new StateError("Resuming unpaused subscription");
694 } 694 }
(...skipping 283 matching lines...) Expand 10 before | Expand all | Expand 10 after
978 bool get _isComplete => _timer == null && _pauseCount == 0; 978 bool get _isComplete => _timer == null && _pauseCount == 0;
979 979
980 void onData(void handleAction(T value)) {} 980 void onData(void handleAction(T value)) {}
981 void onError(void handleError(StateError error)) {} 981 void onError(void handleError(StateError error)) {}
982 void onDone(void handleDone(T value)) { 982 void onDone(void handleDone(T value)) {
983 _handler = handleDone; 983 _handler = handleDone;
984 } 984 }
985 985
986 void pause([Signal signal]) { 986 void pause([Signal signal]) {
987 if (_isComplete) { 987 if (_isComplete) {
988 throw new StateError("Subscription has been unsubscribed."); 988 throw new StateError("Subscription has been canceled.");
989 } 989 }
990 if (_timer != null) _timer.cancel(); 990 if (_timer != null) _timer.cancel();
991 _pauseCount++; 991 _pauseCount++;
992 } 992 }
993 993
994 void resume() { 994 void resume() {
995 if (_isComplete) { 995 if (_isComplete) {
996 throw new StateError("Subscription has been unsubscribed."); 996 throw new StateError("Subscription has been canceled.");
997 } 997 }
998 if (_pauseCount == 0) return; 998 if (_pauseCount == 0) return;
999 _pauseCount--; 999 _pauseCount--;
1000 if (_pauseCount == 0) { 1000 if (_pauseCount == 0) {
1001 _delayDone(); 1001 _delayDone();
1002 } 1002 }
1003 } 1003 }
1004 1004
1005 bool get isPaused => _pauseCount > 0; 1005 bool get isPaused => _pauseCount > 0;
1006 1006
1007 void unsubscribe() { 1007 void cancel() {
1008 if (_isComplete) { 1008 if (_isComplete) {
1009 throw new StateError("Subscription has been unsubscribed."); 1009 throw new StateError("Subscription has been canceled.");
1010 } 1010 }
1011 if (_timer != null) { 1011 if (_timer != null) {
1012 _timer.cancel(); 1012 _timer.cancel();
1013 _timer = null; 1013 _timer = null;
1014 } 1014 }
1015 _pauseCount = 0; 1015 _pauseCount = 0;
1016 } 1016 }
1017 } 1017 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698