Index: mojo/public/dart/src/handle.dart |
diff --git a/mojo/public/dart/src/handle.dart b/mojo/public/dart/src/handle.dart |
index 56b673548615fd854e93f6bfa6644e8f00fa0039..f983815504f0c45b5598f198a1a21bf6c2aeb48d 100644 |
--- a/mojo/public/dart/src/handle.dart |
+++ b/mojo/public/dart/src/handle.dart |
@@ -5,7 +5,7 @@ |
part of core; |
class _MojoHandleNatives { |
- static int register(MojoHandle handle) native "MojoHandle_Register"; |
+ static int register(MojoEventStream eventStream) native "MojoHandle_Register"; |
static int close(int handle) native "MojoHandle_Close"; |
static List wait(int handle, int signals, int deadline) |
native "MojoHandle_Wait"; |
@@ -15,13 +15,13 @@ class _MojoHandleNatives { |
} |
-class RawMojoHandle { |
+class MojoHandle { |
static const int INVALID = 0; |
static const int DEADLINE_INDEFINITE = -1; |
int h; |
- RawMojoHandle(this.h); |
+ MojoHandle(this.h); |
MojoResult close() { |
int result = _MojoHandleNatives.close(h); |
@@ -34,9 +34,9 @@ class RawMojoHandle { |
return new MojoWaitResult(new MojoResult(result[0]), result[1]); |
} |
- bool _ready(int signal) { |
- MojoWaitResult res = wait(signal, 0); |
- switch (res.result) { |
+ bool _ready(MojoHandleSignals signal) { |
+ MojoWaitResult mwr = wait(signal.value, 0); |
+ switch (mwr.result) { |
case MojoResult.OK: |
return true; |
case MojoResult.DEADLINE_EXCEEDED: |
@@ -46,40 +46,37 @@ class RawMojoHandle { |
return false; |
default: |
// Should be unreachable. |
- throw new Exception("Unreachable"); |
+ throw "Unexpected result $res for wait on $h"; |
} |
} |
- bool readyRead() => _ready(MojoHandleSignals.READABLE); |
- bool readyWrite() => _ready(MojoHandleSignals.WRITABLE); |
+ bool get readyRead => _ready(MojoHandleSignals.READABLE); |
+ bool get readyWrite => _ready(MojoHandleSignals.WRITABLE); |
- static MojoWaitManyResult waitMany(List<int> handles, |
- List<int> signals, |
- int deadline) { |
- List result = _MojoHandleNatives.waitMany( |
- handles, signals, deadline); |
- |
+ static MojoWaitManyResult waitMany( |
+ List<int> handles, List<int> signals, int deadline) { |
+ List result = _MojoHandleNatives.waitMany(handles, signals, deadline); |
return new MojoWaitManyResult( |
new MojoResult(result[0]), result[1], result[2]); |
} |
- static MojoResult register(MojoHandle handle) { |
- return new MojoResult(_MojoHandleNatives.register(handle)); |
+ static MojoResult register(MojoEventStream eventStream) { |
+ return new MojoResult(_MojoHandleNatives.register(eventStream)); |
} |
bool get isValid => (h != INVALID); |
String toString() => "$h"; |
- bool operator ==(RawMojoHandle other) { |
+ bool operator ==(MojoHandle other) { |
return h == other.h; |
} |
} |
-class MojoHandle extends Stream<int> { |
+class MojoEventStream extends Stream<int> { |
// The underlying Mojo handle. |
- RawMojoHandle _handle; |
+ MojoHandle _handle; |
// Providing our own stream controller allows us to take custom actions when |
// listeners pause/resume/etc. their StreamSubscription. |
@@ -94,59 +91,39 @@ class MojoHandle extends Stream<int> { |
ReceivePort _receivePort; |
// The signals on this handle that we're interested in. |
- int _signals; |
+ MojoHandleSignals _signals; |
- // Whether the handle has been added to the handle watcher. |
- bool _eventHandlerAdded; |
+ // Whether listen has been called. |
+ bool _isListening; |
- MojoHandle(this._handle) : |
- _signals = MojoHandleSignals.READABLE, |
- _eventHandlerAdded = false { |
- MojoResult result = RawMojoHandle.register(this); |
+ MojoEventStream(MojoHandle handle, |
+ [MojoHandleSignals signals = MojoHandleSignals.READABLE]) : |
+ _handle = handle, |
+ _signals = signals, |
+ _isListening = false { |
+ MojoResult result = MojoHandle.register(this); |
if (!result.isOk) { |
- throw new Exception("Failed to register the MojoHandle"); |
+ throw "Failed to register the MojoHandle: $result."; |
} |
} |
void close() { |
- if (_eventHandlerAdded) { |
+ if (_handle != null) { |
MojoHandleWatcher.close(_handle); |
- _eventHandlerAdded = false; |
- } else { |
- // If we're not in the handle watcher, then close the handle manually. |
- _handle.close(); |
+ _handle = null; |
} |
if (_receivePort != null) { |
_receivePort.close(); |
+ _receivePort = null; |
} |
} |
- // We wrap the callback provided by clients in listen() with some code to |
- // handle adding and removing the handle to/from the handle watcher. Because |
- // the handle watcher removes this handle whenever it receives an event, |
- // we have to re-add it when the callback is finished. |
- Function _onDataClosure(origOnData) { |
- return ((int event) { |
- // The handle watcher removes this handle from its set on an event. |
- _eventHandlerAdded = false; |
- origOnData(event); |
- |
- // The callback could have closed the handle. If so, don't add it back to |
- // the MojoHandleWatcher. |
- if (_handle.isValid) { |
- assert(!_eventHandlerAdded); |
- var res = MojoHandleWatcher.add(_handle, _sendPort, _signals); |
- if (!res.isOk) { |
- throw new Exception("Failed to re-add handle: $res"); |
- } |
- _eventHandlerAdded = true; |
- } |
- }); |
- } |
- |
- StreamSubscription<int> listen( |
- void onData(int event), |
+ StreamSubscription<List<int>> listen( |
+ void onData(List event), |
{Function onError, void onDone(), bool cancelOnError}) { |
+ if (_isListening) { |
+ throw "Listen has already been called: $_handle."; |
+ } |
_receivePort = new ReceivePort(); |
_sendPort = _receivePort.sendPort; |
_controller = new StreamController(sync: true, |
@@ -156,41 +133,34 @@ class MojoHandle extends Stream<int> { |
onResume: _onPauseStateChange); |
_controller.addStream(_receivePort); |
- assert(!_eventHandlerAdded); |
- var res = MojoHandleWatcher.add(_handle, _sendPort, _signals); |
- if (!res.isOk) { |
- throw new Exception("MojoHandleWatcher add failed: $res"); |
+ if (_signals != MojoHandleSignals.NONE) { |
+ var res = MojoHandleWatcher.add(_handle, _sendPort, _signals.value); |
+ if (!res.isOk) { |
+ throw "MojoHandleWatcher add failed: $res"; |
+ } |
} |
- _eventHandlerAdded = true; |
+ _isListening = true; |
return _controller.stream.listen( |
- _onDataClosure(onData), |
+ onData, |
onError: onError, |
onDone: onDone, |
cancelOnError: cancelOnError); |
} |
- bool writeEnabled() => MojoHandleSignals.isWritable(_signals); |
- |
- void toggleWriteEvents() { |
- _signals = MojoHandleSignals.toggleWrite(_signals); |
- if (_eventHandlerAdded) { |
- var res = MojoHandleWatcher.toggleWrite(_handle); |
+ void enableSignals(MojoHandleSignals signals) { |
+ _signals = signals; |
+ if (_isListening) { |
+ var res = MojoHandleWatcher.add(_handle, _sendPort, signals.value); |
if (!res.isOk) { |
- throw new Exception("MojoHandleWatcher failed to toggle write: $res"); |
+ throw "MojoHandleWatcher add failed: $res"; |
} |
} |
} |
- void enableWriteEvents() { |
- assert(!writeEnabled()); |
- toggleWriteEvents(); |
- } |
- |
- void disableWriteEvents() { |
- assert(writeEnabled()); |
- toggleWriteEvents(); |
- } |
+ void enableReadEvents() => enableSignals(MojoHandleSignals.READABLE); |
+ void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE); |
+ void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE); |
void _onSubscriptionStateChange() { |
if (!_controller.hasListener) { |
@@ -200,23 +170,20 @@ class MojoHandle extends Stream<int> { |
void _onPauseStateChange() { |
if (_controller.isPaused) { |
- if (_eventHandlerAdded) { |
- var res = MojoHandleWatcher.remove(_handle); |
- if (!res.isOk) { |
- throw new Exception("MojoHandleWatcher add failed: $res"); |
- } |
- _eventHandlerAdded = false; |
+ var res = MojoHandleWatcher.remove(_handle); |
+ if (!res.isOk) { |
+ throw "MojoHandleWatcher add failed: $res"; |
} |
} else { |
- if (!_eventHandlerAdded) { |
- var res = MojoHandleWatcher.add(_handle, _sendPort, _signals); |
- if (!res.isOk) { |
- throw new Exception("MojoHandleWatcher add failed: $res"); |
- } |
- _eventHandlerAdded = true; |
+ var res = MojoHandleWatcher.add(_handle, _sendPort, _signals.value); |
+ if (!res.isOk) { |
+ throw "MojoHandleWatcher add failed: $res"; |
} |
} |
} |
+ bool get readyRead => _handle.readyRead; |
+ bool get readyWrite => _handle.readyWrite; |
+ |
String toString() => "$_handle"; |
} |