OLD | NEW |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 | 6 |
7 import 'async_memoizer.dart'; | 7 import 'async_memoizer.dart'; |
8 | 8 |
9 typedef Future _AsyncHandler<T>(StreamSubscription<T> inner); | 9 typedef Future _AsyncHandler<T>(StreamSubscription<T> inner); |
10 | 10 |
11 typedef void _VoidHandler<T>(StreamSubscription<T> inner); | 11 typedef void _VoidHandler<T>(StreamSubscription<T> inner); |
12 | 12 |
13 /// Creates a [StreamTransformer] that modifies the behavior of subscriptions to | 13 /// Creates a [StreamTransformer] that modifies the behavior of subscriptions to |
14 /// a stream. | 14 /// a stream. |
15 /// | 15 /// |
16 /// When [StreamSubscription.cancel], [StreamSubscription.pause], or | 16 /// When [StreamSubscription.cancel], [StreamSubscription.pause], or |
17 /// [StreamSubscription.resume] is called, the corresponding handler is invoked. | 17 /// [StreamSubscription.resume] is called, the corresponding handler is invoked. |
18 /// By default, handlers just forward to the underlying subscription. | 18 /// By default, handlers just forward to the underlying subscription. |
19 /// | 19 /// |
20 /// Guarantees that none of the [StreamSubscription] callbacks and none of the | 20 /// Guarantees that none of the [StreamSubscription] callbacks and none of the |
21 /// callbacks passed to `subscriptionTransformer()` will be invoked once the | 21 /// callbacks passed to `subscriptionTransformer()` will be invoked once the |
22 /// transformed [StreamSubscription] has been canceled and `handleCancel()` has | 22 /// transformed [StreamSubscription] has been canceled and `handleCancel()` has |
23 /// run. The [handlePause] and [handleResume] are invoked regardless of whether | 23 /// run. The [handlePause] and [handleResume] are invoked regardless of whether |
24 /// the subscription is paused already or not. | 24 /// the subscription is paused already or not. |
25 /// | 25 /// |
26 /// In order to preserve [StreamSubscription] guarantees, **all callbacks must | 26 /// In order to preserve [StreamSubscription] guarantees, **all callbacks must |
27 /// synchronously call the corresponding method** on the inner | 27 /// synchronously call the corresponding method** on the inner |
28 /// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause] | 28 /// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause] |
29 /// must call `pause()`, and [handleResume] must call `resume()`. | 29 /// must call `pause()`, and [handleResume] must call `resume()`. |
30 StreamTransformer/*<T, T>*/ subscriptionTransformer/*<T>*/( | 30 StreamTransformer<T, T> subscriptionTransformer<T>( |
31 {Future handleCancel(StreamSubscription/*<T>*/ inner), | 31 {Future handleCancel(StreamSubscription<T> inner), |
32 void handlePause(StreamSubscription/*<T>*/ inner), | 32 void handlePause(StreamSubscription<T> inner), |
33 void handleResume(StreamSubscription/*<T>*/ inner)}) { | 33 void handleResume(StreamSubscription<T> inner)}) { |
34 return new StreamTransformer((stream, cancelOnError) { | 34 return new StreamTransformer((stream, cancelOnError) { |
35 return new _TransformedSubscription( | 35 return new _TransformedSubscription( |
36 stream.listen(null, cancelOnError: cancelOnError), | 36 stream.listen(null, cancelOnError: cancelOnError), |
37 handleCancel ?? (inner) => inner.cancel(), | 37 handleCancel ?? (inner) => inner.cancel(), |
38 handlePause ?? (inner) { | 38 handlePause ?? |
39 inner.pause(); | 39 (inner) { |
40 }, | 40 inner.pause(); |
41 handleResume ?? (inner) { | 41 }, |
42 inner.resume(); | 42 handleResume ?? |
43 }); | 43 (inner) { |
| 44 inner.resume(); |
| 45 }); |
44 }); | 46 }); |
45 } | 47 } |
46 | 48 |
47 /// A [StreamSubscription] wrapper that calls callbacks for subscription | 49 /// A [StreamSubscription] wrapper that calls callbacks for subscription |
48 /// methods. | 50 /// methods. |
49 class _TransformedSubscription<T> implements StreamSubscription<T> { | 51 class _TransformedSubscription<T> implements StreamSubscription<T> { |
50 /// The wrapped subscription. | 52 /// The wrapped subscription. |
51 StreamSubscription<T> _inner; | 53 StreamSubscription<T> _inner; |
52 | 54 |
53 /// The callback to run when [cancel] is called. | 55 /// The callback to run when [cancel] is called. |
54 final _AsyncHandler<T> _handleCancel; | 56 final _AsyncHandler<T> _handleCancel; |
55 | 57 |
56 /// The callback to run when [pause] is called. | 58 /// The callback to run when [pause] is called. |
57 final _VoidHandler<T> _handlePause; | 59 final _VoidHandler<T> _handlePause; |
58 | 60 |
59 /// The callback to run when [resume] is called. | 61 /// The callback to run when [resume] is called. |
60 final _VoidHandler<T> _handleResume; | 62 final _VoidHandler<T> _handleResume; |
61 | 63 |
62 bool get isPaused => _inner?.isPaused ?? false; | 64 bool get isPaused => _inner?.isPaused ?? false; |
63 | 65 |
64 _TransformedSubscription(this._inner, this._handleCancel, this._handlePause, | 66 _TransformedSubscription( |
65 this._handleResume); | 67 this._inner, this._handleCancel, this._handlePause, this._handleResume); |
66 | 68 |
67 void onData(void handleData(T data)) { | 69 void onData(void handleData(T data)) { |
68 _inner?.onData(handleData); | 70 _inner?.onData(handleData); |
69 } | 71 } |
70 | 72 |
71 void onError(Function handleError) { | 73 void onError(Function handleError) { |
72 _inner?.onError(handleError); | 74 _inner?.onError(handleError); |
73 } | 75 } |
74 | 76 |
75 void onDone(void handleDone()) { | 77 void onDone(void handleDone()) { |
76 _inner?.onDone(handleDone); | 78 _inner?.onDone(handleDone); |
77 } | 79 } |
78 | 80 |
79 Future cancel() => _cancelMemoizer.runOnce(() { | 81 Future cancel() => _cancelMemoizer.runOnce(() { |
80 var inner = _inner; | 82 var inner = _inner; |
81 _inner.onData(null); | 83 _inner.onData(null); |
82 _inner.onDone(null); | 84 _inner.onDone(null); |
83 | 85 |
84 // Setting onError to null will cause errors to be top-leveled. | 86 // Setting onError to null will cause errors to be top-leveled. |
85 _inner.onError((_, __) {}); | 87 _inner.onError((_, __) {}); |
86 _inner = null; | 88 _inner = null; |
87 return _handleCancel(inner); | 89 return _handleCancel(inner); |
88 }); | 90 }); |
89 final _cancelMemoizer = new AsyncMemoizer(); | 91 final _cancelMemoizer = new AsyncMemoizer(); |
90 | 92 |
91 void pause([Future resumeFuture]) { | 93 void pause([Future resumeFuture]) { |
92 if (_cancelMemoizer.hasRun) return; | 94 if (_cancelMemoizer.hasRun) return; |
93 if (resumeFuture != null) resumeFuture.whenComplete(resume); | 95 if (resumeFuture != null) resumeFuture.whenComplete(resume); |
94 _handlePause(_inner); | 96 _handlePause(_inner); |
95 } | 97 } |
96 | 98 |
97 void resume() { | 99 void resume() { |
98 if (_cancelMemoizer.hasRun) return; | 100 if (_cancelMemoizer.hasRun) return; |
99 _handleResume(_inner); | 101 _handleResume(_inner); |
100 } | 102 } |
101 | 103 |
102 Future/*<E>*/ asFuture/*<E>*/([/*=E*/ futureValue]) => | 104 Future<E> asFuture<E>([E futureValue]) => |
103 _inner?.asFuture(futureValue) ?? new Completer/*<E>*/().future; | 105 _inner?.asFuture(futureValue) ?? new Completer<E>().future; |
104 } | 106 } |
OLD | NEW |