| Index: sdk/lib/_internal/lib/isolate_helper.dart
|
| diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart
|
| index 05208648a9a3f0e46e579ed2cf703337350684ee..cd3ee4ad56d5108a7bea28b98c0d61e420773756 100644
|
| --- a/sdk/lib/_internal/lib/isolate_helper.dart
|
| +++ b/sdk/lib/_internal/lib/isolate_helper.dart
|
| @@ -22,8 +22,6 @@ import 'dart:_foreign_helper' show DART_CLOSURE_TO_JS,
|
| IsolateContext;
|
| import 'dart:_interceptors' show JSExtendableArray;
|
|
|
| -ReceivePort controlPort;
|
| -
|
| /**
|
| * Called by the compiler to support switching
|
| * between isolates when we get a callback from the DOM.
|
| @@ -236,6 +234,39 @@ class _IsolateContext implements IsolateContext {
|
| // native object containing all globals of an isolate.
|
| final isolateStatics = JS_CREATE_ISOLATE();
|
|
|
| + final RawReceivePortImpl controlPort = new RawReceivePortImpl._controlPort();
|
| +
|
| + final Capability pauseCapability = new Capability();
|
| +
|
| + // TODO(lrn): Store these in single "pausestate" object, so they don't take
|
| + // up as much room when not pausing.
|
| + bool isPaused = false;
|
| + List<_IsolateEvent> delayedEvents = [];
|
| + Set<Capability> pauseTokens = new Set();
|
| +
|
| + _IsolateContext() {
|
| + this.registerWeak(controlPort._id, controlPort);
|
| + }
|
| +
|
| + void addPause(Capability authentification, Capability resume) {
|
| + if (pauseCapability != authentification) return;
|
| + if (pauseTokens.add(resume) && !isPaused) {
|
| + isPaused = true;
|
| + }
|
| + }
|
| +
|
| + void removePause(Capability resume) {
|
| + if (!isPaused) return;
|
| + pauseTokens.remove(resume);
|
| + if (pauseTokens.isEmpty) {
|
| + while(delayedEvents.isNotEmpty) {
|
| + _IsolateEvent event = delayedEvents.removeLast();
|
| + _globalState.topEventLoop.prequeue(event);
|
| + }
|
| + isPaused = false;
|
| + }
|
| + }
|
| +
|
| /**
|
| * Run [code] in the context of the isolate represented by [this].
|
| */
|
| @@ -257,15 +288,32 @@ class _IsolateContext implements IsolateContext {
|
| JS_SET_CURRENT_ISOLATE(isolateStatics);
|
| }
|
|
|
| + void handleControlMessage(message) {
|
| + switch (message[0]) {
|
| + case "pause":
|
| + addPause(message[1], message[2]);
|
| + break;
|
| + case "resume":
|
| + removePause(message[1]);
|
| + break;
|
| + default:
|
| + print("UNKOWN MESSAGE: $message");
|
| + }
|
| + }
|
| +
|
| /** Looks up a port registered for this isolate. */
|
| RawReceivePortImpl lookup(int portId) => ports[portId];
|
|
|
| - /** Registers a port on this isolate. */
|
| - void register(int portId, RawReceivePortImpl port) {
|
| + void _addRegistration(int portId, RawReceivePortImpl port) {
|
| if (ports.containsKey(portId)) {
|
| throw new Exception("Registry: ports must be registered only once.");
|
| }
|
| ports[portId] = port;
|
| + }
|
| +
|
| + /** Registers a port on this isolate. */
|
| + void register(int portId, RawReceivePortImpl port) {
|
| + _addRegistration(portId, port);
|
| _updateGlobalState();
|
| }
|
|
|
| @@ -276,8 +324,7 @@ class _IsolateContext implements IsolateContext {
|
| */
|
| void registerWeak(int portId, RawReceivePortImpl port) {
|
| weakPorts.add(portId);
|
| - // 'register' updates the global state.
|
| - register(portId, port);
|
| + _addRegistration(portId, port);
|
| }
|
|
|
| _updateGlobalState() {
|
| @@ -291,6 +338,7 @@ class _IsolateContext implements IsolateContext {
|
| /** Unregister a port on this isolate. */
|
| void unregister(int portId) {
|
| ports.remove(portId);
|
| + weakPorts.remove(portId);
|
| _updateGlobalState();
|
| }
|
| }
|
| @@ -306,6 +354,10 @@ class _EventLoop {
|
| events.addLast(new _IsolateEvent(isolate, fn, msg));
|
| }
|
|
|
| + void prequeue(_IsolateEvent event) {
|
| + events.addFirst(event);
|
| + }
|
| +
|
| _IsolateEvent dequeue() {
|
| if (events.isEmpty) return null;
|
| return events.removeFirst();
|
| @@ -383,6 +435,10 @@ class _IsolateEvent {
|
| _IsolateEvent(this.isolate, this.fn, this.message);
|
|
|
| void process() {
|
| + if (isolate.isPaused) {
|
| + isolate.delayedEvents.add(this);
|
| + return;
|
| + }
|
| isolate.eval(fn);
|
| }
|
| }
|
| @@ -582,7 +638,7 @@ class IsolateNatives {
|
| return JS("", "new #()", ctor);
|
| }
|
|
|
| - static Future<SendPort> spawnFunction(void topLevelFunction(message),
|
| + static Future<List> spawnFunction(void topLevelFunction(message),
|
| message) {
|
| final name = _getJSFunctionName(topLevelFunction);
|
| if (name == null) {
|
| @@ -592,25 +648,25 @@ class IsolateNatives {
|
| return spawn(name, null, null, message, false, false);
|
| }
|
|
|
| - static Future<SendPort> spawnUri(Uri uri, List<String> args, message) {
|
| + static Future<List> spawnUri(Uri uri, List<String> args, message) {
|
| return spawn(null, uri.toString(), args, message, false, true);
|
| }
|
|
|
| // TODO(sigmund): clean up above, after we make the new API the default:
|
|
|
| /// If [uri] is `null` it is replaced with the current script.
|
| - static Future<SendPort> spawn(String functionName, String uri,
|
| - List<String> args, message,
|
| - bool isLight, bool isSpawnUri) {
|
| + static Future<List> spawn(String functionName, String uri,
|
| + List<String> args, message,
|
| + bool isLight, bool isSpawnUri) {
|
| // Assume that the compiled version of the Dart file lives just next to the
|
| // dart file.
|
| // TODO(floitsch): support precompiled version of dart2js output.
|
| if (uri != null && uri.endsWith(".dart")) uri += ".js";
|
|
|
| ReceivePort port = new ReceivePort();
|
| - Future<SendPort> result = port.first.then((msg) {
|
| + Future<List> result = port.first.then((msg) {
|
| assert(msg[0] == _SPAWNED_SIGNAL);
|
| - return msg[1];
|
| + return msg;
|
| });
|
|
|
| SendPort signalReply = port.sendPort;
|
| @@ -666,8 +722,9 @@ class IsolateNatives {
|
| _IsolateContext context = JS_CURRENT_ISOLATE_CONTEXT();
|
| Primitives.initializeStatics(context.id);
|
| // The isolate's port does not keep the isolate open.
|
| - controlPort = new ReceivePortImpl.weak();
|
| - replyTo.send([_SPAWNED_SIGNAL, controlPort.sendPort]);
|
| + replyTo.send([_SPAWNED_SIGNAL,
|
| + context.controlPort.sendPort,
|
| + context.pauseCapability]);
|
| if (!isSpawnUri) {
|
| topLevel(message);
|
| } else if (topLevel is _MainFunctionArgsMessage) {
|
| @@ -763,6 +820,10 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
|
| if (shouldSerialize) {
|
| msg = _serializeMessage(msg);
|
| }
|
| + if (isolate.controlPort == _receivePort) {
|
| + isolate.handleControlMessage(msg);
|
| + return;
|
| + }
|
| _globalState.topEventLoop.enqueue(isolate, () {
|
| if (!_receivePort._isClosed) {
|
| if (shouldSerialize) {
|
| @@ -823,18 +884,23 @@ class _WorkerSendPort extends _BaseSendPort implements SendPort {
|
| class RawReceivePortImpl implements RawReceivePort {
|
| static int _nextFreeId = 1;
|
|
|
| - final int _id = _nextFreeId++;
|
| + final int _id;
|
| Function _handler;
|
| bool _isClosed = false;
|
|
|
| - RawReceivePortImpl(this._handler) {
|
| + RawReceivePortImpl(this._handler) : _id = _nextFreeId++ {
|
| _globalState.currentContext.register(_id, this);
|
| }
|
|
|
| - RawReceivePortImpl.weak(this._handler) {
|
| + RawReceivePortImpl.weak(this._handler) : _id = _nextFreeId++ {
|
| _globalState.currentContext.registerWeak(_id, this);
|
| }
|
|
|
| + // Creates the control port of an isolate.
|
| + // This is created before the isolate context object itself,
|
| + // so it cannot access the static _nextFreeId field.
|
| + RawReceivePortImpl._controlPort() : _handler = null, _id = 0;
|
| +
|
| void set handler(Function newHandler) {
|
| _handler = newHandler;
|
| }
|
|
|