| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library async.cancelable_operation; | |
| 6 | |
| 7 import 'dart:async'; | 5 import 'dart:async'; |
| 8 | 6 |
| 9 import 'package:async/async.dart'; | 7 import 'package:async/async.dart'; |
| 10 | 8 |
| 11 /// An asynchronuos operation that can be cancelled. | 9 import 'utils.dart'; |
| 10 |
| 11 /// An asynchronous operation that can be cancelled. |
| 12 /// | 12 /// |
| 13 /// The value of this operation is exposed as [value]. When this operation is | 13 /// The value of this operation is exposed as [value]. When this operation is |
| 14 /// cancelled, [value] won't complete either successfully or with an error. If | 14 /// cancelled, [value] won't complete either successfully or with an error. If |
| 15 /// [value] has already completed, cancelling the operation does nothing. | 15 /// [value] has already completed, cancelling the operation does nothing. |
| 16 class CancelableOperation<T> { | 16 class CancelableOperation<T> { |
| 17 /// The completer that produced this operation. | 17 /// The completer that produced this operation. |
| 18 /// | 18 /// |
| 19 /// This is canceled when [cancel] is called. | 19 /// This is canceled when [cancel] is called. |
| 20 final CancelableCompleter<T> _completer; | 20 final CancelableCompleter<T> _completer; |
| 21 | 21 |
| 22 CancelableOperation._(this._completer); | 22 CancelableOperation._(this._completer); |
| 23 | 23 |
| 24 /// Creates a [CancelableOperation] wrapping [inner]. | 24 /// Creates a [CancelableOperation] wrapping [inner]. |
| 25 /// | 25 /// |
| 26 /// When this operation is canceled, [onCancel] will be called and any value | 26 /// When this operation is canceled, [onCancel] will be called and any value |
| 27 /// or error produced by [inner] will be discarded. The callback may return a | 27 /// or error produced by [inner] will be discarded. If [onCancel] returns a |
| 28 /// Future to indicate that asynchronous work has to be done to cancel the | 28 /// [Future], it will be forwarded to [cancel]. |
| 29 /// future; this Future will be returned by [cancel]. | |
| 30 /// | 29 /// |
| 31 /// [onCancel] will be called synchronously when the operation is canceled. | 30 /// [onCancel] will be called synchronously when the operation is canceled. |
| 32 /// It's guaranteed to only be called once. | 31 /// It's guaranteed to only be called once. |
| 33 factory CancelableOperation.fromFuture(Future<T> inner, {onCancel()}) { | 32 factory CancelableOperation.fromFuture(Future<T> inner, |
| 33 {FutureOr onCancel()}) { |
| 34 var completer = new CancelableCompleter<T>(onCancel: onCancel); | 34 var completer = new CancelableCompleter<T>(onCancel: onCancel); |
| 35 completer.complete(inner); | 35 completer.complete(inner); |
| 36 return completer.operation; | 36 return completer.operation; |
| 37 } | 37 } |
| 38 | 38 |
| 39 /// The value returned by the operation. | 39 /// The value returned by the operation. |
| 40 Future<T> get value => _completer._inner.future; | 40 Future<T> get value => _completer._inner.future; |
| 41 | 41 |
| 42 /// Creates a [Stream] containing the result of this operation. | 42 /// Creates a [Stream] containing the result of this operation. |
| 43 /// | 43 /// |
| 44 /// This is like `value.asStream()`, but if a subscription to the stream is | 44 /// This is like `value.asStream()`, but if a subscription to the stream is |
| 45 /// canceled, this is as well. | 45 /// canceled, this is as well. |
| 46 Stream<T> asStream() { | 46 Stream<T> asStream() { |
| 47 var controller = new StreamController<T>( | 47 var controller = |
| 48 sync: true, onCancel: _completer._cancel); | 48 new StreamController<T>(sync: true, onCancel: _completer._cancel); |
| 49 | 49 |
| 50 value.then((value) { | 50 value.then((value) { |
| 51 controller.add(value); | 51 controller.add(value); |
| 52 controller.close(); | 52 controller.close(); |
| 53 }, onError: (error, stackTrace) { | 53 }, onError: (error, stackTrace) { |
| 54 controller.addError(error, stackTrace); | 54 controller.addError(error, stackTrace); |
| 55 controller.close(); | 55 controller.close(); |
| 56 }); | 56 }); |
| 57 return controller.stream; | 57 return controller.stream; |
| 58 } | 58 } |
| 59 | 59 |
| 60 /// Creates a [Future] that completes when this operation completes *or* when |
| 61 /// it's cancelled. |
| 62 /// |
| 63 /// If this operation completes, this completes to the same result as [value]. |
| 64 /// If this operation is cancelled, the returned future waits for the future |
| 65 /// returned by [cancel], then completes to [cancellationValue]. |
| 66 Future valueOrCancellation([T cancellationValue]) { |
| 67 var completer = new Completer<T>.sync(); |
| 68 value.then((result) => completer.complete(result), |
| 69 onError: completer.completeError); |
| 70 |
| 71 _completer._cancelMemo.future.then((_) { |
| 72 completer.complete(cancellationValue); |
| 73 }, onError: completer.completeError); |
| 74 |
| 75 return completer.future; |
| 76 } |
| 77 |
| 60 /// Cancels this operation. | 78 /// Cancels this operation. |
| 61 /// | 79 /// |
| 62 /// This returns the [Future] returned by the [CancelableCompleter]'s | 80 /// This returns the [Future] returned by the [CancelableCompleter]'s |
| 63 /// `onCancel` callback. Unlike [Stream.cancel], it never returns `null`. | 81 /// `onCancel` callback. Unlike [Stream.cancel], it never returns `null`. |
| 64 Future cancel() => _completer._cancel(); | 82 Future cancel() => _completer._cancel(); |
| 65 } | 83 } |
| 66 | 84 |
| 67 /// A completer for a [CancelableOperation]. | 85 /// A completer for a [CancelableOperation]. |
| 68 class CancelableCompleter<T> { | 86 class CancelableCompleter<T> { |
| 69 /// The completer for the wrapped future. | 87 /// The completer for the wrapped future. |
| 70 final Completer<T> _inner; | 88 final Completer<T> _inner; |
| 71 | 89 |
| 72 /// The callback to call if the future is canceled. | 90 /// The callback to call if the future is canceled. |
| 73 final ZoneCallback _onCancel; | 91 final FutureOrCallback _onCancel; |
| 74 | 92 |
| 75 /// Creates a new completer for a [CancelableOperation]. | 93 /// Creates a new completer for a [CancelableOperation]. |
| 76 /// | 94 /// |
| 77 /// When the future operation canceled, as long as the completer hasn't yet | 95 /// When the future operation canceled, as long as the completer hasn't yet |
| 78 /// completed, [onCancel] is called. The callback may return a [Future]; if | 96 /// completed, [onCancel] is called. If [onCancel] returns a [Future], it's |
| 79 /// so, that [Future] is returned by [CancelableOperation.cancel]. | 97 /// forwarded to [CancelableOperation.cancel]. |
| 80 /// | 98 /// |
| 81 /// [onCancel] will be called synchronously when the operation is canceled. | 99 /// [onCancel] will be called synchronously when the operation is canceled. |
| 82 /// It's guaranteed to only be called once. | 100 /// It's guaranteed to only be called once. |
| 83 CancelableCompleter({onCancel()}) | 101 CancelableCompleter({FutureOr onCancel()}) |
| 84 : _onCancel = onCancel, | 102 : _onCancel = onCancel, |
| 85 _inner = new Completer<T>() { | 103 _inner = new Completer<T>() { |
| 86 _operation = new CancelableOperation<T>._(this); | 104 _operation = new CancelableOperation<T>._(this); |
| 87 } | 105 } |
| 88 | 106 |
| 89 /// The operation controlled by this completer. | 107 /// The operation controlled by this completer. |
| 90 CancelableOperation<T> get operation => _operation; | 108 CancelableOperation<T> get operation => _operation; |
| 91 CancelableOperation<T> _operation; | 109 CancelableOperation<T> _operation; |
| 92 | 110 |
| 93 /// Whether the completer has completed. | 111 /// Whether the completer has completed. |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 133 /// Completes [operation] to [error]. | 151 /// Completes [operation] to [error]. |
| 134 void completeError(Object error, [StackTrace stackTrace]) { | 152 void completeError(Object error, [StackTrace stackTrace]) { |
| 135 if (_isCompleted) throw new StateError("Operation already completed"); | 153 if (_isCompleted) throw new StateError("Operation already completed"); |
| 136 _isCompleted = true; | 154 _isCompleted = true; |
| 137 | 155 |
| 138 if (_isCanceled) return; | 156 if (_isCanceled) return; |
| 139 _inner.completeError(error, stackTrace); | 157 _inner.completeError(error, stackTrace); |
| 140 } | 158 } |
| 141 | 159 |
| 142 /// Cancel the completer. | 160 /// Cancel the completer. |
| 143 Future _cancel() => _cancelMemo.runOnce(() { | 161 Future _cancel() { |
| 144 if (_inner.isCompleted) return null; | 162 if (_inner.isCompleted) return new Future.value(); |
| 145 _isCanceled = true; | 163 |
| 146 if (_onCancel != null) return _onCancel(); | 164 return _cancelMemo.runOnce(() { |
| 147 }); | 165 _isCanceled = true; |
| 166 if (_onCancel != null) return _onCancel(); |
| 167 }); |
| 168 } |
| 148 } | 169 } |
| OLD | NEW |