Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(394)

Unified Diff: sdk/lib/_internal/lib/isolate_helper.dart

Issue 70263002: Add RawReceivePort to dart2js. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | sdk/lib/_internal/lib/isolate_patch.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/_internal/lib/isolate_helper.dart
diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart
index d13871bc6eba2ea3b3cd4160368d5b9b482b53e0..4103a7c983b27d4e635ba78a6743c635e63fdf4d 100644
--- a/sdk/lib/_internal/lib/isolate_helper.dart
+++ b/sdk/lib/_internal/lib/isolate_helper.dart
@@ -225,14 +225,14 @@ class _IsolateContext implements IsolateContext {
int id;
/** Registry of receive ports currently active on this isolate. */
- Map<int, ReceivePort> ports;
+ Map<int, RawReceivePortImpl> ports;
/** Holds isolate globals (statics and top-level properties). */
var isolateStatics; // native object containing all globals of an isolate.
_IsolateContext() {
id = _globalState.nextIsolateId++;
- ports = new Map<int, ReceivePort>();
+ ports = new Map<int, RawReceivePortImpl>();
isolateStatics = JS_CREATE_ISOLATE();
}
@@ -258,10 +258,10 @@ class _IsolateContext implements IsolateContext {
}
/** Lookup a port registered for this isolate. */
- ReceivePort lookup(int portId) => ports[portId];
+ RawReceivePortImpl lookup(int portId) => ports[portId];
/** Register a port on this isolate. */
- void register(int portId, ReceivePort port) {
+ void register(int portId, RawReceivePortImpl port) {
if (ports.containsKey(portId)) {
throw new Exception("Registry: ports must be registered only once.");
}
@@ -725,7 +725,7 @@ class _BaseSendPort implements SendPort {
/** A send port that delivers messages in-memory via native JavaScript calls. */
class _NativeJsSendPort extends _BaseSendPort implements SendPort {
- final ReceivePortImpl _receivePort;
+ final RawReceivePortImpl _receivePort;
const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId);
@@ -734,7 +734,7 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
// Check that the isolate still runs and the port is still open
final isolate = _globalState.isolates[_isolateId];
if (isolate == null) return;
- if (_receivePort._controller.isClosed) return;
+ if (_receivePort._isClosed) return;
// We force serialization/deserialization as a simple way to ensure
// isolate communication restrictions are respected between isolates that
@@ -750,11 +750,11 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
msg = _serializeMessage(msg);
}
_globalState.topEventLoop.enqueue(isolate, () {
- if (!_receivePort._controller.isClosed) {
+ if (!_receivePort._isClosed) {
if (shouldSerialize) {
msg = _deserializeMessage(msg);
}
- _receivePort._controller.add(msg);
+ _receivePort._add(msg);
}
}, 'receive $message');
});
@@ -859,16 +859,47 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
int get hashCode => _id;
}
-/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
-class ReceivePortImpl extends Stream implements ReceivePort {
+class RawReceivePortImpl implements RawReceivePort {
static int _nextFreeId = 1;
- final int _id;
+
+ final int _id = _nextFreeId++;
+ Function _handler;
+ bool _isClosed = false;
+
+ RawReceivePortImpl(this._handler) {
+ _globalState.currentContext.register(_id, this);
+ }
+
+ void set handler(Function newHandler) {
+ _handler = newHandler;
+ }
+
+ void close() {
+ if (_isClosed) return;
+ _isClosed = true;
+ _handler = null;
+ _globalState.currentContext.unregister(_id);
+ }
+
+ void _add(dataEvent) {
+ if (_isClosed) return;
+ _handler(dataEvent);
+ }
+
+ SendPort get sendPort {
+ return new _NativeJsSendPort(this, _globalState.currentContext.id);
+ }
+}
+
+class ReceivePortImpl extends Stream implements ReceivePort {
+ final RawReceivePort _rawPort;
StreamController _controller;
- ReceivePortImpl()
- : _id = _nextFreeId++ {
+ ReceivePortImpl() : this.fromRawReceivePort(new RawReceivePortImpl(null));
+
+ ReceivePortImpl.fromRawReceivePort(this._rawPort) {
_controller = new StreamController(onCancel: close, sync: true);
- _globalState.currentContext.register(_id, this);
+ _rawPort.handler = _controller.add;
}
StreamSubscription listen(void onData(var event),
@@ -880,16 +911,14 @@ class ReceivePortImpl extends Stream implements ReceivePort {
}
void close() {
- if (_controller.isClosed) return;
+ _rawPort.close();
_controller.close();
- _globalState.currentContext.unregister(_id);
}
- SendPort get sendPort {
- return new _NativeJsSendPort(this, _globalState.currentContext.id);
- }
+ SendPort get sendPort => _rawPort.sendPort;
}
+
/** Wait until all ports in a message are resolved. */
_waitForPendingPorts(var message, void callback()) {
final finder = new _PendingSendPortFinder();
« no previous file with comments | « no previous file | sdk/lib/_internal/lib/isolate_patch.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698