| Index: mojo/public/dart/src/handle.dart
|
| diff --git a/mojo/public/dart/src/handle.dart b/mojo/public/dart/src/handle.dart
|
| index 56b673548615fd854e93f6bfa6644e8f00fa0039..f983815504f0c45b5598f198a1a21bf6c2aeb48d 100644
|
| --- a/mojo/public/dart/src/handle.dart
|
| +++ b/mojo/public/dart/src/handle.dart
|
| @@ -5,7 +5,7 @@
|
| part of core;
|
|
|
| class _MojoHandleNatives {
|
| - static int register(MojoHandle handle) native "MojoHandle_Register";
|
| + static int register(MojoEventStream eventStream) native "MojoHandle_Register";
|
| static int close(int handle) native "MojoHandle_Close";
|
| static List wait(int handle, int signals, int deadline)
|
| native "MojoHandle_Wait";
|
| @@ -15,13 +15,13 @@ class _MojoHandleNatives {
|
| }
|
|
|
|
|
| -class RawMojoHandle {
|
| +class MojoHandle {
|
| static const int INVALID = 0;
|
| static const int DEADLINE_INDEFINITE = -1;
|
|
|
| int h;
|
|
|
| - RawMojoHandle(this.h);
|
| + MojoHandle(this.h);
|
|
|
| MojoResult close() {
|
| int result = _MojoHandleNatives.close(h);
|
| @@ -34,9 +34,9 @@ class RawMojoHandle {
|
| return new MojoWaitResult(new MojoResult(result[0]), result[1]);
|
| }
|
|
|
| - bool _ready(int signal) {
|
| - MojoWaitResult res = wait(signal, 0);
|
| - switch (res.result) {
|
| + bool _ready(MojoHandleSignals signal) {
|
| + MojoWaitResult mwr = wait(signal.value, 0);
|
| + switch (mwr.result) {
|
| case MojoResult.OK:
|
| return true;
|
| case MojoResult.DEADLINE_EXCEEDED:
|
| @@ -46,40 +46,37 @@ class RawMojoHandle {
|
| return false;
|
| default:
|
| // Should be unreachable.
|
| - throw new Exception("Unreachable");
|
| + throw "Unexpected result $res for wait on $h";
|
| }
|
| }
|
|
|
| - bool readyRead() => _ready(MojoHandleSignals.READABLE);
|
| - bool readyWrite() => _ready(MojoHandleSignals.WRITABLE);
|
| + bool get readyRead => _ready(MojoHandleSignals.READABLE);
|
| + bool get readyWrite => _ready(MojoHandleSignals.WRITABLE);
|
|
|
| - static MojoWaitManyResult waitMany(List<int> handles,
|
| - List<int> signals,
|
| - int deadline) {
|
| - List result = _MojoHandleNatives.waitMany(
|
| - handles, signals, deadline);
|
| -
|
| + static MojoWaitManyResult waitMany(
|
| + List<int> handles, List<int> signals, int deadline) {
|
| + List result = _MojoHandleNatives.waitMany(handles, signals, deadline);
|
| return new MojoWaitManyResult(
|
| new MojoResult(result[0]), result[1], result[2]);
|
| }
|
|
|
| - static MojoResult register(MojoHandle handle) {
|
| - return new MojoResult(_MojoHandleNatives.register(handle));
|
| + static MojoResult register(MojoEventStream eventStream) {
|
| + return new MojoResult(_MojoHandleNatives.register(eventStream));
|
| }
|
|
|
| bool get isValid => (h != INVALID);
|
|
|
| String toString() => "$h";
|
|
|
| - bool operator ==(RawMojoHandle other) {
|
| + bool operator ==(MojoHandle other) {
|
| return h == other.h;
|
| }
|
| }
|
|
|
|
|
| -class MojoHandle extends Stream<int> {
|
| +class MojoEventStream extends Stream<int> {
|
| // The underlying Mojo handle.
|
| - RawMojoHandle _handle;
|
| + MojoHandle _handle;
|
|
|
| // Providing our own stream controller allows us to take custom actions when
|
| // listeners pause/resume/etc. their StreamSubscription.
|
| @@ -94,59 +91,39 @@ class MojoHandle extends Stream<int> {
|
| ReceivePort _receivePort;
|
|
|
| // The signals on this handle that we're interested in.
|
| - int _signals;
|
| + MojoHandleSignals _signals;
|
|
|
| - // Whether the handle has been added to the handle watcher.
|
| - bool _eventHandlerAdded;
|
| + // Whether listen has been called.
|
| + bool _isListening;
|
|
|
| - MojoHandle(this._handle) :
|
| - _signals = MojoHandleSignals.READABLE,
|
| - _eventHandlerAdded = false {
|
| - MojoResult result = RawMojoHandle.register(this);
|
| + MojoEventStream(MojoHandle handle,
|
| + [MojoHandleSignals signals = MojoHandleSignals.READABLE]) :
|
| + _handle = handle,
|
| + _signals = signals,
|
| + _isListening = false {
|
| + MojoResult result = MojoHandle.register(this);
|
| if (!result.isOk) {
|
| - throw new Exception("Failed to register the MojoHandle");
|
| + throw "Failed to register the MojoHandle: $result.";
|
| }
|
| }
|
|
|
| void close() {
|
| - if (_eventHandlerAdded) {
|
| + if (_handle != null) {
|
| MojoHandleWatcher.close(_handle);
|
| - _eventHandlerAdded = false;
|
| - } else {
|
| - // If we're not in the handle watcher, then close the handle manually.
|
| - _handle.close();
|
| + _handle = null;
|
| }
|
| if (_receivePort != null) {
|
| _receivePort.close();
|
| + _receivePort = null;
|
| }
|
| }
|
|
|
| - // We wrap the callback provided by clients in listen() with some code to
|
| - // handle adding and removing the handle to/from the handle watcher. Because
|
| - // the handle watcher removes this handle whenever it receives an event,
|
| - // we have to re-add it when the callback is finished.
|
| - Function _onDataClosure(origOnData) {
|
| - return ((int event) {
|
| - // The handle watcher removes this handle from its set on an event.
|
| - _eventHandlerAdded = false;
|
| - origOnData(event);
|
| -
|
| - // The callback could have closed the handle. If so, don't add it back to
|
| - // the MojoHandleWatcher.
|
| - if (_handle.isValid) {
|
| - assert(!_eventHandlerAdded);
|
| - var res = MojoHandleWatcher.add(_handle, _sendPort, _signals);
|
| - if (!res.isOk) {
|
| - throw new Exception("Failed to re-add handle: $res");
|
| - }
|
| - _eventHandlerAdded = true;
|
| - }
|
| - });
|
| - }
|
| -
|
| - StreamSubscription<int> listen(
|
| - void onData(int event),
|
| + StreamSubscription<List<int>> listen(
|
| + void onData(List event),
|
| {Function onError, void onDone(), bool cancelOnError}) {
|
| + if (_isListening) {
|
| + throw "Listen has already been called: $_handle.";
|
| + }
|
| _receivePort = new ReceivePort();
|
| _sendPort = _receivePort.sendPort;
|
| _controller = new StreamController(sync: true,
|
| @@ -156,41 +133,34 @@ class MojoHandle extends Stream<int> {
|
| onResume: _onPauseStateChange);
|
| _controller.addStream(_receivePort);
|
|
|
| - assert(!_eventHandlerAdded);
|
| - var res = MojoHandleWatcher.add(_handle, _sendPort, _signals);
|
| - if (!res.isOk) {
|
| - throw new Exception("MojoHandleWatcher add failed: $res");
|
| + if (_signals != MojoHandleSignals.NONE) {
|
| + var res = MojoHandleWatcher.add(_handle, _sendPort, _signals.value);
|
| + if (!res.isOk) {
|
| + throw "MojoHandleWatcher add failed: $res";
|
| + }
|
| }
|
| - _eventHandlerAdded = true;
|
|
|
| + _isListening = true;
|
| return _controller.stream.listen(
|
| - _onDataClosure(onData),
|
| + onData,
|
| onError: onError,
|
| onDone: onDone,
|
| cancelOnError: cancelOnError);
|
| }
|
|
|
| - bool writeEnabled() => MojoHandleSignals.isWritable(_signals);
|
| -
|
| - void toggleWriteEvents() {
|
| - _signals = MojoHandleSignals.toggleWrite(_signals);
|
| - if (_eventHandlerAdded) {
|
| - var res = MojoHandleWatcher.toggleWrite(_handle);
|
| + void enableSignals(MojoHandleSignals signals) {
|
| + _signals = signals;
|
| + if (_isListening) {
|
| + var res = MojoHandleWatcher.add(_handle, _sendPort, signals.value);
|
| if (!res.isOk) {
|
| - throw new Exception("MojoHandleWatcher failed to toggle write: $res");
|
| + throw "MojoHandleWatcher add failed: $res";
|
| }
|
| }
|
| }
|
|
|
| - void enableWriteEvents() {
|
| - assert(!writeEnabled());
|
| - toggleWriteEvents();
|
| - }
|
| -
|
| - void disableWriteEvents() {
|
| - assert(writeEnabled());
|
| - toggleWriteEvents();
|
| - }
|
| + void enableReadEvents() => enableSignals(MojoHandleSignals.READABLE);
|
| + void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE);
|
| + void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE);
|
|
|
| void _onSubscriptionStateChange() {
|
| if (!_controller.hasListener) {
|
| @@ -200,23 +170,20 @@ class MojoHandle extends Stream<int> {
|
|
|
| void _onPauseStateChange() {
|
| if (_controller.isPaused) {
|
| - if (_eventHandlerAdded) {
|
| - var res = MojoHandleWatcher.remove(_handle);
|
| - if (!res.isOk) {
|
| - throw new Exception("MojoHandleWatcher add failed: $res");
|
| - }
|
| - _eventHandlerAdded = false;
|
| + var res = MojoHandleWatcher.remove(_handle);
|
| + if (!res.isOk) {
|
| + throw "MojoHandleWatcher add failed: $res";
|
| }
|
| } else {
|
| - if (!_eventHandlerAdded) {
|
| - var res = MojoHandleWatcher.add(_handle, _sendPort, _signals);
|
| - if (!res.isOk) {
|
| - throw new Exception("MojoHandleWatcher add failed: $res");
|
| - }
|
| - _eventHandlerAdded = true;
|
| + var res = MojoHandleWatcher.add(_handle, _sendPort, _signals.value);
|
| + if (!res.isOk) {
|
| + throw "MojoHandleWatcher add failed: $res";
|
| }
|
| }
|
| }
|
|
|
| + bool get readyRead => _handle.readyRead;
|
| + bool get readyWrite => _handle.readyWrite;
|
| +
|
| String toString() => "$_handle";
|
| }
|
|
|