OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 import 'dart:async'; |
| 6 |
| 7 import 'async_memoizer.dart'; |
| 8 |
| 9 typedef Future _AsyncHandler<T>(StreamSubscription<T> inner); |
| 10 |
| 11 typedef void _VoidHandler<T>(StreamSubscription<T> inner); |
| 12 |
| 13 /// Creates a [StreamTransformer] that modifies the behavior of subscriptions to |
| 14 /// a stream. |
| 15 /// |
| 16 /// When [StreamSubscription.cancel], [StreamSubscription.pause], or |
| 17 /// [StreamSubscription.resume] is called, the corresponding handler is invoked. |
| 18 /// By default, handlers just forward to the underlying subscription. |
| 19 /// |
| 20 /// Guarantees that none of the [StreamSubscription] callbacks and none of the |
| 21 /// callbacks passed to `subscriptionTransformer()` will be invoked once the |
| 22 /// transformed [StreamSubscription] has been canceled and `handleCancel()` has |
| 23 /// run. The [handlePause] and [handleResume] are invoked regardless of whether |
| 24 /// the subscription is paused already or not. |
| 25 /// |
| 26 /// In order to preserve [StreamSubscription] guarantees, **all callbacks must |
| 27 /// synchronously call the corresponding method** on the inner |
| 28 /// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause] |
| 29 /// must call `pause()`, and [handleResume] must call `resume()`. |
| 30 StreamTransformer<T, T> subscriptionTransformer<T>( |
| 31 {Future handleCancel(StreamSubscription<T> inner), |
| 32 void handlePause(StreamSubscription<T> inner), |
| 33 void handleResume(StreamSubscription<T> inner)}) { |
| 34 return new StreamTransformer((stream, cancelOnError) { |
| 35 return new _TransformedSubscription( |
| 36 stream.listen(null, cancelOnError: cancelOnError), |
| 37 handleCancel ?? (inner) => inner.cancel(), |
| 38 handlePause ?? |
| 39 (inner) { |
| 40 inner.pause(); |
| 41 }, |
| 42 handleResume ?? |
| 43 (inner) { |
| 44 inner.resume(); |
| 45 }); |
| 46 }); |
| 47 } |
| 48 |
| 49 /// A [StreamSubscription] wrapper that calls callbacks for subscription |
| 50 /// methods. |
| 51 class _TransformedSubscription<T> implements StreamSubscription<T> { |
| 52 /// The wrapped subscription. |
| 53 StreamSubscription<T> _inner; |
| 54 |
| 55 /// The callback to run when [cancel] is called. |
| 56 final _AsyncHandler<T> _handleCancel; |
| 57 |
| 58 /// The callback to run when [pause] is called. |
| 59 final _VoidHandler<T> _handlePause; |
| 60 |
| 61 /// The callback to run when [resume] is called. |
| 62 final _VoidHandler<T> _handleResume; |
| 63 |
| 64 bool get isPaused => _inner?.isPaused ?? false; |
| 65 |
| 66 _TransformedSubscription( |
| 67 this._inner, this._handleCancel, this._handlePause, this._handleResume); |
| 68 |
| 69 void onData(void handleData(T data)) { |
| 70 _inner?.onData(handleData); |
| 71 } |
| 72 |
| 73 void onError(Function handleError) { |
| 74 _inner?.onError(handleError); |
| 75 } |
| 76 |
| 77 void onDone(void handleDone()) { |
| 78 _inner?.onDone(handleDone); |
| 79 } |
| 80 |
| 81 Future cancel() => _cancelMemoizer.runOnce(() { |
| 82 var inner = _inner; |
| 83 _inner.onData(null); |
| 84 _inner.onDone(null); |
| 85 |
| 86 // Setting onError to null will cause errors to be top-leveled. |
| 87 _inner.onError((_, __) {}); |
| 88 _inner = null; |
| 89 return _handleCancel(inner); |
| 90 }); |
| 91 final _cancelMemoizer = new AsyncMemoizer(); |
| 92 |
| 93 void pause([Future resumeFuture]) { |
| 94 if (_cancelMemoizer.hasRun) return; |
| 95 if (resumeFuture != null) resumeFuture.whenComplete(resume); |
| 96 _handlePause(_inner); |
| 97 } |
| 98 |
| 99 void resume() { |
| 100 if (_cancelMemoizer.hasRun) return; |
| 101 _handleResume(_inner); |
| 102 } |
| 103 |
| 104 Future<E> asFuture<E>([E futureValue]) => |
| 105 _inner?.asFuture(futureValue) ?? new Completer<E>().future; |
| 106 } |
OLD | NEW |