Index: sdk/lib/_internal/lib/isolate_helper.dart |
diff --git a/sdk/lib/_internal/lib/isolate_helper.dart b/sdk/lib/_internal/lib/isolate_helper.dart |
index d13871bc6eba2ea3b3cd4160368d5b9b482b53e0..d72ae6173d32426f4b434c8f5c9bee4c4dffdc00 100644 |
--- a/sdk/lib/_internal/lib/isolate_helper.dart |
+++ b/sdk/lib/_internal/lib/isolate_helper.dart |
@@ -225,14 +225,14 @@ class _IsolateContext implements IsolateContext { |
int id; |
/** Registry of receive ports currently active on this isolate. */ |
- Map<int, ReceivePort> ports; |
+ Map<int, ReceivePortBase> ports; |
/** Holds isolate globals (statics and top-level properties). */ |
var isolateStatics; // native object containing all globals of an isolate. |
_IsolateContext() { |
id = _globalState.nextIsolateId++; |
- ports = new Map<int, ReceivePort>(); |
+ ports = new Map<int, ReceivePortBase>(); |
isolateStatics = JS_CREATE_ISOLATE(); |
} |
@@ -258,10 +258,10 @@ class _IsolateContext implements IsolateContext { |
} |
/** Lookup a port registered for this isolate. */ |
- ReceivePort lookup(int portId) => ports[portId]; |
+ ReceivePortBase lookup(int portId) => ports[portId]; |
/** Register a port on this isolate. */ |
- void register(int portId, ReceivePort port) { |
+ void register(int portId, ReceivePortBase port) { |
if (ports.containsKey(portId)) { |
throw new Exception("Registry: ports must be registered only once."); |
} |
@@ -725,7 +725,7 @@ class _BaseSendPort implements SendPort { |
/** A send port that delivers messages in-memory via native JavaScript calls. */ |
class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
- final ReceivePortImpl _receivePort; |
+ final ReceivePortBase _receivePort; |
const _NativeJsSendPort(this._receivePort, int isolateId) : super(isolateId); |
@@ -734,7 +734,7 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
// Check that the isolate still runs and the port is still open |
final isolate = _globalState.isolates[_isolateId]; |
if (isolate == null) return; |
- if (_receivePort._controller.isClosed) return; |
+ if (_receivePort._isClosed) return; |
// We force serialization/deserialization as a simple way to ensure |
// isolate communication restrictions are respected between isolates that |
@@ -750,11 +750,11 @@ class _NativeJsSendPort extends _BaseSendPort implements SendPort { |
msg = _serializeMessage(msg); |
} |
_globalState.topEventLoop.enqueue(isolate, () { |
- if (!_receivePort._controller.isClosed) { |
+ if (!_receivePort._isClosed) { |
if (shouldSerialize) { |
msg = _deserializeMessage(msg); |
} |
- _receivePort._controller.add(msg); |
+ _receivePort._add(msg); |
} |
}, 'receive $message'); |
}); |
@@ -859,14 +859,27 @@ class _BufferingSendPort extends _BaseSendPort implements SendPort { |
int get hashCode => _id; |
} |
-/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ |
-class ReceivePortImpl extends Stream implements ReceivePort { |
+class ReceivePortBase { |
static int _nextFreeId = 1; |
final int _id; |
+ |
+ ReceivePortBase() : _id = _nextFreeId++ { |
+ _globalState.currentContext.register(_id, this); |
+ } |
+ |
+ void _add(dataEvent); |
+ |
+ bool get _isClosed; |
+} |
+ |
+/** Implementation of a multi-use [ReceivePort] on top of JavaScript. */ |
+class ReceivePortImpl extends Stream |
+ implements ReceivePort, ReceivePortBase { |
+ final int _id; |
StreamController _controller; |
ReceivePortImpl() |
- : _id = _nextFreeId++ { |
+ : _id = ReceivePortBase._nextFreeId++ { |
_controller = new StreamController(onCancel: close, sync: true); |
_globalState.currentContext.register(_id, this); |
} |
@@ -879,6 +892,13 @@ class ReceivePortImpl extends Stream implements ReceivePort { |
cancelOnError: cancelOnError); |
} |
+ void _add(dataEvent) { |
+ if (_controller.isClosed) return; |
+ _controller.add(dataEvent); |
+ } |
+ |
+ bool get _isClosed => _controller.isClosed; |
+ |
void close() { |
if (_controller.isClosed) return; |
_controller.close(); |
@@ -890,6 +910,57 @@ class ReceivePortImpl extends Stream implements ReceivePort { |
} |
} |
+class ReceivePortWrapper extends Stream implements ReceivePort { |
+ final RawReceivePort _rawPort; |
+ StreamController _controller; |
+ |
+ ReceivePortWrapper(this._rawPort) { |
+ _controller = new StreamController(onCancel: close, sync: true); |
+ _rawPort.handler = _controller.add; |
+ } |
+ |
+ StreamSubscription listen(void onData(var event), |
+ {Function onError, |
+ void onDone(), |
+ bool cancelOnError}) { |
+ return _controller.stream.listen(onData, onError: onError, onDone: onDone, |
+ cancelOnError: cancelOnError); |
+ } |
+ |
+ void close() { |
+ _rawPort.close(); |
+ _controller.close(); |
+ } |
+ |
+ SendPort get sendPort => _rawPort.sendPort; |
+} |
+ |
+class RawReceivePortImpl extends ReceivePortBase implements RawReceivePort { |
+ Function _handler; |
+ bool _isClosed = false; |
+ |
+ RawReceivePortImpl(this._handler); |
+ |
+ void set handler(Function newHandler) { |
+ _handler = newHandler; |
+ } |
+ |
+ void close() { |
+ if (_isClosed) return; |
+ _isClosed = true; |
+ _globalState.currentContext.unregister(_id); |
floitsch
2013/11/18 10:48:31
Since the ReceivePortBase registers, it should als
Lasse Reichstein Nielsen
2013/11/18 15:45:36
Done.
|
+ } |
+ |
+ void _add(dataEvent) { |
+ if (_isClosed) return; |
+ _handler(dataEvent); |
+ } |
+ |
+ SendPort get sendPort { |
floitsch
2013/11/18 10:48:31
Should be in base clase.
|
+ return new _NativeJsSendPort(this, _globalState.currentContext.id); |
+ } |
+} |
+ |
/** Wait until all ports in a message are resolved. */ |
_waitForPendingPorts(var message, void callback()) { |
final finder = new _PendingSendPortFinder(); |