OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import "dart:_internal"; | 5 import "dart:_internal"; |
6 | 6 |
| 7 // We need to pass the value as first argument and leave the second and third |
| 8 // arguments empty (used for error handling). |
| 9 // See vm/ast_transformer.cc for usage. |
| 10 Function _asyncThenWrapperHelper(continuation) { |
| 11 // Any function that is used as an asynchronous callback must be registered |
| 12 // in the current Zone. Normally, this is done by the future when a |
| 13 // callback is registered (for example with `.then` or `.catchError`). In our |
| 14 // case we want to reuse the same callback multiple times and therefore avoid |
| 15 // the multiple registrations. For our internal futures (`_Future`) we can |
| 16 // use the shortcut-version of `.then`, and skip the registration. However, |
| 17 // that means that the continuation must be registered by us. |
| 18 // |
| 19 // Furthermore, we know that the root-zone doesn't actually do anything and |
| 20 // we can therefore skip the registration call for it. |
| 21 // |
| 22 // Note, that the continuation accepts up to three arguments. If the current |
| 23 // zone is the root zone, we don't wrap the continuation, and a bad |
| 24 // `Future` implementation could potentially invoke the callback with the |
| 25 // wrong number of arguments. |
| 26 if (Zone.current == Zone.ROOT) return continuation; |
| 27 return Zone.current.registerUnaryCallback((x) => continuation(x, null, null)); |
| 28 } |
| 29 |
| 30 // We need to pass the exception and stack trace objects as second and third |
| 31 // parameter to the continuation. See vm/ast_transformer.cc for usage. |
| 32 Function _asyncErrorWrapperHelper(continuation) { |
| 33 // See comments of `_asyncThenWrapperHelper`. |
| 34 var errorCallback = (e, s) => continuation(null, e, s); |
| 35 if (Zone.current == Zone.ROOT) return errorCallback; |
| 36 return Zone.current.registerBinaryCallback(errorCallback); |
| 37 } |
| 38 |
| 39 /// Registers the [thenCallback] and [errorCallback] on the given [object]. |
| 40 /// |
| 41 /// If [object] is not a future, then it is wrapped into one. |
| 42 /// |
| 43 /// Returns the result of registering with `.then`. |
| 44 Future _awaitHelper( |
| 45 var object, Function thenCallback, Function errorCallback) { |
| 46 if (object is! Future) { |
| 47 object = new _Future().._setValue(object); |
| 48 } else if (object is! _Future) { |
| 49 return object.then(thenCallback, onError: errorCallback); |
| 50 } |
| 51 // `object` is a `_Future`. |
| 52 // |
| 53 // Since the callbacks have been registered in the current zone (see |
| 54 // [_asyncThenWrapperHelper] and [_asyncErrorWrapperHelper]), we can avoid |
| 55 // another registration and directly invoke the no-zone-registration `.then`. |
| 56 // |
| 57 // We can only do this for our internal futures (the default implementation of |
| 58 // all futures that are constructed by the `dart:async` library). |
| 59 return object._thenNoZoneRegistration(thenCallback, errorCallback); |
| 60 } |
| 61 |
| 62 // _AsyncStarStreamController is used by the compiler to implement |
| 63 // async* generator functions. |
| 64 class _AsyncStarStreamController { |
| 65 StreamController controller; |
| 66 Function asyncStarBody; |
| 67 bool isAdding = false; |
| 68 bool onListenReceived = false; |
| 69 bool isScheduled = false; |
| 70 bool isSuspendedAtYield = false; |
| 71 Completer cancellationCompleter = null; |
| 72 |
| 73 Stream get stream => controller.stream; |
| 74 |
| 75 void runBody() { |
| 76 isScheduled = false; |
| 77 isSuspendedAtYield = false; |
| 78 asyncStarBody(); |
| 79 } |
| 80 |
| 81 void scheduleGenerator() { |
| 82 if (isScheduled || controller.isPaused || isAdding) { |
| 83 return; |
| 84 } |
| 85 isScheduled = true; |
| 86 scheduleMicrotask(runBody); |
| 87 } |
| 88 |
| 89 // Adds element to steam, returns true if the caller should terminate |
| 90 // execution of the generator. |
| 91 // |
| 92 // TODO(hausner): Per spec, the generator should be suspended before |
| 93 // exiting when the stream is closed. We could add a getter like this: |
| 94 // get isCancelled => controller.hasListener; |
| 95 // The generator would translate a 'yield e' statement to |
| 96 // controller.add(e); |
| 97 // suspend; |
| 98 // if (controller.isCancelled) return; |
| 99 bool add(event) { |
| 100 if (!onListenReceived) _fatal("yield before stream is listened to!"); |
| 101 if (isSuspendedAtYield) _fatal("unexpected yield"); |
| 102 // If stream is cancelled, tell caller to exit the async generator. |
| 103 if (!controller.hasListener) { |
| 104 return true; |
| 105 } |
| 106 controller.add(event); |
| 107 scheduleGenerator(); |
| 108 isSuspendedAtYield = true; |
| 109 return false; |
| 110 } |
| 111 |
| 112 // Adds the elements of stream into this controller's stream. |
| 113 // The generator will be scheduled again when all of the |
| 114 // elements of the added stream have been consumed. |
| 115 // Returns true if the caller should terminate |
| 116 // execution of the generator. |
| 117 bool addStream(Stream stream) { |
| 118 if (!onListenReceived) _fatal("yield before stream is listened to!"); |
| 119 // If stream is cancelled, tell caller to exit the async generator. |
| 120 if (!controller.hasListener) return true; |
| 121 isAdding = true; |
| 122 var whenDoneAdding = |
| 123 controller.addStream(stream as Stream, cancelOnError: false); |
| 124 whenDoneAdding.then((_) { |
| 125 isAdding = false; |
| 126 scheduleGenerator(); |
| 127 }); |
| 128 return false; |
| 129 } |
| 130 |
| 131 void addError(error, stackTrace) { |
| 132 if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) { |
| 133 // If the stream has been cancelled, complete the cancellation future |
| 134 // with the error. |
| 135 cancellationCompleter.completeError(error, stackTrace); |
| 136 return; |
| 137 } |
| 138 // If stream is cancelled, tell caller to exit the async generator. |
| 139 if (!controller.hasListener) return; |
| 140 controller.addError(error, stackTrace); |
| 141 // No need to schedule the generator body here. This code is only |
| 142 // called from the catch clause of the implicit try-catch-finally |
| 143 // around the generator body. That is, we are on the error path out |
| 144 // of the generator and do not need to run the generator again. |
| 145 } |
| 146 |
| 147 close() { |
| 148 if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) { |
| 149 // If the stream has been cancelled, complete the cancellation future |
| 150 // with the error. |
| 151 cancellationCompleter.complete(); |
| 152 } |
| 153 controller.close(); |
| 154 } |
| 155 |
| 156 _AsyncStarStreamController(this.asyncStarBody) { |
| 157 controller = new StreamController(onListen: this.onListen, |
| 158 onResume: this.onResume, |
| 159 onCancel: this.onCancel); |
| 160 } |
| 161 |
| 162 onListen() { |
| 163 assert(!onListenReceived); |
| 164 onListenReceived = true; |
| 165 scheduleGenerator(); |
| 166 } |
| 167 |
| 168 onResume() { |
| 169 if (isSuspendedAtYield) { |
| 170 scheduleGenerator(); |
| 171 } |
| 172 } |
| 173 |
| 174 onCancel() { |
| 175 if (controller.isClosed) { |
| 176 return null; |
| 177 } |
| 178 if (cancellationCompleter == null) { |
| 179 cancellationCompleter = new Completer(); |
| 180 scheduleGenerator(); |
| 181 } |
| 182 return cancellationCompleter.future; |
| 183 } |
| 184 } |
OLD | NEW |