Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(105)

Unified Diff: runtime/lib/isolate_patch.dart

Issue 27215002: Very simple version of Isolates. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: More comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: runtime/lib/isolate_patch.dart
diff --git a/runtime/lib/isolate_patch.dart b/runtime/lib/isolate_patch.dart
index a2713d10c604716cf53d1aebbe1a7f90248a4519..e9fd3bba5b862f15560a6b8ac4aa1f282b71aa8c 100644
--- a/runtime/lib/isolate_patch.dart
+++ b/runtime/lib/isolate_patch.dart
@@ -2,85 +2,44 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-class _CloseToken {
- /// This token is sent from [IsolateSink]s to [IsolateStream]s to ask them to
- /// close themselves.
- const _CloseToken();
-}
-
-patch bool _isCloseToken(var object) {
- // TODO(floitsch): can we compare against const _CloseToken()?
- return object is _CloseToken;
-}
-
-patch class MessageBox {
- /* patch */ MessageBox.oneShot() : this._oneShot(new ReceivePort());
- MessageBox._oneShot(ReceivePort receivePort)
- : stream = new IsolateStream._fromOriginalReceivePortOneShot(receivePort),
- sink = new _IsolateSink._fromPort(receivePort.toSendPort());
-
- /* patch */ MessageBox() : this._(new ReceivePort());
- MessageBox._(ReceivePort receivePort)
- : stream = new IsolateStream._fromOriginalReceivePort(receivePort),
- sink = new _IsolateSink._fromPort(receivePort.toSendPort());
-}
-
-class _IsolateSink implements IsolateSink {
- bool _isClosed = false;
- final SendPort _port;
- _IsolateSink._fromPort(this._port);
-
- void add(dynamic message) {
- _port.send(message);
- }
-
- void addError(Object errorEvent) {
- throw new UnimplementedError("addError on isolate streams");
- }
-
- void close() {
- if (_isClosed) return;
- add(const _CloseToken());
- _isClosed = true;
- }
-
- bool operator==(var other) {
- return other is IsolateSink && _port == other._port;
- }
-
- int get hashCode => _port.hashCode + 499;
-}
-
-patch IsolateSink streamSpawnFunction(
- void topLevelFunction(),
- [bool unhandledExceptionCallback(IsolateUnhandledException e)]) {
- SendPort sendPort = spawnFunction(topLevelFunction,
- unhandledExceptionCallback);
- return new _IsolateSink._fromPort(sendPort);
-}
-
patch class ReceivePort {
/* patch */ factory ReceivePort() {
- return new _ReceivePortImpl();
+ _ReceivePortImpl result = new _ReceivePortImpl();
+ // TODO(floitsch): remove the hack to close receive-ports on cancel.
+ result._controller = new StreamController(onCancel: result._close);
+ return result;
}
}
-class _ReceivePortImpl implements ReceivePort {
+class _ReceivePortImpl extends Stream implements ReceivePort {
factory _ReceivePortImpl() native "ReceivePortImpl_factory";
+ // Deprecated.
receive(void onMessage(var message, SendPort replyTo)) {
_onMessage = onMessage;
}
+ // Deprecated.
close() {
+ _close();
+ }
+
+ _close() {
_portMap.remove(_id);
_closeInternal(_id);
}
- SendPort toSendPort() {
+ SendPort get sendPort {
return new _SendPortImpl(_id);
}
+ StreamSubscription listen(void onData(var message),
+ { Function onError,
+ void onDone(),
+ bool cancelOnError }) {
+ return _controller.stream.listen(onData);
+ }
+
/**** Internal implementation details ****/
// Called from the VM to create a new ReceivePort instance.
static _ReceivePortImpl _get_or_create(int id) {
@@ -110,13 +69,18 @@ class _ReceivePortImpl implements ReceivePort {
static void _handleMessage(_ReceivePortImpl port, int replyId, var message) {
assert(port != null);
SendPort replyTo = (replyId == 0) ? null : new _SendPortImpl(replyId);
- (port._onMessage)(message, replyTo);
+ if (port._onMessage != null) {
+ (port._onMessage)(message, replyTo);
+ } else {
+ port._controller.add(message);
+ }
}
// Call into the VM to close the VM maintained mappings.
static _closeInternal(int id) native "ReceivePortImpl_closeInternal";
final int _id;
+ StreamController _controller;
var _onMessage;
// id to ReceivePort mapping.
@@ -135,6 +99,7 @@ class _SendPortImpl implements SendPort {
_sendInternal(_id, replyId, message);
}
+ /// Deprecated.
Future call(var message) {
final completer = new Completer.sync();
final port = new _ReceivePortImpl();
@@ -179,6 +144,53 @@ _getPortInternal() native "isolate_getPortInternal";
ReceivePort _portInternal;
+typedef _ZeroArgFunction();
+
+/**
+ * Takes the real entry point as argument and invokes it with the initial
+ * message.
+ *
+ * The initial message is (currently) received through the global port variable.
+ */
+void _startIsolate(Function entryPoint) {
+ bool first = true;
+ _Isolate.port.receive((message, replyTo) {
+ assert(first);
+ first = false;
+ var initialMessage = message[0];
+ var reply = message[1];
+ reply.send("started");
+ if (entryPoint is _ZeroArgFunction) {
+ entryPoint();
+ } else {
+ entryPoint(initialMessage);
+ }
+ });
+}
+
+patch class Isolate {
+ /* patch */ static Future<Isolate> spawn(
+ void entryPoint(message), var message, { bool startPaused: false }) {
+ if (startPaused) throw new UnimplementedError("spawn paused isolate");
+ return new Future<Isolate>.sync(() {
+ // The VM will invoke [_startIsolate] with entryPoint as argument.
+ SendPort controlPort = _Isolate._spawnFunction(entryPoint);
+ ReceivePort readyPort = new ReceivePort();
+ controlPort.send([message, readyPort.sendPort]);
+ Completer completer = new Completer<Isolate>();
+ readyPort.receive((ignored1, ignored2) {
+ readyPort.close();
+ completer.complete(new Isolate._fromControlPort(controlPort));
+ });
+ return completer.future;
+ });
+ }
+
+ /* patch */ static Future<Isolate> spawnUri(Uri uri, var message) {
+ throw new UnimplementedError("Isolate.spawnUri");
+ }
+}
+
patch class _Isolate {
/* patch */ static ReceivePort get port {
if (_portInternal == null) {
@@ -187,8 +199,7 @@ patch class _Isolate {
return _portInternal;
}
- /* patch */ static SendPort spawnFunction(void topLevelFunction(),
- [bool unhandledExceptionCallback(IsolateUnhandledException e)])
+ static SendPort _spawnFunction(Function topLevelFunction)
native "isolate_spawnFunction";
/* patch */ static SendPort spawnUri(String uri) native "isolate_spawnUri";

Powered by Google App Engine
This is Rietveld 408576698