Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(225)

Unified Diff: sdk/lib/_internal/lib/isolate_helper.dart

Issue 70263002: Add RawReceivePort to dart2js. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | sdk/lib/_internal/lib/isolate_patch.dart » ('j') | tests/isolate/raw_port_test.dart » ('J')
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/_internal/lib/isolate_helper.dart
diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart
index d13871bc6eba2ea3b3cd4160368d5b9b482b53e0..d72ae6173d32426f4b434c8f5c9bee4c4dffdc00 100644
--- a/sdk/lib/_internal/lib/isolate_helper.dart
+++ b/sdk/lib/_internal/lib/isolate_helper.dart
@@ -225,14 +225,14 @@ class _IsolateContext implements IsolateContext {
int id;
/** Registry of receive ports currently active on this isolate. */
- Map<int, ReceivePort> ports;
+ Map<int, ReceivePortBase> ports;
/** Holds isolate globals (statics and top-level properties). */
var isolateStatics; // native object containing all globals of an isolate.
_IsolateContext() {
id = _globalState.nextIsolateId++;
- ports = new Map<int, ReceivePort>();
+ ports = new Map<int, ReceivePortBase>();
isolateStatics = JS_CREATE_ISOLATE();
}
@@ -258,10 +258,10 @@ class _IsolateContext implements IsolateContext {
}
/** Lookup a port registered for this isolate. */
- ReceivePort lookup(int portId) => ports[portId];
+ ReceivePortBase lookup(int portId) => ports[portId];
/** Register a port on this isolate. */
- void register(int portId, ReceivePort port) {
+ void register(int portId, ReceivePortBase port) {
if (ports.containsKey(portId)) {
throw new Exception("Registry: ports must be registered only once.");
}
@@ -725,7 +725,7 @@ class _BaseSendPort implements SendPort {
/** A send port that delivers messages in-memory via native JavaScript calls. */
class _NativeJsSendPort extends _BaseSendPort implements SendPort {
- final ReceivePortImpl _receivePort;
+ final ReceivePortBase _receivePort;
const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId);
@@ -734,7 +734,7 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
// Check that the isolate still runs and the port is still open
final isolate = _globalState.isolates[_isolateId];
if (isolate == null) return;
- if (_receivePort._controller.isClosed) return;
+ if (_receivePort._isClosed) return;
// We force serialization/deserialization as a simple way to ensure
// isolate communication restrictions are respected between isolates that
@@ -750,11 +750,11 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
msg = _serializeMessage(msg);
}
_globalState.topEventLoop.enqueue(isolate, () {
- if (!_receivePort._controller.isClosed) {
+ if (!_receivePort._isClosed) {
if (shouldSerialize) {
msg = _deserializeMessage(msg);
}
- _receivePort._controller.add(msg);
+ _receivePort._add(msg);
}
}, 'receive $message');
});
@@ -859,14 +859,27 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
int get hashCode => _id;
}
-/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
-class ReceivePortImpl extends Stream implements ReceivePort {
+class ReceivePortBase {
static int _nextFreeId = 1;
final int _id;
+
+ ReceivePortBase() : _id = _nextFreeId++ {
+ _globalState.currentContext.register(_id, this);
+ }
+
+ void _add(dataEvent);
+
+ bool get _isClosed;
+}
+
+/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
+class ReceivePortImpl extends Stream
+ implements ReceivePort, ReceivePortBase {
+ final int _id;
StreamController _controller;
ReceivePortImpl()
- : _id = _nextFreeId++ {
+ : _id = ReceivePortBase._nextFreeId++ {
_controller = new StreamController(onCancel: close, sync: true);
_globalState.currentContext.register(_id, this);
}
@@ -879,6 +892,13 @@ class ReceivePortImpl extends Stream implements ReceivePort {
cancelOnError: cancelOnError);
}
+ void _add(dataEvent) {
+ if (_controller.isClosed) return;
+ _controller.add(dataEvent);
+ }
+
+ bool get _isClosed => _controller.isClosed;
+
void close() {
if (_controller.isClosed) return;
_controller.close();
@@ -890,6 +910,57 @@ class ReceivePortImpl extends Stream implements ReceivePort {
}
}
+class ReceivePortWrapper extends Stream implements ReceivePort {
+ final RawReceivePort _rawPort;
+ StreamController _controller;
+
+ ReceivePortWrapper(this._rawPort) {
+ _controller = new StreamController(onCancel: close, sync: true);
+ _rawPort.handler = _controller.add;
+ }
+
+ StreamSubscription listen(void onData(var event),
+ {Function onError,
+ void onDone(),
+ bool cancelOnError}) {
+ return _controller.stream.listen(onData, onError: onError, onDone: onDone,
+ cancelOnError: cancelOnError);
+ }
+
+ void close() {
+ _rawPort.close();
+ _controller.close();
+ }
+
+ SendPort get sendPort => _rawPort.sendPort;
+}
+
+class RawReceivePortImpl extends ReceivePortBase implements RawReceivePort {
+ Function _handler;
+ bool _isClosed = false;
+
+ RawReceivePortImpl(this._handler);
+
+ void set handler(Function newHandler) {
+ _handler = newHandler;
+ }
+
+ void close() {
+ if (_isClosed) return;
+ _isClosed = true;
+ _globalState.currentContext.unregister(_id);
floitsch 2013/11/18 10:48:31 Since the ReceivePortBase registers, it should als
Lasse Reichstein Nielsen 2013/11/18 15:45:36 Done.
+ }
+
+ void _add(dataEvent) {
+ if (_isClosed) return;
+ _handler(dataEvent);
+ }
+
+ SendPort get sendPort {
floitsch 2013/11/18 10:48:31 Should be in base clase.
+ return new _NativeJsSendPort(this, _globalState.currentContext.id);
+ }
+}
+
/** Wait until all ports in a message are resolved. */
_waitForPendingPorts(var message, void callback()) {
final finder = new _PendingSendPortFinder();
« no previous file with comments | « no previous file | sdk/lib/_internal/lib/isolate_patch.dart » ('j') | tests/isolate/raw_port_test.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698