Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(56)

Side by Side Diff: sdk/lib/async/stream_transformers.dart

Issue 2202533003: Return futures on Stream.cancel when possible. (Closed) Base URL: git@github.com:dart-lang/sdk.git@master
Patch Set: Remove debug-print. Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** 7 /**
8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. 8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface.
9 */ 9 */
10 class _EventSinkWrapper<T> implements EventSink<T> { 10 class _EventSinkWrapper<T> implements EventSink<T> {
(...skipping 15 matching lines...) Expand all
26 * the transformation. The returned sink is the transformation's input. 26 * the transformation. The returned sink is the transformation's input.
27 */ 27 */
28 class _SinkTransformerStreamSubscription<S, T> 28 class _SinkTransformerStreamSubscription<S, T>
29 extends _BufferingStreamSubscription<T> { 29 extends _BufferingStreamSubscription<T> {
30 /// The transformer's input sink. 30 /// The transformer's input sink.
31 EventSink<S> _transformerSink; 31 EventSink<S> _transformerSink;
32 32
33 /// The subscription to the input stream. 33 /// The subscription to the input stream.
34 StreamSubscription<S> _subscription; 34 StreamSubscription<S> _subscription;
35 35
36 /// The cancelFuture (if any).
37 Future _cancelFuture;
Lasse Reichstein Nielsen 2016/08/01 15:23:26 Why a field? I only see it used inside one method?
floitsch 2016/08/01 21:00:37 Acknowledged.
38
36 _SinkTransformerStreamSubscription(Stream<S> source, 39 _SinkTransformerStreamSubscription(Stream<S> source,
37 _SinkMapper<S, T> mapper, 40 _SinkMapper<S, T> mapper,
38 void onData(T data), 41 void onData(T data),
39 Function onError, 42 Function onError,
40 void onDone(), 43 void onDone(),
41 bool cancelOnError) 44 bool cancelOnError)
42 // We set the adapter's target only when the user is allowed to send data. 45 // We set the adapter's target only when the user is allowed to send data.
43 : super(onData, onError, onDone, cancelOnError) { 46 : super(onData, onError, onDone, cancelOnError) {
44 _EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>(this); 47 _EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>(this);
45 _transformerSink = mapper(eventSink); 48 _transformerSink = mapper(eventSink);
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
102 } 105 }
103 106
104 void _onResume() { 107 void _onResume() {
105 if (_isSubscribed) _subscription.resume(); 108 if (_isSubscribed) _subscription.resume();
106 } 109 }
107 110
108 Future _onCancel() { 111 Future _onCancel() {
109 if (_isSubscribed) { 112 if (_isSubscribed) {
110 StreamSubscription subscription = _subscription; 113 StreamSubscription subscription = _subscription;
111 _subscription = null; 114 _subscription = null;
112 subscription.cancel(); 115 _cancelFuture = subscription.cancel();
Lasse Reichstein Nielsen 2016/08/01 15:23:26 That is, why is _cancelFuture not a local variable
floitsch 2016/08/01 15:55:55 In case someone calls `cancel` multiple times.
Lasse Reichstein Nielsen 2016/08/01 16:49:38 Possibly a good point. I'm not sure _onCancel will
floitsch 2016/08/01 21:00:37 You are right. Changed a bit more...
113 } 116 }
114 return null; 117 return _cancelFuture ?? new Future.value(null);
115 } 118 }
116 119
117 void _handleData(S data) { 120 void _handleData(S data) {
118 try { 121 try {
119 _transformerSink.add(data); 122 _transformerSink.add(data);
120 } catch (e, s) { 123 } catch (e, s) {
121 _addError(e, s); 124 _addError(e, s);
122 } 125 }
123 } 126 }
124 127
(...skipping 179 matching lines...) Expand 10 before | Expand all | Expand 10 after
304 void onDone(), 307 void onDone(),
305 bool cancelOnError }) { 308 bool cancelOnError }) {
306 cancelOnError = identical(true, cancelOnError); 309 cancelOnError = identical(true, cancelOnError);
307 StreamSubscription<T> result = _transformer(_stream, cancelOnError); 310 StreamSubscription<T> result = _transformer(_stream, cancelOnError);
308 result.onData(onData); 311 result.onData(onData);
309 result.onError(onError); 312 result.onError(onError);
310 result.onDone(onDone); 313 result.onDone(onDone);
311 return result; 314 return result;
312 } 315 }
313 } 316 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698