Index: sdk/lib/_internal/lib/isolate_helper.dart |
diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart |
index d13871bc6eba2ea3b3cd4160368d5b9b482b53e0..4103a7c983b27d4e635ba78a6743c635e63fdf4d 100644 |
--- a/sdk/lib/_internal/lib/isolate_helper.dart |
+++ b/sdk/lib/_internal/lib/isolate_helper.dart |
@@ -225,14 +225,14 @@ class _IsolateContext implements IsolateContext { |
int id; |
/** Registry of receive ports currently active on this isolate. */ |
- Map<int, ReceivePort> ports; |
+ Map<int, RawReceivePortImpl> ports; |
/** Holds isolate globals (statics and top-level properties). */ |
var isolateStatics; // native object containing all globals of an isolate. |
_IsolateContext() { |
id = _globalState.nextIsolateId++; |
- ports = new Map<int, ReceivePort>(); |
+ ports = new Map<int, RawReceivePortImpl>(); |
isolateStatics = JS_CREATE_ISOLATE(); |
} |
@@ -258,10 +258,10 @@ class _IsolateContext implements IsolateContext { |
} |
/** Lookup a port registered for this isolate. */ |
- ReceivePort lookup(int portId) => ports[portId]; |
+ RawReceivePortImpl lookup(int portId) => ports[portId]; |
/** Register a port on this isolate. */ |
- void register(int portId, ReceivePort port) { |
+ void register(int portId, RawReceivePortImpl port) { |
if (ports.containsKey(portId)) { |
throw new Exception("Registry: ports must be registered only once."); |
} |
@@ -725,7 +725,7 @@ class _BaseSendPort implements SendPort { |
/** A send port that delivers messages in-memory via native JavaScript calls. */ |
class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
- final ReceivePortImpl _receivePort; |
+ final RawReceivePortImpl _receivePort; |
const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); |
@@ -734,7 +734,7 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
// Check that the isolate still runs and the port is still open |
final isolate = _globalState.isolates[_isolateId]; |
if (isolate == null) return; |
- if (_receivePort._controller.isClosed) return; |
+ if (_receivePort._isClosed) return; |
// We force serialization/deserialization as a simple way to ensure |
// isolate communication restrictions are respected between isolates that |
@@ -750,11 +750,11 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
msg = _serializeMessage(msg); |
} |
_globalState.topEventLoop.enqueue(isolate, () { |
- if (!_receivePort._controller.isClosed) { |
+ if (!_receivePort._isClosed) { |
if (shouldSerialize) { |
msg = _deserializeMessage(msg); |
} |
- _receivePort._controller.add(msg); |
+ _receivePort._add(msg); |
} |
}, 'receive $message'); |
}); |
@@ -859,16 +859,47 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort { |
int get hashCode => _id; |
} |
-/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ |
-class ReceivePortImpl extends Stream implements ReceivePort { |
+class RawReceivePortImpl implements RawReceivePort { |
static int _nextFreeId = 1; |
- final int _id; |
+ |
+ final int _id = _nextFreeId++; |
+ Function _handler; |
+ bool _isClosed = false; |
+ |
+ RawReceivePortImpl(this._handler) { |
+ _globalState.currentContext.register(_id, this); |
+ } |
+ |
+ void set handler(Function newHandler) { |
+ _handler = newHandler; |
+ } |
+ |
+ void close() { |
+ if (_isClosed) return; |
+ _isClosed = true; |
+ _handler = null; |
+ _globalState.currentContext.unregister(_id); |
+ } |
+ |
+ void _add(dataEvent) { |
+ if (_isClosed) return; |
+ _handler(dataEvent); |
+ } |
+ |
+ SendPort get sendPort { |
+ return new _NativeJsSendPort(this, _globalState.currentContext.id); |
+ } |
+} |
+ |
+class ReceivePortImpl extends Stream implements ReceivePort { |
+ final RawReceivePort _rawPort; |
StreamController _controller; |
- ReceivePortImpl() |
- : _id = _nextFreeId++ { |
+ ReceivePortImpl() : this.fromRawReceivePort(new RawReceivePortImpl(null)); |
+ |
+ ReceivePortImpl.fromRawReceivePort(this._rawPort) { |
_controller = new StreamController(onCancel: close, sync: true); |
- _globalState.currentContext.register(_id, this); |
+ _rawPort.handler = _controller.add; |
} |
StreamSubscription listen(void onData(var event), |
@@ -880,16 +911,14 @@ class ReceivePortImpl extends Stream implements ReceivePort { |
} |
void close() { |
- if (_controller.isClosed) return; |
+ _rawPort.close(); |
_controller.close(); |
- _globalState.currentContext.unregister(_id); |
} |
- SendPort get sendPort { |
- return new _NativeJsSendPort(this, _globalState.currentContext.id); |
- } |
+ SendPort get sendPort => _rawPort.sendPort; |
} |
+ |
/** Wait until all ports in a message are resolved. */ |
_waitForPendingPorts(var message, void callback()) { |
final finder = new _PendingSendPortFinder(); |