| Index: sdk/lib/_internal/lib/isolate_helper.dart
|
| diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart
|
| index d13871bc6eba2ea3b3cd4160368d5b9b482b53e0..4103a7c983b27d4e635ba78a6743c635e63fdf4d 100644
|
| --- a/sdk/lib/_internal/lib/isolate_helper.dart
|
| +++ b/sdk/lib/_internal/lib/isolate_helper.dart
|
| @@ -225,14 +225,14 @@ class _IsolateContext implements IsolateContext {
|
| int id;
|
|
|
| /** Registry of receive ports currently active on this isolate. */
|
| - Map<int, ReceivePort> ports;
|
| + Map<int, RawReceivePortImpl> ports;
|
|
|
| /** Holds isolate globals (statics and top-level properties). */
|
| var isolateStatics; // native object containing all globals of an isolate.
|
|
|
| _IsolateContext() {
|
| id = _globalState.nextIsolateId++;
|
| - ports = new Map<int, ReceivePort>();
|
| + ports = new Map<int, RawReceivePortImpl>();
|
| isolateStatics = JS_CREATE_ISOLATE();
|
| }
|
|
|
| @@ -258,10 +258,10 @@ class _IsolateContext implements IsolateContext {
|
| }
|
|
|
| /** Lookup a port registered for this isolate. */
|
| - ReceivePort lookup(int portId) => ports[portId];
|
| + RawReceivePortImpl lookup(int portId) => ports[portId];
|
|
|
| /** Register a port on this isolate. */
|
| - void register(int portId, ReceivePort port) {
|
| + void register(int portId, RawReceivePortImpl port) {
|
| if (ports.containsKey(portId)) {
|
| throw new Exception("Registry: ports must be registered only once.");
|
| }
|
| @@ -725,7 +725,7 @@ class _BaseSendPort implements SendPort {
|
|
|
| /** A send port that delivers messages in-memory via native JavaScript calls. */
|
| class _NativeJsSendPort extends _BaseSendPort implements SendPort {
|
| - final ReceivePortImpl _receivePort;
|
| + final RawReceivePortImpl _receivePort;
|
|
|
| const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId);
|
|
|
| @@ -734,7 +734,7 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
|
| // Check that the isolate still runs and the port is still open
|
| final isolate = _globalState.isolates[_isolateId];
|
| if (isolate == null) return;
|
| - if (_receivePort._controller.isClosed) return;
|
| + if (_receivePort._isClosed) return;
|
|
|
| // We force serialization/deserialization as a simple way to ensure
|
| // isolate communication restrictions are respected between isolates that
|
| @@ -750,11 +750,11 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort {
|
| msg = _serializeMessage(msg);
|
| }
|
| _globalState.topEventLoop.enqueue(isolate, () {
|
| - if (!_receivePort._controller.isClosed) {
|
| + if (!_receivePort._isClosed) {
|
| if (shouldSerialize) {
|
| msg = _deserializeMessage(msg);
|
| }
|
| - _receivePort._controller.add(msg);
|
| + _receivePort._add(msg);
|
| }
|
| }, 'receive $message');
|
| });
|
| @@ -859,16 +859,47 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort {
|
| int get hashCode => _id;
|
| }
|
|
|
| -/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */
|
| -class ReceivePortImpl extends Stream implements ReceivePort {
|
| +class RawReceivePortImpl implements RawReceivePort {
|
| static int _nextFreeId = 1;
|
| - final int _id;
|
| +
|
| + final int _id = _nextFreeId++;
|
| + Function _handler;
|
| + bool _isClosed = false;
|
| +
|
| + RawReceivePortImpl(this._handler) {
|
| + _globalState.currentContext.register(_id, this);
|
| + }
|
| +
|
| + void set handler(Function newHandler) {
|
| + _handler = newHandler;
|
| + }
|
| +
|
| + void close() {
|
| + if (_isClosed) return;
|
| + _isClosed = true;
|
| + _handler = null;
|
| + _globalState.currentContext.unregister(_id);
|
| + }
|
| +
|
| + void _add(dataEvent) {
|
| + if (_isClosed) return;
|
| + _handler(dataEvent);
|
| + }
|
| +
|
| + SendPort get sendPort {
|
| + return new _NativeJsSendPort(this, _globalState.currentContext.id);
|
| + }
|
| +}
|
| +
|
| +class ReceivePortImpl extends Stream implements ReceivePort {
|
| + final RawReceivePort _rawPort;
|
| StreamController _controller;
|
|
|
| - ReceivePortImpl()
|
| - : _id = _nextFreeId++ {
|
| + ReceivePortImpl() : this.fromRawReceivePort(new RawReceivePortImpl(null));
|
| +
|
| + ReceivePortImpl.fromRawReceivePort(this._rawPort) {
|
| _controller = new StreamController(onCancel: close, sync: true);
|
| - _globalState.currentContext.register(_id, this);
|
| + _rawPort.handler = _controller.add;
|
| }
|
|
|
| StreamSubscription listen(void onData(var event),
|
| @@ -880,16 +911,14 @@ class ReceivePortImpl extends Stream implements ReceivePort {
|
| }
|
|
|
| void close() {
|
| - if (_controller.isClosed) return;
|
| + _rawPort.close();
|
| _controller.close();
|
| - _globalState.currentContext.unregister(_id);
|
| }
|
|
|
| - SendPort get sendPort {
|
| - return new _NativeJsSendPort(this, _globalState.currentContext.id);
|
| - }
|
| + SendPort get sendPort => _rawPort.sendPort;
|
| }
|
|
|
| +
|
| /** Wait until all ports in a message are resolved. */
|
| _waitForPendingPorts(var message, void callback()) {
|
| final finder = new _PendingSendPortFinder();
|
|
|