Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1393)

Unified Diff: runtime/lib/isolate_patch.dart

Issue 27215002: Very simple version of Isolates. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebase Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: runtime/lib/isolate_patch.dart
diff --git a/runtime/lib/isolate_patch.dart b/runtime/lib/isolate_patch.dart
index a2713d10c604716cf53d1aebbe1a7f90248a4519..a5cb017a7b3d4d9b8b249ae926a387776aa68cc9 100644
--- a/runtime/lib/isolate_patch.dart
+++ b/runtime/lib/isolate_patch.dart
@@ -2,85 +2,32 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-class _CloseToken {
- /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to
- /// close themselves.
- const _CloseToken();
-}
-
-patch bool _isCloseToken(var object) {
- // TODO(floitsch): can we compare against const _CloseToken()?
- return object is _CloseToken;
-}
-
-patch class MessageBox {
- /* patch */ MessageBox.oneShot() : this._oneShot(new ReceivePort());
- MessageBox._oneShot(ReceivePort receivePort)
- : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort),
- sink = new _IsolateSink._fromPort(receivePort.toSendPort());
-
- /* patch */ MessageBox() : this._(new ReceivePort());
- MessageBox._(ReceivePort receivePort)
- : stream = new IsolateStream._fromOriginalReceivePort(receivePort),
- sink = new _IsolateSink._fromPort(receivePort.toSendPort());
-}
-
-class _IsolateSink implements IsolateSink {
- bool _isClosed = false;
- final SendPort _port;
- _IsolateSink._fromPort(this._port);
-
- void add(dynamic message) {
- _port.send(message);
- }
-
- void addError(Object errorEvent) {
- throw new UnimplementedError("addError on isolate streams");
- }
-
- void close() {
- if (_isClosed) return;
- add(const _CloseToken());
- _isClosed = true;
- }
-
- bool operator==(var other) {
- return other is IsolateSink && _port == other._port;
- }
-
- int get hashCode => _port.hashCode + 499;
-}
-
-patch IsolateSink streamSpawnFunction(
- void topLevelFunction(),
- [bool unhandledExceptionCallback(IsolateUnhandledException e)]) {
- SendPort sendPort = spawnFunction(topLevelFunction,
- unhandledExceptionCallback);
- return new _IsolateSink._fromPort(sendPort);
-}
-
patch class ReceivePort {
/* patch */ factory ReceivePort() {
return new _ReceivePortImpl();
}
}
-class _ReceivePortImpl implements ReceivePort {
+class _ReceivePortImpl extends Stream implements ReceivePort {
factory _ReceivePortImpl() native "ReceivePortImpl_factory";
- receive(void onMessage(var message, SendPort replyTo)) {
- _onMessage = onMessage;
- }
-
close() {
_portMap.remove(_id);
_closeInternal(_id);
+ _controller.close();
}
- SendPort toSendPort() {
+ SendPort get sendPort {
return new _SendPortImpl(_id);
}
+ StreamSubscription listen(void onData(var message),
+ { Function onError,
+ void onDone(),
+ bool cancelOnError }) {
+ return _controller.stream.listen(onData);
+ }
+
/**** Internal implementation details ****/
// Called from the VM to create a new ReceivePort instance.
static _ReceivePortImpl _get_or_create(int id) {
@@ -98,6 +45,9 @@ class _ReceivePortImpl implements ReceivePort {
_portMap = new Map();
}
_portMap[id] = this;
+
+ // TODO(floitsch): remove the hack to close receive-ports on cancel.
+ _controller = new StreamController(onCancel: close, sync: true);
}
// Called from the VM to retrieve the ReceivePort for a message.
@@ -109,15 +59,14 @@ class _ReceivePortImpl implements ReceivePort {
// Called from the VM to dispatch to the handler.
static void _handleMessage(_ReceivePortImpl port, int replyId, var message) {
assert(port != null);
- SendPort replyTo = (replyId == 0) ? null : new _SendPortImpl(replyId);
- (port._onMessage)(message, replyTo);
+ port._controller.add(message);
}
// Call into the VM to close the VM maintained mappings.
static _closeInternal(int id) native "ReceivePortImpl_closeInternal";
final int _id;
- var _onMessage;
+ StreamController _controller;
// id to ReceivePort mapping.
static Map _portMap;
@@ -135,21 +84,6 @@ class _SendPortImpl implements SendPort {
_sendInternal(_id, replyId, message);
}
- Future call(var message) {
- final completer = new Completer.sync();
- final port = new _ReceivePortImpl();
- send(message, port.toSendPort());
- port.receive((value, ignoreReplyTo) {
- port.close();
- if (value is Exception) {
- completer.completeError(value);
- } else {
- completer.complete(value);
- }
- });
- return completer.future;
- }
-
bool operator==(var other) {
return (other is _SendPortImpl) && _id == other._id;
}
@@ -179,6 +113,42 @@ _getPortInternal() native "isolate_getPortInternal";
ReceivePort _portInternal;
+/**
+ * Takes the real entry point as argument and invokes it with the initial
+ * message.
+ *
+ * The initial message is (currently) received through the global port variable.
+ */
+void _startIsolate(void entryPoint(message)) {
+ _Isolate.port.first.then((message) {
+ var initialMessage = message[0];
+ var reply = message[1];
+ reply.send("started");
+ entryPoint(initialMessage);
+ });
+}
+
+patch class Isolate {
+ /* patch */ static Future<Isolate> spawn(
+ void entryPoint(message), var message) {
+ return new Future<Isolate>.sync(() {
+ // The VM will invoke [_startIsolate] with entryPoint as argument.
+ SendPort controlPort = _Isolate._spawnFunction(entryPoint);
+ ReceivePort readyPort = new ReceivePort();
+ controlPort.send([message, readyPort.sendPort]);
+ Completer completer = new Completer<Isolate>();
+ readyPort.first.then((_) {
+ completer.complete(new Isolate._fromControlPort(controlPort));
+ });
+ return completer.future;
+ });
+ }
+
+ /* patch */ static Future<Isolate> spawnUri(Uri uri, var message) {
+ throw new UnimplementedError("Isolate.spawnUri");
+ }
+}
+
patch class _Isolate {
/* patch */ static ReceivePort get port {
if (_portInternal == null) {
@@ -187,8 +157,7 @@ patch class _Isolate {
return _portInternal;
}
- /* patch */ static SendPort spawnFunction(void topLevelFunction(),
- [bool unhandledExceptionCallback(IsolateUnhandledException e)])
+ static SendPort _spawnFunction(Function topLevelFunction)
native "isolate_spawnFunction";
/* patch */ static SendPort spawnUri(String uri) native "isolate_spawnUri";

Powered by Google App Engine
This is Rietveld 408576698