Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(134)

Side by Side Diff: lib/src/subscription_stream.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Add all.dart to test. Apparently people like that. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.subscription_stream;
6
7 import 'dart:async';
8
9 import "delegating_stream_subscription.dart";
10
11 /// A [Stream] adapter for a [StreamSubscription].
12 ///
13 /// This class allows as `StreamSubscription` to be treated as a `Stream`.
14 ///
15 /// The subscription is paused until the stream is listened to,
16 /// then it is resumed and the events are passed on to the
17 /// stream's new subscription.
18 ///
19 /// This class assumes that is has control over the original subscription.
20 /// If other code is accessing the subscription, results may be unpredictable.
21 class SubscriptionStream<T> extends Stream<T> {
22 /// The subscription providing the events for this stream.
23 StreamSubscription _source;
24
25 /// Whether [_source] was created with `cancelOnError` set to `true.`
26 final bool _sourceCancelsOnError;
27
28 /// Create a single-subscription `Stream` from [subscription].
29 ///
30 /// The `subscription` should not be paused. This class will not resume prior
31 /// pauses, so being paused is indistinguishable from not providing any
32 /// events.
33 ///
34 /// If the original `cancelOnError` value used when creating `subscription`
35 /// is known, it can be passed as the [isCancelOnError] parameter.
36 ///
37 /// The [iscancelOnError] argument doesn't need to match the original
38 /// subscription's `cancelOnError` value;
39 /// however, if [isCancelOnError] is `false`
40 /// and the original subscription's `cancelOnError` value is `true`,
41 /// this stream will never emit a done event after an error event.
42 /// On the other hand, if [isCancelOnError] is true, this stream
43 /// will emit a done event and stop after the first error
44 /// regardless of both the original subscription's value
45 /// and the `cancelOnError` value used when listening to this stream.
46 SubscriptionStream(StreamSubscription subscription,
47 {bool isCancelOnError: false})
nweiz 2015/06/18 23:44:27 I thought you were getting rid of this?
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Ack, I was. Got lost in the other changes, I guess
48 : _source = subscription,
49 _sourceCancelsOnError = isCancelOnError {
50 _source.pause();
51 // Clear callbacks to avoid keeping them alive unnecessarily.
52 _source.onData(null);
53 _source.onError(null);
54 _source.onDone(null);
55 }
56
57 StreamSubscription<T> listen(void onData(T event),
58 {Function onError,
59 void onDone(),
60 bool cancelOnError}) {
61 if (_source == null) {
62 throw new StateError("Stream has already been listened to.");
63 }
64 cancelOnError = (true == cancelOnError);
65 var subscription = _source;
66 _source = null;
67 var result;
68 if (cancelOnError == _sourceCancelsOnError) {
69 // Type parameters may not match - cast by wrapping.
70 result = new DelegatingStreamSubscription<T>(subscription);
71 } else if (cancelOnError) {
72 assert(!_sourceCancelsOnError);
73 result = new _CancelOnErrorSubscriptionWrapper<T>(subscription);
74 } else {
75 assert(!cancelOnError && _sourceCancelsOnError);
76 result = new _DoneAfterErrorSubscriptionWrapper(subscription);
77 }
78 result.onData(onData);
79 result.onError(onError);
80 result.onDone(onDone);
81 subscription.resume();
82 return result;
83 }
84 }
85
86 /// Subscription wrapper that cancels on error.
87 ///
88 /// Used by [SubscriptionStream] when forwarding a subscription
89 /// created with `cancelOnError` as `true` to one with (assumed)
90 /// `cancelOnError` as `false`. It automatically cancels the
91 /// source subscription on the first error.
92 class _CancelOnErrorSubscriptionWrapper<T>
93 extends DelegatingStreamSubscription<T> {
94 _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription)
95 : super(subscription);
96
97 void onError(Function handleError) {
98 // Cancel when receiving an error.
99 super.onError((error, StackTrace stackTrace) {
100 super.cancel();
101 if (handleError is ZoneBinaryCallback) {
102 handleError(error, stackTrace);
103 } else {
104 handleError(error);
105 }
106 });
107 }
108 }
109
110 /// Subscription wrapper that sends a done event after an error.
111 ///
112 /// If the source subscription was created with `cancelOnError` as true,
113 /// this subscription will look like a non-`cancelOnError` subscription
114 /// that happens to end normally after the first error.
115 ///
116 /// If the source subscription isn't `cancelOnError` then it's canceled
117 /// after the first error anyway, to ensure consistent behavior.
118 class _DoneAfterErrorSubscriptionWrapper<T>
119 extends DelegatingStreamSubscription<T> {
120 // Stores the done handler so it can be called after an error.
121 var _onDone;
122
123 _DoneAfterErrorSubscriptionWrapper(StreamSubscription subscription)
124 : super(subscription);
125
126 void onDone(void handleDone()) {
127 super.onDone(handleDone);
128 _onDone = handleDone;
129 }
130
131 void onError(Function errorHandler) {
132 errorHandler = _doneAfterError(errorHandler);
133 super.onError(errorHandler);
134 }
135
136 Function _doneAfterError(Function errorHandler) {
137 return (error, StackTrace stackTrace) {
138 // Ensure that the subscription is really canceled
139 // so we don't get two done events ever - even if the
140 // class is used incorrectly.
141 super.cancel();
142 scheduleMicrotask(() {
143 if (_onDone != null) {
144 _onDone();
145 }
146 });
147 if (errorHandler is ZoneBinaryCallback) {
148 errorHandler(error, stackTrace);
149 } else {
150 errorHandler(error);
151 }
152 };
153 }
154 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698