OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.cancelable_operation; | |
6 | |
7 import 'dart:async'; | 5 import 'dart:async'; |
8 | 6 |
9 import 'package:async/async.dart'; | 7 import 'package:async/async.dart'; |
10 | 8 |
11 /// An asynchronuos operation that can be cancelled. | 9 import 'utils.dart'; |
| 10 |
| 11 /// An asynchronous operation that can be cancelled. |
12 /// | 12 /// |
13 /// The value of this operation is exposed as [value]. When this operation is | 13 /// The value of this operation is exposed as [value]. When this operation is |
14 /// cancelled, [value] won't complete either successfully or with an error. If | 14 /// cancelled, [value] won't complete either successfully or with an error. If |
15 /// [value] has already completed, cancelling the operation does nothing. | 15 /// [value] has already completed, cancelling the operation does nothing. |
16 class CancelableOperation<T> { | 16 class CancelableOperation<T> { |
17 /// The completer that produced this operation. | 17 /// The completer that produced this operation. |
18 /// | 18 /// |
19 /// This is canceled when [cancel] is called. | 19 /// This is canceled when [cancel] is called. |
20 final CancelableCompleter<T> _completer; | 20 final CancelableCompleter<T> _completer; |
21 | 21 |
22 CancelableOperation._(this._completer); | 22 CancelableOperation._(this._completer); |
23 | 23 |
24 /// Creates a [CancelableOperation] wrapping [inner]. | 24 /// Creates a [CancelableOperation] wrapping [inner]. |
25 /// | 25 /// |
26 /// When this operation is canceled, [onCancel] will be called and any value | 26 /// When this operation is canceled, [onCancel] will be called and any value |
27 /// or error produced by [inner] will be discarded. The callback may return a | 27 /// or error produced by [inner] will be discarded. If [onCancel] returns a |
28 /// Future to indicate that asynchronous work has to be done to cancel the | 28 /// [Future], it will be forwarded to [cancel]. |
29 /// future; this Future will be returned by [cancel]. | |
30 /// | 29 /// |
31 /// [onCancel] will be called synchronously when the operation is canceled. | 30 /// [onCancel] will be called synchronously when the operation is canceled. |
32 /// It's guaranteed to only be called once. | 31 /// It's guaranteed to only be called once. |
33 factory CancelableOperation.fromFuture(Future<T> inner, {onCancel()}) { | 32 factory CancelableOperation.fromFuture(Future<T> inner, |
| 33 {FutureOr onCancel()}) { |
34 var completer = new CancelableCompleter<T>(onCancel: onCancel); | 34 var completer = new CancelableCompleter<T>(onCancel: onCancel); |
35 completer.complete(inner); | 35 completer.complete(inner); |
36 return completer.operation; | 36 return completer.operation; |
37 } | 37 } |
38 | 38 |
39 /// The value returned by the operation. | 39 /// The value returned by the operation. |
40 Future<T> get value => _completer._inner.future; | 40 Future<T> get value => _completer._inner.future; |
41 | 41 |
42 /// Creates a [Stream] containing the result of this operation. | 42 /// Creates a [Stream] containing the result of this operation. |
43 /// | 43 /// |
44 /// This is like `value.asStream()`, but if a subscription to the stream is | 44 /// This is like `value.asStream()`, but if a subscription to the stream is |
45 /// canceled, this is as well. | 45 /// canceled, this is as well. |
46 Stream<T> asStream() { | 46 Stream<T> asStream() { |
47 var controller = new StreamController<T>( | 47 var controller = |
48 sync: true, onCancel: _completer._cancel); | 48 new StreamController<T>(sync: true, onCancel: _completer._cancel); |
49 | 49 |
50 value.then((value) { | 50 value.then((value) { |
51 controller.add(value); | 51 controller.add(value); |
52 controller.close(); | 52 controller.close(); |
53 }, onError: (error, stackTrace) { | 53 }, onError: (error, stackTrace) { |
54 controller.addError(error, stackTrace); | 54 controller.addError(error, stackTrace); |
55 controller.close(); | 55 controller.close(); |
56 }); | 56 }); |
57 return controller.stream; | 57 return controller.stream; |
58 } | 58 } |
59 | 59 |
| 60 /// Creates a [Future] that completes when this operation completes *or* when |
| 61 /// it's cancelled. |
| 62 /// |
| 63 /// If this operation completes, this completes to the same result as [value]. |
| 64 /// If this operation is cancelled, the returned future waits for the future |
| 65 /// returned by [cancel], then completes to [cancellationValue]. |
| 66 Future valueOrCancellation([T cancellationValue]) { |
| 67 var completer = new Completer<T>.sync(); |
| 68 value.then((result) => completer.complete(result), |
| 69 onError: completer.completeError); |
| 70 |
| 71 _completer._cancelMemo.future.then((_) { |
| 72 completer.complete(cancellationValue); |
| 73 }, onError: completer.completeError); |
| 74 |
| 75 return completer.future; |
| 76 } |
| 77 |
60 /// Cancels this operation. | 78 /// Cancels this operation. |
61 /// | 79 /// |
62 /// This returns the [Future] returned by the [CancelableCompleter]'s | 80 /// This returns the [Future] returned by the [CancelableCompleter]'s |
63 /// `onCancel` callback. Unlike [Stream.cancel], it never returns `null`. | 81 /// `onCancel` callback. Unlike [Stream.cancel], it never returns `null`. |
64 Future cancel() => _completer._cancel(); | 82 Future cancel() => _completer._cancel(); |
65 } | 83 } |
66 | 84 |
67 /// A completer for a [CancelableOperation]. | 85 /// A completer for a [CancelableOperation]. |
68 class CancelableCompleter<T> { | 86 class CancelableCompleter<T> { |
69 /// The completer for the wrapped future. | 87 /// The completer for the wrapped future. |
70 final Completer<T> _inner; | 88 final Completer<T> _inner; |
71 | 89 |
72 /// The callback to call if the future is canceled. | 90 /// The callback to call if the future is canceled. |
73 final ZoneCallback _onCancel; | 91 final FutureOrCallback _onCancel; |
74 | 92 |
75 /// Creates a new completer for a [CancelableOperation]. | 93 /// Creates a new completer for a [CancelableOperation]. |
76 /// | 94 /// |
77 /// When the future operation canceled, as long as the completer hasn't yet | 95 /// When the future operation canceled, as long as the completer hasn't yet |
78 /// completed, [onCancel] is called. The callback may return a [Future]; if | 96 /// completed, [onCancel] is called. If [onCancel] returns a [Future], it's |
79 /// so, that [Future] is returned by [CancelableOperation.cancel]. | 97 /// forwarded to [CancelableOperation.cancel]. |
80 /// | 98 /// |
81 /// [onCancel] will be called synchronously when the operation is canceled. | 99 /// [onCancel] will be called synchronously when the operation is canceled. |
82 /// It's guaranteed to only be called once. | 100 /// It's guaranteed to only be called once. |
83 CancelableCompleter({onCancel()}) | 101 CancelableCompleter({FutureOr onCancel()}) |
84 : _onCancel = onCancel, | 102 : _onCancel = onCancel, |
85 _inner = new Completer<T>() { | 103 _inner = new Completer<T>() { |
86 _operation = new CancelableOperation<T>._(this); | 104 _operation = new CancelableOperation<T>._(this); |
87 } | 105 } |
88 | 106 |
89 /// The operation controlled by this completer. | 107 /// The operation controlled by this completer. |
90 CancelableOperation<T> get operation => _operation; | 108 CancelableOperation<T> get operation => _operation; |
91 CancelableOperation<T> _operation; | 109 CancelableOperation<T> _operation; |
92 | 110 |
93 /// Whether the completer has completed. | 111 /// Whether the completer has completed. |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
133 /// Completes [operation] to [error]. | 151 /// Completes [operation] to [error]. |
134 void completeError(Object error, [StackTrace stackTrace]) { | 152 void completeError(Object error, [StackTrace stackTrace]) { |
135 if (_isCompleted) throw new StateError("Operation already completed"); | 153 if (_isCompleted) throw new StateError("Operation already completed"); |
136 _isCompleted = true; | 154 _isCompleted = true; |
137 | 155 |
138 if (_isCanceled) return; | 156 if (_isCanceled) return; |
139 _inner.completeError(error, stackTrace); | 157 _inner.completeError(error, stackTrace); |
140 } | 158 } |
141 | 159 |
142 /// Cancel the completer. | 160 /// Cancel the completer. |
143 Future _cancel() => _cancelMemo.runOnce(() { | 161 Future _cancel() { |
144 if (_inner.isCompleted) return null; | 162 if (_inner.isCompleted) return new Future.value(); |
145 _isCanceled = true; | 163 |
146 if (_onCancel != null) return _onCancel(); | 164 return _cancelMemo.runOnce(() { |
147 }); | 165 _isCanceled = true; |
| 166 if (_onCancel != null) return _onCancel(); |
| 167 }); |
| 168 } |
148 } | 169 } |
OLD | NEW |