| OLD | NEW |
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import 'dart:async'; | 5 import 'dart:async'; |
| 6 | 6 |
| 7 import 'async_memoizer.dart'; | 7 import 'async_memoizer.dart'; |
| 8 | 8 |
| 9 typedef Future _AsyncHandler<T>(StreamSubscription<T> inner); | 9 typedef Future _AsyncHandler<T>(StreamSubscription<T> inner); |
| 10 | 10 |
| 11 typedef void _VoidHandler<T>(StreamSubscription<T> inner); | 11 typedef void _VoidHandler<T>(StreamSubscription<T> inner); |
| 12 | 12 |
| 13 /// Creates a [StreamTransformer] that modifies the behavior of subscriptions to | 13 /// Creates a [StreamTransformer] that modifies the behavior of subscriptions to |
| 14 /// a stream. | 14 /// a stream. |
| 15 /// | 15 /// |
| 16 /// When [StreamSubscription.cancel], [StreamSubscription.pause], or | 16 /// When [StreamSubscription.cancel], [StreamSubscription.pause], or |
| 17 /// [StreamSubscription.resume] is called, the corresponding handler is invoked. | 17 /// [StreamSubscription.resume] is called, the corresponding handler is invoked. |
| 18 /// By default, handlers just forward to the underlying subscription. | 18 /// By default, handlers just forward to the underlying subscription. |
| 19 /// | 19 /// |
| 20 /// Guarantees that none of the [StreamSubscription] callbacks and none of the | 20 /// Guarantees that none of the [StreamSubscription] callbacks and none of the |
| 21 /// callbacks passed to `subscriptionTransformer()` will be invoked once the | 21 /// callbacks passed to `subscriptionTransformer()` will be invoked once the |
| 22 /// transformed [StreamSubscription] has been canceled and `handleCancel()` has | 22 /// transformed [StreamSubscription] has been canceled and `handleCancel()` has |
| 23 /// run. The [handlePause] and [handleResume] are invoked regardless of whether | 23 /// run. The [handlePause] and [handleResume] are invoked regardless of whether |
| 24 /// the subscription is paused already or not. | 24 /// the subscription is paused already or not. |
| 25 /// | 25 /// |
| 26 /// In order to preserve [StreamSubscription] guarantees, **all callbacks must | 26 /// In order to preserve [StreamSubscription] guarantees, **all callbacks must |
| 27 /// synchronously call the corresponding method** on the inner | 27 /// synchronously call the corresponding method** on the inner |
| 28 /// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause] | 28 /// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause] |
| 29 /// must call `pause()`, and [handleResume] must call `resume()`. | 29 /// must call `pause()`, and [handleResume] must call `resume()`. |
| 30 StreamTransformer/*<T, T>*/ subscriptionTransformer/*<T>*/( | 30 StreamTransformer<T, T> subscriptionTransformer<T>( |
| 31 {Future handleCancel(StreamSubscription/*<T>*/ inner), | 31 {Future handleCancel(StreamSubscription<T> inner), |
| 32 void handlePause(StreamSubscription/*<T>*/ inner), | 32 void handlePause(StreamSubscription<T> inner), |
| 33 void handleResume(StreamSubscription/*<T>*/ inner)}) { | 33 void handleResume(StreamSubscription<T> inner)}) { |
| 34 return new StreamTransformer((stream, cancelOnError) { | 34 return new StreamTransformer((stream, cancelOnError) { |
| 35 return new _TransformedSubscription( | 35 return new _TransformedSubscription( |
| 36 stream.listen(null, cancelOnError: cancelOnError), | 36 stream.listen(null, cancelOnError: cancelOnError), |
| 37 handleCancel ?? (inner) => inner.cancel(), | 37 handleCancel ?? (inner) => inner.cancel(), |
| 38 handlePause ?? (inner) { | 38 handlePause ?? |
| 39 inner.pause(); | 39 (inner) { |
| 40 }, | 40 inner.pause(); |
| 41 handleResume ?? (inner) { | 41 }, |
| 42 inner.resume(); | 42 handleResume ?? |
| 43 }); | 43 (inner) { |
| 44 inner.resume(); |
| 45 }); |
| 44 }); | 46 }); |
| 45 } | 47 } |
| 46 | 48 |
| 47 /// A [StreamSubscription] wrapper that calls callbacks for subscription | 49 /// A [StreamSubscription] wrapper that calls callbacks for subscription |
| 48 /// methods. | 50 /// methods. |
| 49 class _TransformedSubscription<T> implements StreamSubscription<T> { | 51 class _TransformedSubscription<T> implements StreamSubscription<T> { |
| 50 /// The wrapped subscription. | 52 /// The wrapped subscription. |
| 51 StreamSubscription<T> _inner; | 53 StreamSubscription<T> _inner; |
| 52 | 54 |
| 53 /// The callback to run when [cancel] is called. | 55 /// The callback to run when [cancel] is called. |
| 54 final _AsyncHandler<T> _handleCancel; | 56 final _AsyncHandler<T> _handleCancel; |
| 55 | 57 |
| 56 /// The callback to run when [pause] is called. | 58 /// The callback to run when [pause] is called. |
| 57 final _VoidHandler<T> _handlePause; | 59 final _VoidHandler<T> _handlePause; |
| 58 | 60 |
| 59 /// The callback to run when [resume] is called. | 61 /// The callback to run when [resume] is called. |
| 60 final _VoidHandler<T> _handleResume; | 62 final _VoidHandler<T> _handleResume; |
| 61 | 63 |
| 62 bool get isPaused => _inner?.isPaused ?? false; | 64 bool get isPaused => _inner?.isPaused ?? false; |
| 63 | 65 |
| 64 _TransformedSubscription(this._inner, this._handleCancel, this._handlePause, | 66 _TransformedSubscription( |
| 65 this._handleResume); | 67 this._inner, this._handleCancel, this._handlePause, this._handleResume); |
| 66 | 68 |
| 67 void onData(void handleData(T data)) { | 69 void onData(void handleData(T data)) { |
| 68 _inner?.onData(handleData); | 70 _inner?.onData(handleData); |
| 69 } | 71 } |
| 70 | 72 |
| 71 void onError(Function handleError) { | 73 void onError(Function handleError) { |
| 72 _inner?.onError(handleError); | 74 _inner?.onError(handleError); |
| 73 } | 75 } |
| 74 | 76 |
| 75 void onDone(void handleDone()) { | 77 void onDone(void handleDone()) { |
| 76 _inner?.onDone(handleDone); | 78 _inner?.onDone(handleDone); |
| 77 } | 79 } |
| 78 | 80 |
| 79 Future cancel() => _cancelMemoizer.runOnce(() { | 81 Future cancel() => _cancelMemoizer.runOnce(() { |
| 80 var inner = _inner; | 82 var inner = _inner; |
| 81 _inner.onData(null); | 83 _inner.onData(null); |
| 82 _inner.onDone(null); | 84 _inner.onDone(null); |
| 83 | 85 |
| 84 // Setting onError to null will cause errors to be top-leveled. | 86 // Setting onError to null will cause errors to be top-leveled. |
| 85 _inner.onError((_, __) {}); | 87 _inner.onError((_, __) {}); |
| 86 _inner = null; | 88 _inner = null; |
| 87 return _handleCancel(inner); | 89 return _handleCancel(inner); |
| 88 }); | 90 }); |
| 89 final _cancelMemoizer = new AsyncMemoizer(); | 91 final _cancelMemoizer = new AsyncMemoizer(); |
| 90 | 92 |
| 91 void pause([Future resumeFuture]) { | 93 void pause([Future resumeFuture]) { |
| 92 if (_cancelMemoizer.hasRun) return; | 94 if (_cancelMemoizer.hasRun) return; |
| 93 if (resumeFuture != null) resumeFuture.whenComplete(resume); | 95 if (resumeFuture != null) resumeFuture.whenComplete(resume); |
| 94 _handlePause(_inner); | 96 _handlePause(_inner); |
| 95 } | 97 } |
| 96 | 98 |
| 97 void resume() { | 99 void resume() { |
| 98 if (_cancelMemoizer.hasRun) return; | 100 if (_cancelMemoizer.hasRun) return; |
| 99 _handleResume(_inner); | 101 _handleResume(_inner); |
| 100 } | 102 } |
| 101 | 103 |
| 102 Future/*<E>*/ asFuture/*<E>*/([/*=E*/ futureValue]) => | 104 Future<E> asFuture<E>([E futureValue]) => |
| 103 _inner?.asFuture(futureValue) ?? new Completer/*<E>*/().future; | 105 _inner?.asFuture(futureValue) ?? new Completer<E>().future; |
| 104 } | 106 } |
| OLD | NEW |