Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(127)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 11886013: Make Stream transformation respect the single/multi subscriber nature of the source. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Added missing isSingleSubscription impl. Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // States shared by single/multi stream implementations. 7 // States shared by single/multi stream implementations.
8 8
9 /// Initial and default state where the stream can receive and send events. 9 /// Initial and default state where the stream can receive and send events.
10 const int _STREAM_OPEN = 0; 10 const int _STREAM_OPEN = 0;
(...skipping 403 matching lines...) Expand 10 before | Expand all | Expand 10 after
414 * * [_hasSubscribers]: Test whether there are currently any subscribers. 414 * * [_hasSubscribers]: Test whether there are currently any subscribers.
415 * * [_isPaused]: Test whether the stream is currently paused. 415 * * [_isPaused]: Test whether the stream is currently paused.
416 * The user should not add new events while the stream is paused, but if it 416 * The user should not add new events while the stream is paused, but if it
417 * happens anyway, the stream will enqueue the events just as when new events 417 * happens anyway, the stream will enqueue the events just as when new events
418 * arrive while still firing an old event. 418 * arrive while still firing an old event.
419 */ 419 */
420 class _SingleStreamImpl<T> extends _StreamImpl<T> { 420 class _SingleStreamImpl<T> extends _StreamImpl<T> {
421 _StreamListener _subscriber = null; 421 _StreamListener _subscriber = null;
422 422
423 Stream<T> asMultiSubscriberStream() { 423 Stream<T> asMultiSubscriberStream() {
424 return new _ForwardingMultiStream<T, T>().._source = this; 424 return new _SingleStreamMultiplexer<T>(this);
425 } 425 }
426 426
427 bool get isSingleSubscription => true;
428
427 /** Whether one or more active subscribers have requested a pause. */ 429 /** Whether one or more active subscribers have requested a pause. */
428 bool get _isPaused => !_hasSubscribers || super._isPaused; 430 bool get _isPaused => !_hasSubscribers || super._isPaused;
429 431
430 /** Whether there is currently a subscriber on this [Stream]. */ 432 /** Whether there is currently a subscriber on this [Stream]. */
431 bool get _hasSubscribers => _subscriber != null; 433 bool get _hasSubscribers => _subscriber != null;
432 434
433 // ------------------------------------------------------------------- 435 // -------------------------------------------------------------------
434 // Internal implementation. 436 // Internal implementation.
435 437
436 /** 438 /**
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
534 536
535 _MultiStreamImpl() { 537 _MultiStreamImpl() {
536 _nextLink = _previousLink = this; 538 _nextLink = _previousLink = this;
537 } 539 }
538 540
539 Stream<T> asMultiSubscriberStream() => this; 541 Stream<T> asMultiSubscriberStream() => this;
540 542
541 // ------------------------------------------------------------------ 543 // ------------------------------------------------------------------
542 // Helper functions that can be overridden in subclasses. 544 // Helper functions that can be overridden in subclasses.
543 545
546 bool get isSingleSubscription => false;
floitsch 2013/01/14 16:09:09 I don't think anybody wants to override this metho
Lasse Reichstein Nielsen 2013/01/15 07:19:51 Done.
547
544 /** Whether there are currently any subscribers on this [Stream]. */ 548 /** Whether there are currently any subscribers on this [Stream]. */
545 bool get _hasSubscribers => !_InternalLinkList.isEmpty(this); 549 bool get _hasSubscribers => !_InternalLinkList.isEmpty(this);
546 550
547 /** 551 /**
548 * Create the new subscription object. 552 * Create the new subscription object.
549 */ 553 */
550 _StreamListener<T> _createSubscription( 554 _StreamListener<T> _createSubscription(
551 void onData(T data), 555 void onData(T data),
552 void onError(AsyncError error), 556 void onError(AsyncError error),
553 void onDone(), 557 void onDone(),
(...skipping 532 matching lines...) Expand 10 before | Expand all | Expand 10 after
1086 if (_isComplete) { 1090 if (_isComplete) {
1087 throw new StateError("Subscription has been canceled."); 1091 throw new StateError("Subscription has been canceled.");
1088 } 1092 }
1089 if (_timer != null) { 1093 if (_timer != null) {
1090 _timer.cancel(); 1094 _timer.cancel();
1091 _timer = null; 1095 _timer = null;
1092 } 1096 }
1093 _pauseCount = 0; 1097 _pauseCount = 0;
1094 } 1098 }
1095 } 1099 }
1100
1101 class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> {
1102 final _SingleStreamImpl<T> _source;
1103 StreamSubscription<T> _subscription;
1104
1105 _SingleStreamMultiplexer(this._source);
1106
1107 void _onPauseStateChange() {
1108 if (_isPaused) {
1109 if (_subscription != null) {
1110 _subscription.pause();
1111 }
1112 } else {
1113 if (_subscription != null) {
1114 _subscription.resume();
1115 }
1116 }
1117 }
1118
1119 /**
1120 * Subscribe or unsubscribe on [_source] depending on whether
1121 * [_stream] has subscribers.
1122 */
1123 void _onSubscriptionStateChange() {
1124 if (_hasSubscribers) {
1125 assert(_subscription == null);
1126 _subscription = _source.listen(this._add,
1127 onError: this._signalError,
1128 onDone: this._close);
1129 } else {
1130 // TODO(lrn): Check why this can happen.
1131 if (_subscription == null) return;
1132 _subscription.cancel();
1133 _subscription = null;
1134 }
1135 }
1136 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698