Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(170)

Side by Side Diff: lib/src/subscription_stream.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.streams.stream_subscription;
6
7 import 'dart:async';
8
9 import "delegates.dart";
10
11 /// A [Stream] adapter for a [StreamSubscription].
12 ///
13 /// This class allows as `StreamSubscription` to be treated as a `Stream`.
14 ///
15 /// The subscription is paused until the stream is listened to,
16 /// then it is resumed and the events are passed on to the
17 /// stream's new subscription.
18 ///
19 /// This class assumes that is has control over the original subscription.
20 /// If other code is accessing the subscription, results may be unpredictable.
21 class SubscriptionStream<T> extends Stream<T> {
22 StreamSubscription _source;
23 final bool _isCancelOnError;
24 bool get isBroadcastStream => false;
25
26 /// Create a `Stream` from [subscription].
27 ///
28 /// The subscription should not be paused. This class will not resume prior
29 /// pauses, so being paused is indistinguishable from not providing any
30 /// events.
31 ///
32 /// If the original `isCancelOnError` value for the subscription is known,
33 /// it can be passed as the [isCancelOnError] parameter.
34 ///
35 /// If the original subscription cancels on an error, this stream's
36 /// subscription will also do so, whether it's requested in the [listen] call
37 /// or not.
38 ///
39 /// If the `SubscriptionStream` knows this the original subscription's
40 /// error behavior, it can adapt the new stream's subscription.
41 ///
42 /// If the original cancels on an error and the new subscription shouldn't,
43 /// an extra done event is added after the first error, when the
44 /// original subscription is being cancelled.
45 ///
46 /// If the original doesn't cancel on an error and the new subscripton does,
47 /// a cancel is added on the first error.
48 ///
49 /// The default is to assume the subscription will not stop after an error.
50 /// If that assumption is wrong, the result is a stream that stops providing
51 /// events after the first error, including done events.
52 SubscriptionStream(StreamSubscription subscription,
53 {bool isCancelOnError: false})
54 : _source = subscription,
55 _isCancelOnError = isCancelOnError {
56 _source.pause();
57 // Clear callbacks to avoid keeping them alive unnecessarily.
58 _source.onData(null);
59 _source.onError(null);
60 _source.onDone(null);
61 }
62
63 StreamSubscription<T> listen(void onData(T event),
64 {Function onError,
65 void onDone(),
66 bool cancelOnError}) {
67 if (_source == null) {
68 throw new StateError("Stream has already been listened to.");
69 }
70 cancelOnError = (true == cancelOnError);
71 var subscription = _source;
72 _source = null;
73 var result;
74 if (cancelOnError == _isCancelOnError) {
75 if (subscription is StreamSubscription<T>) {
76 result = subscription;
77 } else {
78 // Type parameters don't match - cast by wrapping.
79 result = new DelegatingStreamSubscription<T>(subscription);
80 }
81 } else if (cancelOnError) {
82 assert(!_isCancelOnError);
83 result = new _CancelOnErrorSubscriptionWrapper<T>(subscription);
84 } else {
85 assert(!cancelOnError && _isCancelOnError);
86 result = new _DoneAfterErrorSubscriptionWrapper(subscription);
87 }
88 result.onData(onData);
89 result.onError(onError);
90 result.onDone(onDone);
91 subscription.resume();
92 return result;
93 }
94 }
95
96 class _CancelOnErrorSubscriptionWrapper<T>
97 extends DelegatingStreamSubscription<T> {
98 _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription)
99 : super(subscription);
100
101 void onError(Function handleError) {
102 handleError = _cancelBeforeError(handleError);
103 super.onError(handleError);
104 }
105
106 /// Helper function used by [onError].
107 ///
108 /// Returns an error handler which cancels the stream when it receives an
109 /// error.
110 Function _cancelBeforeError(Function handleError) {
111 return (e, s) {
112 super.cancel();
113 if (handleError is _BinaryCallback) {
114 handleError(e, s);
115 } else {
116 handleError(e);
117 }
118 };
119 }
120 }
121
122 /// Subscription wrapper that assumes wrapped subscription is cancel-on-error.
123 ///
124 /// This adapts a cancel-on-error subscription as non-cancel-on-error,
125 /// by introducing a done event after the error event that terminated
126 /// the wrapped subscription.
127 class _DoneAfterErrorSubscriptionWrapper<T>
128 extends DelegatingStreamSubscription<T> {
129 // Store the done handler so it can be called after an error.
130
131 var _onDone;
132 _DoneAfterErrorSubscriptionWrapper(StreamSubscription subscription)
133 : super(subscription);
134
135 void onDone(void handleDone()) {
136 super.onDone(handleDone);
137 _onDone = handleDone;
138 }
139
140 void onError(Function errorHandler) {
141 errorHandler = _doneAfterError(errorHandler);
142 super.onError(errorHandler);
143 }
144
145 Function _doneAfterError(Function errorHandler) {
146 return (e, s) {
147 // Ensure that the subscription is really cancelled
148 // so we don't get two done events ever - even if the
149 // class is used incorrectly.
150 super.cancel();
151 scheduleMicrotask(() {
152 if (_onDone != null) {
153 _onDone();
154 }
155 });
156 if (errorHandler is _BinaryCallback) {
157 errorHandler(e, s);
158 } else {
159 errorHandler(e);
160 }
161 };
162 }
163 }
164
165 typedef _BinaryCallback(e, s);
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698