Chromium Code Reviews| Index: sdk/lib/_internal/lib/isolate_helper.dart |
| diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart |
| index d13871bc6eba2ea3b3cd4160368d5b9b482b53e0..d72ae6173d32426f4b434c8f5c9bee4c4dffdc00 100644 |
| --- a/sdk/lib/_internal/lib/isolate_helper.dart |
| +++ b/sdk/lib/_internal/lib/isolate_helper.dart |
| @@ -225,14 +225,14 @@ class _IsolateContext implements IsolateContext { |
| int id; |
| /** Registry of receive ports currently active on this isolate. */ |
| - Map<int, ReceivePort> ports; |
| + Map<int, ReceivePortBase> ports; |
| /** Holds isolate globals (statics and top-level properties). */ |
| var isolateStatics; // native object containing all globals of an isolate. |
| _IsolateContext() { |
| id = _globalState.nextIsolateId++; |
| - ports = new Map<int, ReceivePort>(); |
| + ports = new Map<int, ReceivePortBase>(); |
| isolateStatics = JS_CREATE_ISOLATE(); |
| } |
| @@ -258,10 +258,10 @@ class _IsolateContext implements IsolateContext { |
| } |
| /** Lookup a port registered for this isolate. */ |
| - ReceivePort lookup(int portId) => ports[portId]; |
| + ReceivePortBase lookup(int portId) => ports[portId]; |
| /** Register a port on this isolate. */ |
| - void register(int portId, ReceivePort port) { |
| + void register(int portId, ReceivePortBase port) { |
| if (ports.containsKey(portId)) { |
| throw new Exception("Registry: ports must be registered only once."); |
| } |
| @@ -725,7 +725,7 @@ class _BaseSendPort implements SendPort { |
| /** A send port that delivers messages in-memory via native JavaScript calls. */ |
| class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
| - final ReceivePortImpl _receivePort; |
| + final ReceivePortBase _receivePort; |
| const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); |
| @@ -734,7 +734,7 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
| // Check that the isolate still runs and the port is still open |
| final isolate = _globalState.isolates[_isolateId]; |
| if (isolate == null) return; |
| - if (_receivePort._controller.isClosed) return; |
| + if (_receivePort._isClosed) return; |
| // We force serialization/deserialization as a simple way to ensure |
| // isolate communication restrictions are respected between isolates that |
| @@ -750,11 +750,11 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
| msg = _serializeMessage(msg); |
| } |
| _globalState.topEventLoop.enqueue(isolate, () { |
| - if (!_receivePort._controller.isClosed) { |
| + if (!_receivePort._isClosed) { |
| if (shouldSerialize) { |
| msg = _deserializeMessage(msg); |
| } |
| - _receivePort._controller.add(msg); |
| + _receivePort._add(msg); |
| } |
| }, 'receive $message'); |
| }); |
| @@ -859,14 +859,27 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort { |
| int get hashCode => _id; |
| } |
| -/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ |
| -class ReceivePortImpl extends Stream implements ReceivePort { |
| +class ReceivePortBase { |
| static int _nextFreeId = 1; |
| final int _id; |
| + |
| + ReceivePortBase() : _id = _nextFreeId++ { |
| + _globalState.currentContext.register(_id, this); |
| + } |
| + |
| + void _add(dataEvent); |
| + |
| + bool get _isClosed; |
| +} |
| + |
| +/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ |
| +class ReceivePortImpl extends Stream |
| + implements ReceivePort, ReceivePortBase { |
| + final int _id; |
| StreamController _controller; |
| ReceivePortImpl() |
| - : _id = _nextFreeId++ { |
| + : _id = ReceivePortBase._nextFreeId++ { |
| _controller = new StreamController(onCancel: close, sync: true); |
| _globalState.currentContext.register(_id, this); |
| } |
| @@ -879,6 +892,13 @@ class ReceivePortImpl extends Stream implements ReceivePort { |
| cancelOnError: cancelOnError); |
| } |
| + void _add(dataEvent) { |
| + if (_controller.isClosed) return; |
| + _controller.add(dataEvent); |
| + } |
| + |
| + bool get _isClosed => _controller.isClosed; |
| + |
| void close() { |
| if (_controller.isClosed) return; |
| _controller.close(); |
| @@ -890,6 +910,57 @@ class ReceivePortImpl extends Stream implements ReceivePort { |
| } |
| } |
| +class ReceivePortWrapper extends Stream implements ReceivePort { |
| + final RawReceivePort _rawPort; |
| + StreamController _controller; |
| + |
| + ReceivePortWrapper(this._rawPort) { |
| + _controller = new StreamController(onCancel: close, sync: true); |
| + _rawPort.handler = _controller.add; |
| + } |
| + |
| + StreamSubscription listen(void onData(var event), |
| + {Function onError, |
| + void onDone(), |
| + bool cancelOnError}) { |
| + return _controller.stream.listen(onData, onError: onError, onDone: onDone, |
| + cancelOnError: cancelOnError); |
| + } |
| + |
| + void close() { |
| + _rawPort.close(); |
| + _controller.close(); |
| + } |
| + |
| + SendPort get sendPort => _rawPort.sendPort; |
| +} |
| + |
| +class RawReceivePortImpl extends ReceivePortBase implements RawReceivePort { |
| + Function _handler; |
| + bool _isClosed = false; |
| + |
| + RawReceivePortImpl(this._handler); |
| + |
| + void set handler(Function newHandler) { |
| + _handler = newHandler; |
| + } |
| + |
| + void close() { |
| + if (_isClosed) return; |
| + _isClosed = true; |
| + _globalState.currentContext.unregister(_id); |
|
floitsch
2013/11/18 10:48:31
Since the ReceivePortBase registers, it should als
Lasse Reichstein Nielsen
2013/11/18 15:45:36
Done.
|
| + } |
| + |
| + void _add(dataEvent) { |
| + if (_isClosed) return; |
| + _handler(dataEvent); |
| + } |
| + |
| + SendPort get sendPort { |
|
floitsch
2013/11/18 10:48:31
Should be in base clase.
|
| + return new _NativeJsSendPort(this, _globalState.currentContext.id); |
| + } |
| +} |
| + |
| /** Wait until all ports in a message are resolved. */ |
| _waitForPendingPorts(var message, void callback()) { |
| final finder = new _PendingSendPortFinder(); |