OLD | NEW |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 | 6 |
7 import 'async_memoizer.dart'; | 7 import 'async_memoizer.dart'; |
8 | 8 |
9 typedef Future _AsyncHandler<T>(StreamSubscription<T> inner); | 9 typedef Future _AsyncHandler<T>(StreamSubscription<T> inner); |
10 | 10 |
11 typedef void _VoidHandler<T>(StreamSubscription<T> inner); | 11 typedef void _VoidHandler<T>(StreamSubscription<T> inner); |
12 | 12 |
13 /// Creates a [StreamTransformer] that modifies the behavior of subscriptions to | 13 /// Creates a [StreamTransformer] that modifies the behavior of subscriptions to |
14 /// a stream. | 14 /// a stream. |
15 /// | 15 /// |
16 /// When [StreamSubscription.cancel], [StreamSubscription.pause], or | 16 /// When [StreamSubscription.cancel], [StreamSubscription.pause], or |
17 /// [StreamSubscription.resume] is called, the corresponding handler is invoked. | 17 /// [StreamSubscription.resume] is called, the corresponding handler is invoked. |
18 /// By default, handlers just forward to the underlying subscription. | 18 /// By default, handlers just forward to the underlying subscription. |
19 /// | 19 /// |
20 /// Guarantees that none of the [StreamSubscription] callbacks and none of the | 20 /// Guarantees that none of the [StreamSubscription] callbacks and none of the |
21 /// callbacks passed to `subscriptionTransformer()` will be invoked once the | 21 /// callbacks passed to `subscriptionTransformer()` will be invoked once the |
22 /// transformed [StreamSubscription] has been canceled and `handleCancel()` has | 22 /// transformed [StreamSubscription] has been canceled and `handleCancel()` has |
23 /// run. The [handlePause] and [handleResume] are invoked regardless of whether | 23 /// run. The [handlePause] and [handleResume] are invoked regardless of whether |
24 /// the subscription is paused already or not. | 24 /// the subscription is paused already or not. |
25 /// | 25 /// |
26 /// In order to preserve [StreamSubscription] guarantees, **all callbacks must | 26 /// In order to preserve [StreamSubscription] guarantees, **all callbacks must |
27 /// synchronously call the corresponding method** on the inner | 27 /// synchronously call the corresponding method** on the inner |
28 /// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause] | 28 /// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause] |
29 /// must call `pause()`, and [handleResume] must call `resume()`. | 29 /// must call `pause()`, and [handleResume] must call `resume()`. |
30 StreamTransformer/*<T, T>*/ subscriptionTransformer/*<T>*/( | 30 StreamTransformer<T, T> subscriptionTransformer<T>( |
31 {Future handleCancel(StreamSubscription/*<T>*/ inner), | 31 {Future handleCancel(StreamSubscription<T> inner), |
32 void handlePause(StreamSubscription/*<T>*/ inner), | 32 void handlePause(StreamSubscription<T> inner), |
33 void handleResume(StreamSubscription/*<T>*/ inner)}) { | 33 void handleResume(StreamSubscription<T> inner)}) { |
34 return new StreamTransformer((stream, cancelOnError) { | 34 return new StreamTransformer((stream, cancelOnError) { |
35 return new _TransformedSubscription( | 35 return new _TransformedSubscription( |
36 stream.listen(null, cancelOnError: cancelOnError), | 36 stream.listen(null, cancelOnError: cancelOnError), |
37 handleCancel ?? (inner) => inner.cancel(), | 37 handleCancel ?? (inner) => inner.cancel(), |
38 handlePause ?? (inner) { | 38 handlePause ?? (inner) { |
39 inner.pause(); | 39 inner.pause(); |
40 }, | 40 }, |
41 handleResume ?? (inner) { | 41 handleResume ?? (inner) { |
42 inner.resume(); | 42 inner.resume(); |
43 }); | 43 }); |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
92 if (_cancelMemoizer.hasRun) return; | 92 if (_cancelMemoizer.hasRun) return; |
93 if (resumeFuture != null) resumeFuture.whenComplete(resume); | 93 if (resumeFuture != null) resumeFuture.whenComplete(resume); |
94 _handlePause(_inner); | 94 _handlePause(_inner); |
95 } | 95 } |
96 | 96 |
97 void resume() { | 97 void resume() { |
98 if (_cancelMemoizer.hasRun) return; | 98 if (_cancelMemoizer.hasRun) return; |
99 _handleResume(_inner); | 99 _handleResume(_inner); |
100 } | 100 } |
101 | 101 |
102 Future/*<E>*/ asFuture/*<E>*/([/*=E*/ futureValue]) => | 102 Future<E> asFuture<E>([E futureValue]) => |
103 _inner?.asFuture(futureValue) ?? new Completer/*<E>*/().future; | 103 _inner?.asFuture(futureValue) ?? new Completer<E>().future; |
104 } | 104 } |
OLD | NEW |