Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** | 7 /** |
| 8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. | 8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. |
| 9 */ | 9 */ |
| 10 class _EventSinkWrapper<T> implements EventSink<T> { | 10 class _EventSinkWrapper<T> implements EventSink<T> { |
| (...skipping 15 matching lines...) Expand all Loading... | |
| 26 * the transformation. The returned sink is the transformation's input. | 26 * the transformation. The returned sink is the transformation's input. |
| 27 */ | 27 */ |
| 28 class _SinkTransformerStreamSubscription<S, T> | 28 class _SinkTransformerStreamSubscription<S, T> |
| 29 extends _BufferingStreamSubscription<T> { | 29 extends _BufferingStreamSubscription<T> { |
| 30 /// The transformer's input sink. | 30 /// The transformer's input sink. |
| 31 EventSink<S> _transformerSink; | 31 EventSink<S> _transformerSink; |
| 32 | 32 |
| 33 /// The subscription to the input stream. | 33 /// The subscription to the input stream. |
| 34 StreamSubscription<S> _subscription; | 34 StreamSubscription<S> _subscription; |
| 35 | 35 |
| 36 /// The cancelFuture (if any). | |
| 37 Future _cancelFuture; | |
|
Lasse Reichstein Nielsen
2016/08/01 15:23:26
Why a field? I only see it used inside one method?
floitsch
2016/08/01 21:00:37
Acknowledged.
| |
| 38 | |
| 36 _SinkTransformerStreamSubscription(Stream<S> source, | 39 _SinkTransformerStreamSubscription(Stream<S> source, |
| 37 _SinkMapper<S, T> mapper, | 40 _SinkMapper<S, T> mapper, |
| 38 void onData(T data), | 41 void onData(T data), |
| 39 Function onError, | 42 Function onError, |
| 40 void onDone(), | 43 void onDone(), |
| 41 bool cancelOnError) | 44 bool cancelOnError) |
| 42 // We set the adapter's target only when the user is allowed to send data. | 45 // We set the adapter's target only when the user is allowed to send data. |
| 43 : super(onData, onError, onDone, cancelOnError) { | 46 : super(onData, onError, onDone, cancelOnError) { |
| 44 _EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>(this); | 47 _EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>(this); |
| 45 _transformerSink = mapper(eventSink); | 48 _transformerSink = mapper(eventSink); |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 102 } | 105 } |
| 103 | 106 |
| 104 void _onResume() { | 107 void _onResume() { |
| 105 if (_isSubscribed) _subscription.resume(); | 108 if (_isSubscribed) _subscription.resume(); |
| 106 } | 109 } |
| 107 | 110 |
| 108 Future _onCancel() { | 111 Future _onCancel() { |
| 109 if (_isSubscribed) { | 112 if (_isSubscribed) { |
| 110 StreamSubscription subscription = _subscription; | 113 StreamSubscription subscription = _subscription; |
| 111 _subscription = null; | 114 _subscription = null; |
| 112 subscription.cancel(); | 115 _cancelFuture = subscription.cancel(); |
|
Lasse Reichstein Nielsen
2016/08/01 15:23:26
That is, why is _cancelFuture not a local variable
floitsch
2016/08/01 15:55:55
In case someone calls `cancel` multiple times.
Lasse Reichstein Nielsen
2016/08/01 16:49:38
Possibly a good point.
I'm not sure _onCancel will
floitsch
2016/08/01 21:00:37
You are right.
Changed a bit more...
| |
| 113 } | 116 } |
| 114 return null; | 117 return _cancelFuture ?? new Future.value(null); |
| 115 } | 118 } |
| 116 | 119 |
| 117 void _handleData(S data) { | 120 void _handleData(S data) { |
| 118 try { | 121 try { |
| 119 _transformerSink.add(data); | 122 _transformerSink.add(data); |
| 120 } catch (e, s) { | 123 } catch (e, s) { |
| 121 _addError(e, s); | 124 _addError(e, s); |
| 122 } | 125 } |
| 123 } | 126 } |
| 124 | 127 |
| (...skipping 179 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 304 void onDone(), | 307 void onDone(), |
| 305 bool cancelOnError }) { | 308 bool cancelOnError }) { |
| 306 cancelOnError = identical(true, cancelOnError); | 309 cancelOnError = identical(true, cancelOnError); |
| 307 StreamSubscription<T> result = _transformer(_stream, cancelOnError); | 310 StreamSubscription<T> result = _transformer(_stream, cancelOnError); |
| 308 result.onData(onData); | 311 result.onData(onData); |
| 309 result.onError(onError); | 312 result.onError(onError); |
| 310 result.onDone(onDone); | 313 result.onDone(onDone); |
| 311 return result; | 314 return result; |
| 312 } | 315 } |
| 313 } | 316 } |
| OLD | NEW |