OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library async.subscription_stream; |
| 6 |
| 7 import 'dart:async'; |
| 8 |
| 9 import "delegate/stream_subscription.dart"; |
| 10 |
| 11 /// A [Stream] adapter for a [StreamSubscription]. |
| 12 /// |
| 13 /// This class allows as `StreamSubscription` to be treated as a `Stream`. |
| 14 /// |
| 15 /// The subscription is paused until the stream is listened to, |
| 16 /// then it is resumed and the events are passed on to the |
| 17 /// stream's new subscription. |
| 18 /// |
| 19 /// This class assumes that is has control over the original subscription. |
| 20 /// If other code is accessing the subscription, results may be unpredictable. |
| 21 class SubscriptionStream<T> extends Stream<T> { |
| 22 /// The subscription providing the events for this stream. |
| 23 StreamSubscription _source; |
| 24 |
| 25 /// Create a single-subscription `Stream` from [subscription]. |
| 26 /// |
| 27 /// The `subscription` should not be paused. This class will not resume prior |
| 28 /// pauses, so being paused is indistinguishable from not providing any |
| 29 /// events. |
| 30 /// |
| 31 /// If the `subscription` doesn't send any `done` events, neither will this |
| 32 /// stream. That may be an issue if `subscription` was made to cancel on |
| 33 /// an error. |
| 34 SubscriptionStream(StreamSubscription subscription) |
| 35 : _source = subscription { |
| 36 _source.pause(); |
| 37 // Clear callbacks to avoid keeping them alive unnecessarily. |
| 38 _source.onData(null); |
| 39 _source.onError(null); |
| 40 _source.onDone(null); |
| 41 } |
| 42 |
| 43 StreamSubscription<T> listen(void onData(T event), |
| 44 {Function onError, |
| 45 void onDone(), |
| 46 bool cancelOnError}) { |
| 47 if (_source == null) { |
| 48 throw new StateError("Stream has already been listened to."); |
| 49 } |
| 50 cancelOnError = (true == cancelOnError); |
| 51 var subscription = _source; |
| 52 _source = null; |
| 53 var result; |
| 54 if (cancelOnError) { |
| 55 result = new _CancelOnErrorSubscriptionWrapper<T>(subscription); |
| 56 } else { |
| 57 // Wrap the subscription to ensure correct type parameter. |
| 58 result = new DelegatingStreamSubscription<T>(subscription); |
| 59 } |
| 60 result.onData(onData); |
| 61 result.onError(onError); |
| 62 result.onDone(onDone); |
| 63 subscription.resume(); |
| 64 return result; |
| 65 } |
| 66 } |
| 67 |
| 68 /// Subscription wrapper that cancels on error. |
| 69 /// |
| 70 /// Used by [SubscriptionStream] when forwarding a subscription |
| 71 /// created with `cancelOnError` as `true` to one with (assumed) |
| 72 /// `cancelOnError` as `false`. It automatically cancels the |
| 73 /// source subscription on the first error. |
| 74 class _CancelOnErrorSubscriptionWrapper<T> |
| 75 extends DelegatingStreamSubscription<T> { |
| 76 _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription) |
| 77 : super(subscription); |
| 78 |
| 79 void onError(Function handleError) { |
| 80 // Cancel when receiving an error. |
| 81 super.onError((error, StackTrace stackTrace) { |
| 82 var cancelFuture = super.cancel(); |
| 83 if (cancelFuture != null) { |
| 84 // Wait for the cancel to complete before sending the error event. |
| 85 cancelFuture.whenComplete(() { |
| 86 if (handleError is ZoneBinaryCallback) { |
| 87 handleError(error, stackTrace); |
| 88 } else { |
| 89 handleError(error); |
| 90 } |
| 91 }); |
| 92 } else { |
| 93 if (handleError is ZoneBinaryCallback) { |
| 94 handleError(error, stackTrace); |
| 95 } else { |
| 96 handleError(error); |
| 97 } |
| 98 } |
| 99 }); |
| 100 } |
| 101 } |
OLD | NEW |