OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library async.streams.stream_subscription; |
| 6 |
| 7 import 'dart:async'; |
| 8 |
| 9 import "delegates.dart"; |
| 10 |
| 11 /// A [Stream] adapter for a [StreamSubscription]. |
| 12 /// |
| 13 /// This class allows as `StreamSubscription` to be treated as a `Stream`. |
| 14 /// |
| 15 /// The subscription is paused until the stream is listened to, |
| 16 /// then it is resumed and the events are passed on to the |
| 17 /// stream's new subscription. |
| 18 /// |
| 19 /// This class assumes that is has control over the original subscription. |
| 20 /// If other code is accessing the subscription, results may be unpredictable. |
| 21 class SubscriptionStream<T> extends Stream<T> { |
| 22 StreamSubscription _source; |
| 23 final bool _isCancelOnError; |
| 24 bool get isBroadcastStream => false; |
| 25 |
| 26 /// Create a `Stream` from [subscription]. |
| 27 /// |
| 28 /// The subscription should not be paused. This class will not resume prior |
| 29 /// pauses, so being paused is indistinguishable from not providing any |
| 30 /// events. |
| 31 /// |
| 32 /// If the original `isCancelOnError` value for the subscription is known, |
| 33 /// it can be passed as the [isCancelOnError] parameter. |
| 34 /// |
| 35 /// If the original subscription cancels on an error, this stream's |
| 36 /// subscription will also do so, whether it's requested in the [listen] call |
| 37 /// or not. |
| 38 /// |
| 39 /// If the `SubscriptionStream` knows this the original subscription's |
| 40 /// error behavior, it can adapt the new stream's subscription. |
| 41 /// |
| 42 /// If the original cancels on an error and the new subscription shouldn't, |
| 43 /// an extra done event is added after the first error, when the |
| 44 /// original subscription is being cancelled. |
| 45 /// |
| 46 /// If the original doesn't cancel on an error and the new subscripton does, |
| 47 /// a cancel is added on the first error. |
| 48 /// |
| 49 /// The default is to assume the subscription will not stop after an error. |
| 50 /// If that assumption is wrong, the result is a stream that stops providing |
| 51 /// events after the first error, including done events. |
| 52 SubscriptionStream(StreamSubscription subscription, |
| 53 {bool isCancelOnError: false}) |
| 54 : _source = subscription, |
| 55 _isCancelOnError = isCancelOnError { |
| 56 _source.pause(); |
| 57 // Clear callbacks to avoid keeping them alive unnecessarily. |
| 58 _source.onData(null); |
| 59 _source.onError(null); |
| 60 _source.onDone(null); |
| 61 } |
| 62 |
| 63 StreamSubscription<T> listen(void onData(T event), |
| 64 {Function onError, |
| 65 void onDone(), |
| 66 bool cancelOnError}) { |
| 67 if (_source == null) { |
| 68 throw new StateError("Stream has already been listened to."); |
| 69 } |
| 70 cancelOnError = (true == cancelOnError); |
| 71 var subscription = _source; |
| 72 _source = null; |
| 73 var result; |
| 74 if (cancelOnError == _isCancelOnError) { |
| 75 if (subscription is StreamSubscription<T>) { |
| 76 result = subscription; |
| 77 } else { |
| 78 // Type parameters don't match - cast by wrapping. |
| 79 result = new DelegatingStreamSubscription<T>(subscription); |
| 80 } |
| 81 } else if (cancelOnError) { |
| 82 assert(!_isCancelOnError); |
| 83 result = new _CancelOnErrorSubscriptionWrapper<T>(subscription); |
| 84 } else { |
| 85 assert(!cancelOnError && _isCancelOnError); |
| 86 result = new _DoneAfterErrorSubscriptionWrapper(subscription); |
| 87 } |
| 88 result.onData(onData); |
| 89 result.onError(onError); |
| 90 result.onDone(onDone); |
| 91 subscription.resume(); |
| 92 return result; |
| 93 } |
| 94 } |
| 95 |
| 96 class _CancelOnErrorSubscriptionWrapper<T> |
| 97 extends DelegatingStreamSubscription<T> { |
| 98 _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription) |
| 99 : super(subscription); |
| 100 |
| 101 void onError(Function handleError) { |
| 102 handleError = _cancelBeforeError(handleError); |
| 103 super.onError(handleError); |
| 104 } |
| 105 |
| 106 /// Helper function used by [onError]. |
| 107 /// |
| 108 /// Returns an error handler which cancels the stream when it receives an |
| 109 /// error. |
| 110 Function _cancelBeforeError(Function handleError) { |
| 111 return (e, s) { |
| 112 super.cancel(); |
| 113 if (handleError is _BinaryCallback) { |
| 114 handleError(e, s); |
| 115 } else { |
| 116 handleError(e); |
| 117 } |
| 118 }; |
| 119 } |
| 120 } |
| 121 |
| 122 /// Subscription wrapper that assumes wrapped subscription is cancel-on-error. |
| 123 /// |
| 124 /// This adapts a cancel-on-error subscription as non-cancel-on-error, |
| 125 /// by introducing a done event after the error event that terminated |
| 126 /// the wrapped subscription. |
| 127 class _DoneAfterErrorSubscriptionWrapper<T> |
| 128 extends DelegatingStreamSubscription<T> { |
| 129 // Store the done handler so it can be called after an error. |
| 130 |
| 131 var _onDone; |
| 132 _DoneAfterErrorSubscriptionWrapper(StreamSubscription subscription) |
| 133 : super(subscription); |
| 134 |
| 135 void onDone(void handleDone()) { |
| 136 super.onDone(handleDone); |
| 137 _onDone = handleDone; |
| 138 } |
| 139 |
| 140 void onError(Function errorHandler) { |
| 141 errorHandler = _doneAfterError(errorHandler); |
| 142 super.onError(errorHandler); |
| 143 } |
| 144 |
| 145 Function _doneAfterError(Function errorHandler) { |
| 146 return (e, s) { |
| 147 // Ensure that the subscription is really cancelled |
| 148 // so we don't get two done events ever - even if the |
| 149 // class is used incorrectly. |
| 150 super.cancel(); |
| 151 scheduleMicrotask(() { |
| 152 if (_onDone != null) { |
| 153 _onDone(); |
| 154 } |
| 155 }); |
| 156 if (errorHandler is _BinaryCallback) { |
| 157 errorHandler(e, s); |
| 158 } else { |
| 159 errorHandler(e); |
| 160 } |
| 161 }; |
| 162 } |
| 163 } |
| 164 |
| 165 typedef _BinaryCallback(e, s); |
OLD | NEW |