Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 /** | |
| 6 * This library adapts ES6 generators to implement Dart's async/await. | |
| 7 * | |
| 8 * It's designed to interact with Dart's Future/Stream and follow Dart | |
| 9 * async/await semantics. | |
| 10 * | |
| 11 * See https://github.com/dart-lang/dev_compiler/issues/245 for ideas on | |
| 12 * reconciling Dart's Future and ES6 Promise. | |
| 13 * | |
| 14 * Inspired by `co`: https://github.com/tj/co/blob/master/index.js, which is a | |
| 15 * stepping stone for proposed ES7 async/await, and uses ES6 Promises. | |
| 16 */ | |
| 17 dart_library.library('dart_runtime/_generators', null, /* Imports */[ | |
| 18 ], /* Lazy Imports */[ | |
| 19 'dart_runtime/_operations', | |
| 20 'dart_runtime/_classes', | |
| 21 'dart/core', | |
| 22 'dart/collection', | |
| 23 'dart/async' | |
| 24 ], function(exports, _operations, _classes, core, collection, async) { | |
| 25 'use strict'; | |
| 26 | |
| 27 const _jsIterator = Symbol('_jsIterator'); | |
| 28 const _current = Symbol('_current'); | |
| 29 | |
| 30 // TODO(jmesserly): this adapter is to work around: | |
| 31 // https://github.com/dart-lang/dev_compiler/issues/247 | |
| 32 class _SyncIterator { | |
|
vsm
2015/07/27 21:03:24
Should this "implement" Dart's Iterator and/or sub
Jennifer Messerly
2015/07/27 21:46:35
Yeah, I think so, for soundness. It's hard to repl
| |
| 33 constructor(jsIterator) { | |
| 34 this[_jsIterator] = jsIterator; | |
| 35 this[_current] = null; | |
| 36 } | |
| 37 moveNext() { | |
| 38 var ret = this[_jsIterator].next(); | |
| 39 this[_current] = ret.value; | |
| 40 return !ret.done; | |
| 41 } | |
| 42 get current() { | |
| 43 return this[_current]; | |
| 44 } | |
| 45 } | |
| 46 | |
| 47 function syncStar(gen, E, ...args) { | |
| 48 let IterableBase_E = collection.IterableBase$(E); | |
| 49 class _SyncIterable extends IterableBase_E { | |
| 50 get iterator() { | |
| 51 return new _SyncIterator(gen(...args)); | |
| 52 } | |
| 53 [Symbol.iterator]() { | |
| 54 return gen(...args); | |
| 55 } | |
| 56 } | |
| 57 _classes.setSignature(_SyncIterable, { methods: () => ({}) }); | |
| 58 _classes.defineExtensionMembers(_SyncIterable, ['iterator']); | |
| 59 return new _SyncIterable(); | |
| 60 } | |
| 61 exports.syncStar = syncStar; | |
| 62 | |
| 63 function async_(gen, T, ...args) { | |
| 64 let iter; | |
| 65 function onValue(res) { | |
| 66 if (res === void 0) res = null; | |
| 67 return next(iter.next(res)); | |
| 68 } | |
| 69 function onError(err) { | |
| 70 // If the awaited Future throws, we want to convert this to an exception | |
| 71 // thrown from the `yield` point, as if it was thrown there. | |
| 72 // | |
| 73 // If the exception is not caught inside `gen`, it will emerge here, which | |
| 74 // will send it to anyone listening on this async function's Future<T>. | |
| 75 // | |
| 76 // In essence, we are giving the code inside the generator a chance to | |
| 77 // use try-catch-finally. | |
| 78 return next(iter.throw(err)); | |
| 79 } | |
| 80 function next(ret) { | |
| 81 if (ret.done) return ret.value; | |
| 82 // Checks if the awaited value is a Future. | |
| 83 let future = ret.value; | |
| 84 if (!_operations.instanceOf(future, async.Future$)) { | |
| 85 future = async.Future.value(future); | |
| 86 } | |
| 87 // Chain the Future so `await` receives the Future's value. | |
| 88 return future.then(onValue, {onError: onError}); | |
| 89 } | |
| 90 return async.Future$(T).new(function() { | |
| 91 iter = gen(...args)[Symbol.iterator](); | |
| 92 return onValue(); | |
| 93 }); | |
| 94 } | |
| 95 exports.async = async_; | |
| 96 | |
| 97 // Implementation inspired by _AsyncStarStreamController in | |
| 98 // dart-lang/sdk's runtime/lib/core_patch.dart | |
| 99 // | |
| 100 // Given input like: | |
| 101 // | |
| 102 // foo() async* { | |
| 103 // yield 1; | |
| 104 // yield* bar(); | |
| 105 // print(await baz()); | |
| 106 // } | |
| 107 // | |
| 108 // This generates as: | |
| 109 // | |
| 110 // function foo() { | |
| 111 // return dart.asyncStar(function*(stream) { | |
| 112 // if (stream.add(1)) return; | |
| 113 // yield; | |
| 114 // if (stream.addStream(bar()) return; | |
| 115 // yield; | |
| 116 // print(yield baz()); | |
| 117 // }); | |
| 118 // } | |
| 119 class _AsyncStarStreamController { | |
| 120 constructor(generator, T, args) { | |
| 121 this.isAdding = false; | |
| 122 this.isWaiting = false; | |
| 123 this.isScheduled = false; | |
| 124 this.isSuspendedAtYield = false; | |
| 125 this.canceler = null; | |
| 126 this.iterator = generator(this, ...args)[Symbol.iterator](); | |
| 127 this.controller = async.StreamController$(T).new({ | |
| 128 onListen: () => this.scheduleGenerator(), | |
| 129 onResume: () => this.onResume(), | |
| 130 onCancel: () => this.onCancel() | |
| 131 }); | |
| 132 } | |
| 133 | |
| 134 onResume() { | |
| 135 if (this.isSuspendedAtYield) { | |
| 136 this.scheduleGenerator(); | |
| 137 } | |
| 138 } | |
| 139 | |
| 140 onCancel() { | |
| 141 if (this.controller.isClosed) { | |
| 142 return null; | |
| 143 } | |
| 144 if (this.canceler == null) { | |
| 145 this.canceler = async.Completer.new(); | |
| 146 this.scheduleGenerator(); | |
| 147 } | |
| 148 return this.canceler.future; | |
| 149 } | |
| 150 | |
| 151 close() { | |
| 152 if (this.canceler != null && !this.canceler.isCompleted) { | |
| 153 // If the stream has been cancelled, complete the cancellation future | |
| 154 // with the error. | |
| 155 this.canceler.complete(); | |
| 156 } | |
| 157 this.controller.close(); | |
| 158 } | |
| 159 | |
| 160 scheduleGenerator() { | |
| 161 // TODO(jmesserly): is this paused check in the right place? Assuming the | |
| 162 // async* Stream yields, then is paused (by other code), the body will | |
| 163 // already be scheduled. This will cause at least one more iteration to | |
| 164 // run (adding another data item to the Stream) before actually pausing. | |
| 165 // It could be fixed by moving the `isPaused` check inside `runBody`. | |
| 166 if (this.isScheduled || this.controller.isPaused || | |
| 167 this.isAdding || this.isWaiting) { | |
| 168 return; | |
| 169 } | |
| 170 this.isScheduled = true; | |
| 171 async.scheduleMicrotask(() => this.runBody()); | |
| 172 } | |
| 173 | |
| 174 runBody(opt_awaitValue) { | |
| 175 this.isScheduled = false; | |
| 176 this.isSuspendedAtYield = false; | |
| 177 this.isWaiting = false; | |
| 178 let iter; | |
| 179 try { | |
| 180 iter = this.iterator.next(opt_awaitValue); | |
| 181 } catch (e) { | |
| 182 this.addError(e, _operations.stackTrace(e)); | |
| 183 this.close(); | |
| 184 return; | |
| 185 } | |
| 186 if (iter.done) { | |
| 187 this.close(); | |
| 188 return; | |
| 189 } | |
| 190 | |
| 191 // If we're suspended at a yield/yield*, we're done for now. | |
| 192 if (this.isSuspendedAtYield || this.isAdding) return; | |
| 193 | |
| 194 // Handle `await`: if we get a value passed to `yield` it means we are | |
| 195 // waiting on this Future. Make sure to prevent scheduling, and pass the | |
| 196 // value back as the result of the `yield`. | |
| 197 // | |
| 198 // TODO(jmesserly): is the timing here correct? The assumption here is | |
| 199 // that we should schedule `await` in `async*` the same as in `async`. | |
| 200 this.isWaiting = true; | |
| 201 let future = iter.value; | |
| 202 if (!_operations.instanceOf(future, async.Future$)) { | |
| 203 future = async.Future.value(future); | |
| 204 } | |
| 205 return future.then((x) => this.runBody(x), | |
| 206 { onError: (e, s) => this.throwError(e, s) }); | |
| 207 } | |
| 208 | |
| 209 // Adds element to steam, returns true if the caller should terminate | |
|
vsm
2015/07/27 21:03:24
steam -> stream
Jennifer Messerly
2015/07/27 21:46:35
Stand back, I'm doing chemistry!
In all seriousne
| |
| 210 // execution of the generator. | |
| 211 add(event) { | |
| 212 // If stream is cancelled, tell caller to exit the async generator. | |
| 213 if (!this.controller.hasListener) return true; | |
| 214 this.controller.add(event); | |
| 215 this.scheduleGenerator(); | |
| 216 this.isSuspendedAtYield = true; | |
| 217 return false; | |
| 218 } | |
| 219 | |
| 220 // Adds the elements of stream into this controller's stream. | |
| 221 // The generator will be scheduled again when all of the | |
| 222 // elements of the added stream have been consumed. | |
| 223 // Returns true if the caller should terminate | |
| 224 // execution of the generator. | |
| 225 addStream(stream) { | |
| 226 // If stream is cancelled, tell caller to exit the async generator. | |
| 227 if (!this.controller.hasListener) return true; | |
| 228 | |
| 229 this.isAdding = true; | |
| 230 this.controller.addStream(stream, {cancelOnError: false}).then(() => { | |
| 231 this.isAdding = false; | |
| 232 this.scheduleGenerator(); | |
| 233 }, { onError: (e, s) => this.throwError(e, s) }); | |
| 234 } | |
| 235 | |
| 236 throwError(error, stackTrace) { | |
| 237 try { | |
| 238 this.iterator.throw(error); | |
| 239 } catch (e) { | |
| 240 this.addError(e, stackTrace); | |
| 241 } | |
| 242 } | |
| 243 | |
| 244 addError(error, stackTrace) { | |
| 245 if ((this.canceler != null) && !this.canceler.isCompleted) { | |
| 246 // If the stream has been cancelled, complete the cancellation future | |
| 247 // with the error. | |
| 248 this.canceler.completeError(error, stackTrace); | |
| 249 return; | |
| 250 } | |
| 251 if (!this.controller.hasListener) return; | |
| 252 this.controller.addError(error, stackTrace); | |
| 253 } | |
| 254 } | |
| 255 | |
| 256 /** Returns a Stream of T implemented by an async* function. */ | |
| 257 function asyncStar(gen, T, ...args) { | |
| 258 return new _AsyncStarStreamController(gen, T, args).controller.stream; | |
| 259 } | |
| 260 exports.asyncStar = asyncStar; | |
| 261 }); | |
| OLD | NEW |