Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 library async.subscription_stream; | |
| 6 | |
| 7 import 'dart:async'; | |
| 8 | |
| 9 import "delegating_stream_subscription.dart"; | |
| 10 | |
| 11 /// A [Stream] adapter for a [StreamSubscription]. | |
| 12 /// | |
| 13 /// This class allows as `StreamSubscription` to be treated as a `Stream`. | |
| 14 /// | |
| 15 /// The subscription is paused until the stream is listened to, | |
| 16 /// then it is resumed and the events are passed on to the | |
| 17 /// stream's new subscription. | |
| 18 /// | |
| 19 /// This class assumes that is has control over the original subscription. | |
| 20 /// If other code is accessing the subscription, results may be unpredictable. | |
| 21 class SubscriptionStream<T> extends Stream<T> { | |
| 22 /// The subscription providing the events for this stream. | |
| 23 StreamSubscription _source; | |
| 24 | |
| 25 /// Whether [_source] was created with `cancelOnError` set to `true.` | |
| 26 final bool _sourceCancelsOnError; | |
| 27 | |
| 28 /// Create a single-subscription `Stream` from [subscription]. | |
| 29 /// | |
| 30 /// The `subscription` should not be paused. This class will not resume prior | |
| 31 /// pauses, so being paused is indistinguishable from not providing any | |
| 32 /// events. | |
| 33 /// | |
| 34 /// If the original `cancelOnError` value used when creating `subscription` | |
| 35 /// is known, it can be passed as the [isCancelOnError] parameter. | |
| 36 /// | |
| 37 /// The [iscancelOnError] argument doesn't need to match the original | |
| 38 /// subscription's `cancelOnError` value; | |
| 39 /// however, if [isCancelOnError] is `false` | |
| 40 /// and the original subscription's `cancelOnError` value is `true`, | |
| 41 /// this stream will never emit a done event after an error event. | |
| 42 /// On the other hand, if [isCancelOnError] is true, this stream | |
| 43 /// will emit a done event and stop after the first error | |
| 44 /// regardless of both the original subscription's value | |
| 45 /// and the `cancelOnError` value used when listening to this stream. | |
| 46 SubscriptionStream(StreamSubscription subscription, | |
| 47 {bool isCancelOnError: false}) | |
|
nweiz
2015/06/18 23:44:27
I thought you were getting rid of this?
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Ack, I was. Got lost in the other changes, I guess
| |
| 48 : _source = subscription, | |
| 49 _sourceCancelsOnError = isCancelOnError { | |
| 50 _source.pause(); | |
| 51 // Clear callbacks to avoid keeping them alive unnecessarily. | |
| 52 _source.onData(null); | |
| 53 _source.onError(null); | |
| 54 _source.onDone(null); | |
| 55 } | |
| 56 | |
| 57 StreamSubscription<T> listen(void onData(T event), | |
| 58 {Function onError, | |
| 59 void onDone(), | |
| 60 bool cancelOnError}) { | |
| 61 if (_source == null) { | |
| 62 throw new StateError("Stream has already been listened to."); | |
| 63 } | |
| 64 cancelOnError = (true == cancelOnError); | |
| 65 var subscription = _source; | |
| 66 _source = null; | |
| 67 var result; | |
| 68 if (cancelOnError == _sourceCancelsOnError) { | |
| 69 // Type parameters may not match - cast by wrapping. | |
| 70 result = new DelegatingStreamSubscription<T>(subscription); | |
| 71 } else if (cancelOnError) { | |
| 72 assert(!_sourceCancelsOnError); | |
| 73 result = new _CancelOnErrorSubscriptionWrapper<T>(subscription); | |
| 74 } else { | |
| 75 assert(!cancelOnError && _sourceCancelsOnError); | |
| 76 result = new _DoneAfterErrorSubscriptionWrapper(subscription); | |
| 77 } | |
| 78 result.onData(onData); | |
| 79 result.onError(onError); | |
| 80 result.onDone(onDone); | |
| 81 subscription.resume(); | |
| 82 return result; | |
| 83 } | |
| 84 } | |
| 85 | |
| 86 /// Subscription wrapper that cancels on error. | |
| 87 /// | |
| 88 /// Used by [SubscriptionStream] when forwarding a subscription | |
| 89 /// created with `cancelOnError` as `true` to one with (assumed) | |
| 90 /// `cancelOnError` as `false`. It automatically cancels the | |
| 91 /// source subscription on the first error. | |
| 92 class _CancelOnErrorSubscriptionWrapper<T> | |
| 93 extends DelegatingStreamSubscription<T> { | |
| 94 _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription) | |
| 95 : super(subscription); | |
| 96 | |
| 97 void onError(Function handleError) { | |
| 98 // Cancel when receiving an error. | |
| 99 super.onError((error, StackTrace stackTrace) { | |
| 100 super.cancel(); | |
| 101 if (handleError is ZoneBinaryCallback) { | |
| 102 handleError(error, stackTrace); | |
| 103 } else { | |
| 104 handleError(error); | |
| 105 } | |
| 106 }); | |
| 107 } | |
| 108 } | |
| 109 | |
| 110 /// Subscription wrapper that sends a done event after an error. | |
| 111 /// | |
| 112 /// If the source subscription was created with `cancelOnError` as true, | |
| 113 /// this subscription will look like a non-`cancelOnError` subscription | |
| 114 /// that happens to end normally after the first error. | |
| 115 /// | |
| 116 /// If the source subscription isn't `cancelOnError` then it's canceled | |
| 117 /// after the first error anyway, to ensure consistent behavior. | |
| 118 class _DoneAfterErrorSubscriptionWrapper<T> | |
| 119 extends DelegatingStreamSubscription<T> { | |
| 120 // Stores the done handler so it can be called after an error. | |
| 121 var _onDone; | |
| 122 | |
| 123 _DoneAfterErrorSubscriptionWrapper(StreamSubscription subscription) | |
| 124 : super(subscription); | |
| 125 | |
| 126 void onDone(void handleDone()) { | |
| 127 super.onDone(handleDone); | |
| 128 _onDone = handleDone; | |
| 129 } | |
| 130 | |
| 131 void onError(Function errorHandler) { | |
| 132 errorHandler = _doneAfterError(errorHandler); | |
| 133 super.onError(errorHandler); | |
| 134 } | |
| 135 | |
| 136 Function _doneAfterError(Function errorHandler) { | |
| 137 return (error, StackTrace stackTrace) { | |
| 138 // Ensure that the subscription is really canceled | |
| 139 // so we don't get two done events ever - even if the | |
| 140 // class is used incorrectly. | |
| 141 super.cancel(); | |
| 142 scheduleMicrotask(() { | |
| 143 if (_onDone != null) { | |
| 144 _onDone(); | |
| 145 } | |
| 146 }); | |
| 147 if (errorHandler is ZoneBinaryCallback) { | |
| 148 errorHandler(error, stackTrace); | |
| 149 } else { | |
| 150 errorHandler(error); | |
| 151 } | |
| 152 }; | |
| 153 } | |
| 154 } | |
| OLD | NEW |