Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(700)

Unified Diff: runtime/lib/isolate_patch.dart

Issue 27215002: Very simple version of Isolates. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « runtime/lib/isolate.cc ('k') | runtime/tests/vm/dart/isolate_mirror_local_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/lib/isolate_patch.dart
diff --git a/runtime/lib/isolate_patch.dart b/runtime/lib/isolate_patch.dart
index a2713d10c604716cf53d1aebbe1a7f90248a4519..867a031e020ffd31fbad37b9617d02bbb93ba05f 100644
--- a/runtime/lib/isolate_patch.dart
+++ b/runtime/lib/isolate_patch.dart
@@ -2,125 +2,109 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-class _CloseToken {
- /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to
- /// close themselves.
- const _CloseToken();
-}
+patch class ReceivePort {
+ /* patch */ factory ReceivePort() = _ReceivePortImpl;
-patch bool _isCloseToken(var object) {
- // TODO(floitsch): can we compare against const _CloseToken()?
- return object is _CloseToken;
+ /* patch */ factory ReceivePort.fromRawReceivePort(RawReceivePort rawPort) =
+ _ReceivePortImpl.fromRawReceivePort;
}
-patch class MessageBox {
- /* patch */ MessageBox.oneShot() : this._oneShot(new ReceivePort());
- MessageBox._oneShot(ReceivePort receivePort)
- : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort),
- sink = new _IsolateSink._fromPort(receivePort.toSendPort());
-
- /* patch */ MessageBox() : this._(new ReceivePort());
- MessageBox._(ReceivePort receivePort)
- : stream = new IsolateStream._fromOriginalReceivePort(receivePort),
- sink = new _IsolateSink._fromPort(receivePort.toSendPort());
+patch class RawReceivePort {
+ /**
+ * Opens a long-lived port for receiving messages.
+ *
+ * A [RawReceivePort] is low level and does not work with [Zone]s. It
+ * can not be paused. The data-handler must be set before the first
+ * event is received.
+ */
+ /* patch */ factory RawReceivePort([void handler(event)]) {
+ _RawReceivePortImpl result = new _RawReceivePortImpl();
+ result.handler = handler;
+ return result;
+ }
}
-class _IsolateSink implements IsolateSink {
- bool _isClosed = false;
- final SendPort _port;
- _IsolateSink._fromPort(this._port);
+class _ReceivePortImpl extends Stream implements ReceivePort {
+ _ReceivePortImpl() : this.fromRawReceivePort(new RawReceivePort());
- void add(dynamic message) {
- _port.send(message);
+ _ReceivePortImpl.fromRawReceivePort(this._rawPort) {
+ _controller = new StreamController(onCancel: close, sync: true);
+ _rawPort.handler = _controller.add;
}
- void addError(Object errorEvent) {
- throw new UnimplementedError("addError on isolate streams");
+ SendPort get sendPort {
+ return _rawPort.sendPort;
}
- void close() {
- if (_isClosed) return;
- add(const _CloseToken());
- _isClosed = true;
+ StreamSubscription listen(void onData(var message),
+ { Function onError,
+ void onDone(),
+ bool cancelOnError }) {
+ return _controller.stream.listen(onData,
+ onError: onError,
+ onDone: onDone,
+ cancelOnError: cancelOnError);
}
- bool operator==(var other) {
- return other is IsolateSink && _port == other._port;
+ close() {
+ _rawPort.close();
+ _controller.close();
}
- int get hashCode => _port.hashCode + 499;
+ final RawReceivePort _rawPort;
+ StreamController _controller;
}
-patch IsolateSink streamSpawnFunction(
- void topLevelFunction(),
- [bool unhandledExceptionCallback(IsolateUnhandledException e)]) {
- SendPort sendPort = spawnFunction(topLevelFunction,
- unhandledExceptionCallback);
- return new _IsolateSink._fromPort(sendPort);
-}
-
-patch class ReceivePort {
- /* patch */ factory ReceivePort() {
- return new _ReceivePortImpl();
- }
-}
-
-class _ReceivePortImpl implements ReceivePort {
- factory _ReceivePortImpl() native "ReceivePortImpl_factory";
-
- receive(void onMessage(var message, SendPort replyTo)) {
- _onMessage = onMessage;
- }
+class _RawReceivePortImpl implements RawReceivePort {
+ factory _RawReceivePortImpl() native "RawReceivePortImpl_factory";
close() {
_portMap.remove(_id);
_closeInternal(_id);
}
- SendPort toSendPort() {
+ SendPort get sendPort {
return new _SendPortImpl(_id);
}
/**** Internal implementation details ****/
- // Called from the VM to create a new ReceivePort instance.
- static _ReceivePortImpl _get_or_create(int id) {
- if (_portMap != null) {
- _ReceivePortImpl port = _portMap[id];
- if (port != null) {
- return port;
- }
+ // Called from the VM to create a new RawReceivePort instance.
+ static _RawReceivePortImpl _get_or_create(int id) {
+ _RawReceivePortImpl port = _portMap[id];
+ if (port != null) {
+ return port;
}
- return new _ReceivePortImpl._internal(id);
+ return new _RawReceivePortImpl._internal(id);
}
- _ReceivePortImpl._internal(int id) : _id = id {
- if (_portMap == null) {
- _portMap = new Map();
- }
+ _RawReceivePortImpl._internal(int id) : _id = id {
_portMap[id] = this;
}
- // Called from the VM to retrieve the ReceivePort for a message.
- static _ReceivePortImpl _lookupReceivePort(int id) {
- assert(_portMap != null);
+ // Called from the VM to retrieve the RawReceivePort for a message.
+ static _RawReceivePortImpl _lookupReceivePort(int id) {
return _portMap[id];
}
// Called from the VM to dispatch to the handler.
- static void _handleMessage(_ReceivePortImpl port, int replyId, var message) {
+ static void _handleMessage(
+ _RawReceivePortImpl port, int replyId, var message) {
assert(port != null);
- SendPort replyTo = (replyId == 0) ? null : new _SendPortImpl(replyId);
- (port._onMessage)(message, replyTo);
+ port._handler(message);
}
// Call into the VM to close the VM maintained mappings.
- static _closeInternal(int id) native "ReceivePortImpl_closeInternal";
+ static _closeInternal(int id) native "RawReceivePortImpl_closeInternal";
+
+ void set handler(Function newHandler) {
+ this._handler = newHandler;
+ }
final int _id;
- var _onMessage;
+ Function _handler;
- // id to ReceivePort mapping.
- static Map _portMap;
+ // id to RawReceivePort mapping.
+ static final Map _portMap = new HashMap();
}
@@ -135,27 +119,19 @@ class _SendPortImpl implements SendPort {
_sendInternal(_id, replyId, message);
}
- Future call(var message) {
- final completer = new Completer.sync();
- final port = new _ReceivePortImpl();
- send(message, port.toSendPort());
- port.receive((value, ignoreReplyTo) {
- port.close();
- if (value is Exception) {
- completer.completeError(value);
- } else {
- completer.complete(value);
- }
- });
- return completer.future;
- }
-
bool operator==(var other) {
return (other is _SendPortImpl) && _id == other._id;
}
int get hashCode {
- return _id;
+ const int MASK = 0x3FFFFFFF;
+ int hash = _id;
+ hash = (hash + ((hash & (MASK >> 10)) << 10)) & MASK;
+ hash ^= (hash >> 6);
+ hash = (hash + ((hash & (MASK >> 3)) << 3)) & MASK;
+ hash ^= (hash >> 11);
+ hash = (hash + ((hash & (MASK >> 15)) << 15)) & MASK;
+ return hash;
}
/*--- private implementation ---*/
@@ -177,19 +153,86 @@ class _SendPortImpl implements SendPort {
_getPortInternal() native "isolate_getPortInternal";
-ReceivePort _portInternal;
-
-patch class _Isolate {
- /* patch */ static ReceivePort get port {
- if (_portInternal == null) {
- _portInternal = _getPortInternal();
+typedef _MainFunction();
+typedef _MainFunctionArgs(args);
+typedef _MainFunctionArgsMessage(args, message);
+
+/**
+ * Takes the real entry point as argument and invokes it with the initial
+ * message.
+ *
+ * The initial message is (currently) received through the global port variable.
+ */
+void _startIsolate(Function entryPoint, bool isSpawnUri) {
+ Isolate._port.first.then((message) {
+ SendPort replyTo = message[0];
+ // TODO(floitsch): don't send ok-message if we can't find the entry point.
+ replyTo.send("started");
+ if (isSpawnUri) {
+ assert(message.length == 3);
+ List<String> args = message[1];
+ var isolateMessage = message[2];
+ if (entryPoint is _MainFunctionArgsMessage) {
+ entryPoint(args, isolateMessage);
+ } else if (entryPoint is _MainFunctionArgs) {
+ entryPoint(args);
+ } else {
+ entryPoint();
+ }
+ } else {
+ assert(message.length == 2);
+ var entryMessage = message[1];
+ entryPoint(entryMessage);
}
- return _portInternal;
+ });
+}
+
+patch class Isolate {
+ /* patch */ static Future<Isolate> spawn(
+ void entryPoint(message), var message) {
+ Completer completer = new Completer<Isolate>.sync();
+ try {
+ // The VM will invoke [_startIsolate] with entryPoint as argument.
+ SendPort controlPort = _spawnFunction(entryPoint);
+ RawReceivePort readyPort = new RawReceivePort();
+ controlPort.send([readyPort.sendPort, message]);
+ readyPort.handler = (readyMessage) {
+ assert(readyMessage == 'started');
+ readyPort.close();
+ completer.complete(new Isolate._fromControlPort(controlPort));
+ };
+ } catch(e, st) {
+ // TODO(floitsch): we want errors to go into the returned future.
+ rethrow;
+ };
+ return completer.future;
}
- /* patch */ static SendPort spawnFunction(void topLevelFunction(),
- [bool unhandledExceptionCallback(IsolateUnhandledException e)])
+ /* patch */ static Future<Isolate> spawnUri(
+ Uri uri, List<String> args, var message) {
+ Completer completer = new Completer<Isolate>.sync();
+ try {
+ // The VM will invoke [_startIsolate] and not `main`.
+ SendPort controlPort = _spawnUri(uri.path);
+ RawReceivePort readyPort = new RawReceivePort();
+ controlPort.send([readyPort.sendPort, args, message]);
+ readyPort.handler = (readyMessage) {
+ assert(readyMessage == 'started');
+ readyPort.close();
+ completer.complete(new Isolate._fromControlPort(controlPort));
+ };
+ } catch(e, st) {
+ // TODO(floitsch): we want errors to go into the returned future.
+ rethrow;
+ };
+ return completer.future;
+ }
+
+ static final ReceivePort _port =
+ new ReceivePort.fromRawReceivePort(_getPortInternal());
+
+ static SendPort _spawnFunction(Function topLevelFunction)
native "isolate_spawnFunction";
- /* patch */ static SendPort spawnUri(String uri) native "isolate_spawnUri";
+ static SendPort _spawnUri(String uri) native "isolate_spawnUri";
}
« no previous file with comments | « runtime/lib/isolate.cc ('k') | runtime/tests/vm/dart/isolate_mirror_local_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698