OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** | 7 /** |
8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. | 8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. |
9 */ | 9 */ |
10 class _EventSinkWrapper<T> implements EventSink<T> { | 10 class _EventSinkWrapper<T> implements EventSink<T> { |
(...skipping 15 matching lines...) Expand all Loading... | |
26 * the transformation. The returned sink is the transformation's input. | 26 * the transformation. The returned sink is the transformation's input. |
27 */ | 27 */ |
28 class _SinkTransformerStreamSubscription<S, T> | 28 class _SinkTransformerStreamSubscription<S, T> |
29 extends _BufferingStreamSubscription<T> { | 29 extends _BufferingStreamSubscription<T> { |
30 /// The transformer's input sink. | 30 /// The transformer's input sink. |
31 EventSink<S> _transformerSink; | 31 EventSink<S> _transformerSink; |
32 | 32 |
33 /// The subscription to the input stream. | 33 /// The subscription to the input stream. |
34 StreamSubscription<S> _subscription; | 34 StreamSubscription<S> _subscription; |
35 | 35 |
36 /// The cancelFuture (if any). | |
37 Future _cancelFuture; | |
Lasse Reichstein Nielsen
2016/08/01 15:23:26
Why a field? I only see it used inside one method?
floitsch
2016/08/01 21:00:37
Acknowledged.
| |
38 | |
36 _SinkTransformerStreamSubscription(Stream<S> source, | 39 _SinkTransformerStreamSubscription(Stream<S> source, |
37 _SinkMapper<S, T> mapper, | 40 _SinkMapper<S, T> mapper, |
38 void onData(T data), | 41 void onData(T data), |
39 Function onError, | 42 Function onError, |
40 void onDone(), | 43 void onDone(), |
41 bool cancelOnError) | 44 bool cancelOnError) |
42 // We set the adapter's target only when the user is allowed to send data. | 45 // We set the adapter's target only when the user is allowed to send data. |
43 : super(onData, onError, onDone, cancelOnError) { | 46 : super(onData, onError, onDone, cancelOnError) { |
44 _EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>(this); | 47 _EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>(this); |
45 _transformerSink = mapper(eventSink); | 48 _transformerSink = mapper(eventSink); |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
102 } | 105 } |
103 | 106 |
104 void _onResume() { | 107 void _onResume() { |
105 if (_isSubscribed) _subscription.resume(); | 108 if (_isSubscribed) _subscription.resume(); |
106 } | 109 } |
107 | 110 |
108 Future _onCancel() { | 111 Future _onCancel() { |
109 if (_isSubscribed) { | 112 if (_isSubscribed) { |
110 StreamSubscription subscription = _subscription; | 113 StreamSubscription subscription = _subscription; |
111 _subscription = null; | 114 _subscription = null; |
112 subscription.cancel(); | 115 _cancelFuture = subscription.cancel(); |
Lasse Reichstein Nielsen
2016/08/01 15:23:26
That is, why is _cancelFuture not a local variable
floitsch
2016/08/01 15:55:55
In case someone calls `cancel` multiple times.
Lasse Reichstein Nielsen
2016/08/01 16:49:38
Possibly a good point.
I'm not sure _onCancel will
floitsch
2016/08/01 21:00:37
You are right.
Changed a bit more...
| |
113 } | 116 } |
114 return null; | 117 return _cancelFuture ?? new Future.value(null); |
115 } | 118 } |
116 | 119 |
117 void _handleData(S data) { | 120 void _handleData(S data) { |
118 try { | 121 try { |
119 _transformerSink.add(data); | 122 _transformerSink.add(data); |
120 } catch (e, s) { | 123 } catch (e, s) { |
121 _addError(e, s); | 124 _addError(e, s); |
122 } | 125 } |
123 } | 126 } |
124 | 127 |
(...skipping 179 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
304 void onDone(), | 307 void onDone(), |
305 bool cancelOnError }) { | 308 bool cancelOnError }) { |
306 cancelOnError = identical(true, cancelOnError); | 309 cancelOnError = identical(true, cancelOnError); |
307 StreamSubscription<T> result = _transformer(_stream, cancelOnError); | 310 StreamSubscription<T> result = _transformer(_stream, cancelOnError); |
308 result.onData(onData); | 311 result.onData(onData); |
309 result.onError(onError); | 312 result.onError(onError); |
310 result.onDone(onDone); | 313 result.onDone(onDone); |
311 return result; | 314 return result; |
312 } | 315 } |
313 } | 316 } |
OLD | NEW |