Chromium Code Reviews| Index: lib/runtime/_generators.js |
| diff --git a/lib/runtime/_generators.js b/lib/runtime/_generators.js |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..5b938fd21212812420e08c42022db2df2a2e5a04 |
| --- /dev/null |
| +++ b/lib/runtime/_generators.js |
| @@ -0,0 +1,261 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +/** |
| + * This library adapts ES6 generators to implement Dart's async/await. |
| + * |
| + * It's designed to interact with Dart's Future/Stream and follow Dart |
| + * async/await semantics. |
| + * |
| + * See https://github.com/dart-lang/dev_compiler/issues/245 for ideas on |
| + * reconciling Dart's Future and ES6 Promise. |
| + * |
| + * Inspired by `co`: https://github.com/tj/co/blob/master/index.js, which is a |
| + * stepping stone for proposed ES7 async/await, and uses ES6 Promises. |
| + */ |
| +dart_library.library('dart_runtime/_generators', null, /* Imports */[ |
| +], /* Lazy Imports */[ |
| + 'dart_runtime/_operations', |
| + 'dart_runtime/_classes', |
| + 'dart/core', |
| + 'dart/collection', |
| + 'dart/async' |
| +], function(exports, _operations, _classes, core, collection, async) { |
| + 'use strict'; |
| + |
| + const _jsIterator = Symbol('_jsIterator'); |
| + const _current = Symbol('_current'); |
| + |
| + // TODO(jmesserly): this adapter is to work around: |
| + // https://github.com/dart-lang/dev_compiler/issues/247 |
| + class _SyncIterator { |
|
vsm
2015/07/27 21:03:24
Should this "implement" Dart's Iterator and/or sub
Jennifer Messerly
2015/07/27 21:46:35
Yeah, I think so, for soundness. It's hard to repl
|
| + constructor(jsIterator) { |
| + this[_jsIterator] = jsIterator; |
| + this[_current] = null; |
| + } |
| + moveNext() { |
| + var ret = this[_jsIterator].next(); |
| + this[_current] = ret.value; |
| + return !ret.done; |
| + } |
| + get current() { |
| + return this[_current]; |
| + } |
| + } |
| + |
| + function syncStar(gen, E, ...args) { |
| + let IterableBase_E = collection.IterableBase$(E); |
| + class _SyncIterable extends IterableBase_E { |
| + get iterator() { |
| + return new _SyncIterator(gen(...args)); |
| + } |
| + [Symbol.iterator]() { |
| + return gen(...args); |
| + } |
| + } |
| + _classes.setSignature(_SyncIterable, { methods: () => ({}) }); |
| + _classes.defineExtensionMembers(_SyncIterable, ['iterator']); |
| + return new _SyncIterable(); |
| + } |
| + exports.syncStar = syncStar; |
| + |
| + function async_(gen, T, ...args) { |
| + let iter; |
| + function onValue(res) { |
| + if (res === void 0) res = null; |
| + return next(iter.next(res)); |
| + } |
| + function onError(err) { |
| + // If the awaited Future throws, we want to convert this to an exception |
| + // thrown from the `yield` point, as if it was thrown there. |
| + // |
| + // If the exception is not caught inside `gen`, it will emerge here, which |
| + // will send it to anyone listening on this async function's Future<T>. |
| + // |
| + // In essence, we are giving the code inside the generator a chance to |
| + // use try-catch-finally. |
| + return next(iter.throw(err)); |
| + } |
| + function next(ret) { |
| + if (ret.done) return ret.value; |
| + // Checks if the awaited value is a Future. |
| + let future = ret.value; |
| + if (!_operations.instanceOf(future, async.Future$)) { |
| + future = async.Future.value(future); |
| + } |
| + // Chain the Future so `await` receives the Future's value. |
| + return future.then(onValue, {onError: onError}); |
| + } |
| + return async.Future$(T).new(function() { |
| + iter = gen(...args)[Symbol.iterator](); |
| + return onValue(); |
| + }); |
| + } |
| + exports.async = async_; |
| + |
| + // Implementation inspired by _AsyncStarStreamController in |
| + // dart-lang/sdk's runtime/lib/core_patch.dart |
| + // |
| + // Given input like: |
| + // |
| + // foo() async* { |
| + // yield 1; |
| + // yield* bar(); |
| + // print(await baz()); |
| + // } |
| + // |
| + // This generates as: |
| + // |
| + // function foo() { |
| + // return dart.asyncStar(function*(stream) { |
| + // if (stream.add(1)) return; |
| + // yield; |
| + // if (stream.addStream(bar()) return; |
| + // yield; |
| + // print(yield baz()); |
| + // }); |
| + // } |
| + class _AsyncStarStreamController { |
| + constructor(generator, T, args) { |
| + this.isAdding = false; |
| + this.isWaiting = false; |
| + this.isScheduled = false; |
| + this.isSuspendedAtYield = false; |
| + this.canceler = null; |
| + this.iterator = generator(this, ...args)[Symbol.iterator](); |
| + this.controller = async.StreamController$(T).new({ |
| + onListen: () => this.scheduleGenerator(), |
| + onResume: () => this.onResume(), |
| + onCancel: () => this.onCancel() |
| + }); |
| + } |
| + |
| + onResume() { |
| + if (this.isSuspendedAtYield) { |
| + this.scheduleGenerator(); |
| + } |
| + } |
| + |
| + onCancel() { |
| + if (this.controller.isClosed) { |
| + return null; |
| + } |
| + if (this.canceler == null) { |
| + this.canceler = async.Completer.new(); |
| + this.scheduleGenerator(); |
| + } |
| + return this.canceler.future; |
| + } |
| + |
| + close() { |
| + if (this.canceler != null && !this.canceler.isCompleted) { |
| + // If the stream has been cancelled, complete the cancellation future |
| + // with the error. |
| + this.canceler.complete(); |
| + } |
| + this.controller.close(); |
| + } |
| + |
| + scheduleGenerator() { |
| + // TODO(jmesserly): is this paused check in the right place? Assuming the |
| + // async* Stream yields, then is paused (by other code), the body will |
| + // already be scheduled. This will cause at least one more iteration to |
| + // run (adding another data item to the Stream) before actually pausing. |
| + // It could be fixed by moving the `isPaused` check inside `runBody`. |
| + if (this.isScheduled || this.controller.isPaused || |
| + this.isAdding || this.isWaiting) { |
| + return; |
| + } |
| + this.isScheduled = true; |
| + async.scheduleMicrotask(() => this.runBody()); |
| + } |
| + |
| + runBody(opt_awaitValue) { |
| + this.isScheduled = false; |
| + this.isSuspendedAtYield = false; |
| + this.isWaiting = false; |
| + let iter; |
| + try { |
| + iter = this.iterator.next(opt_awaitValue); |
| + } catch (e) { |
| + this.addError(e, _operations.stackTrace(e)); |
| + this.close(); |
| + return; |
| + } |
| + if (iter.done) { |
| + this.close(); |
| + return; |
| + } |
| + |
| + // If we're suspended at a yield/yield*, we're done for now. |
| + if (this.isSuspendedAtYield || this.isAdding) return; |
| + |
| + // Handle `await`: if we get a value passed to `yield` it means we are |
| + // waiting on this Future. Make sure to prevent scheduling, and pass the |
| + // value back as the result of the `yield`. |
| + // |
| + // TODO(jmesserly): is the timing here correct? The assumption here is |
| + // that we should schedule `await` in `async*` the same as in `async`. |
| + this.isWaiting = true; |
| + let future = iter.value; |
| + if (!_operations.instanceOf(future, async.Future$)) { |
| + future = async.Future.value(future); |
| + } |
| + return future.then((x) => this.runBody(x), |
| + { onError: (e, s) => this.throwError(e, s) }); |
| + } |
| + |
| + // Adds element to steam, returns true if the caller should terminate |
|
vsm
2015/07/27 21:03:24
steam -> stream
Jennifer Messerly
2015/07/27 21:46:35
Stand back, I'm doing chemistry!
In all seriousne
|
| + // execution of the generator. |
| + add(event) { |
| + // If stream is cancelled, tell caller to exit the async generator. |
| + if (!this.controller.hasListener) return true; |
| + this.controller.add(event); |
| + this.scheduleGenerator(); |
| + this.isSuspendedAtYield = true; |
| + return false; |
| + } |
| + |
| + // Adds the elements of stream into this controller's stream. |
| + // The generator will be scheduled again when all of the |
| + // elements of the added stream have been consumed. |
| + // Returns true if the caller should terminate |
| + // execution of the generator. |
| + addStream(stream) { |
| + // If stream is cancelled, tell caller to exit the async generator. |
| + if (!this.controller.hasListener) return true; |
| + |
| + this.isAdding = true; |
| + this.controller.addStream(stream, {cancelOnError: false}).then(() => { |
| + this.isAdding = false; |
| + this.scheduleGenerator(); |
| + }, { onError: (e, s) => this.throwError(e, s) }); |
| + } |
| + |
| + throwError(error, stackTrace) { |
| + try { |
| + this.iterator.throw(error); |
| + } catch (e) { |
| + this.addError(e, stackTrace); |
| + } |
| + } |
| + |
| + addError(error, stackTrace) { |
| + if ((this.canceler != null) && !this.canceler.isCompleted) { |
| + // If the stream has been cancelled, complete the cancellation future |
| + // with the error. |
| + this.canceler.completeError(error, stackTrace); |
| + return; |
| + } |
| + if (!this.controller.hasListener) return; |
| + this.controller.addError(error, stackTrace); |
| + } |
| + } |
| + |
| + /** Returns a Stream of T implemented by an async* function. */ |
| + function asyncStar(gen, T, ...args) { |
| + return new _AsyncStarStreamController(gen, T, args).controller.stream; |
| + } |
| + exports.asyncStar = asyncStar; |
| +}); |