Index: mojo/public/dart/mojo/lib/src/event_stream.dart |
diff --git a/mojo/public/dart/mojo/lib/src/event_stream.dart b/mojo/public/dart/mojo/lib/src/event_stream.dart |
index 0ff091ee571c34f31e256879970b417ce9fa797d..33eedf42995e3c1cbd4f465f7d8428a3c9e5eca5 100644 |
--- a/mojo/public/dart/mojo/lib/src/event_stream.dart |
+++ b/mojo/public/dart/mojo/lib/src/event_stream.dart |
@@ -4,21 +4,17 @@ |
part of core; |
-class MojoEventStream extends Stream<List<int>> { |
+class MojoEventHandler { |
Cutch
2015/11/11 17:47:17
This class doesn't handle events. It wraps a subsc
zra
2015/11/11 18:44:33
Renamed MojoEventSubscription
|
// The underlying Mojo handle. |
MojoHandle _handle; |
- // Providing our own stream controller allows us to take custom actions when |
- // listeners pause/resume/etc. their StreamSubscription. |
- StreamController _controller; |
- |
// The send port that we give to the handle watcher to notify us of handle |
// events. |
SendPort _sendPort; |
// The receive port on which we listen and receive events from the handle |
// watcher. |
- ReceivePort _receivePort; |
+ RawReceivePort _receivePort; |
// The signals on this handle that we're interested in. |
MojoHandleSignals _signals; |
@@ -26,7 +22,7 @@ class MojoEventStream extends Stream<List<int>> { |
// Whether listen has been called. |
bool _isListening; |
- MojoEventStream(MojoHandle handle, |
+ MojoEventHandler(MojoHandle handle, |
[MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE]) |
: _handle = handle, |
_signals = signals, |
@@ -40,30 +36,21 @@ class MojoEventStream extends Stream<List<int>> { |
Future close({bool immediate: false}) => _close(immediate: immediate); |
- StreamSubscription<List<int>> listen(void onData(List event), |
- {Function onError, void onDone(), bool cancelOnError}) { |
+ void handleEvents(void router(List<int> event)) { |
Cutch
2015/11/11 17:47:17
I find this name confusing because handleEvents do
zra
2015/11/11 18:44:33
Renamed subscribe
|
if (_isListening) { |
throw new MojoApiError("Listen has already been called: $_handle."); |
} |
- _receivePort = new ReceivePort(); |
+ _receivePort = new RawReceivePort(router); |
_sendPort = _receivePort.sendPort; |
- _controller = new StreamController( |
- sync: true, |
- onPause: _onPauseStateChange, |
- onResume: _onPauseStateChange); |
- _controller.addStream(_receivePort).whenComplete(_controller.close); |
if (_signals != MojoHandleSignals.NONE) { |
- var res = new MojoResult( |
- MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value)); |
- if (!res.isOk) { |
+ int res = MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value); |
+ if (res != MojoResult.kOk) { |
throw new MojoInternalError("MojoHandleWatcher add failed: $res"); |
} |
} |
_isListening = true; |
- return _controller.stream.listen(onData, |
- onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
} |
bool enableSignals(MojoHandleSignals signals) { |
@@ -117,59 +104,44 @@ class MojoEventStream extends Stream<List<int>> { |
} |
} |
- void _onPauseStateChange() { |
- if (_controller.isPaused) { |
- var res = new MojoResult(MojoHandleWatcher.remove(_handle.h)); |
- if (!res.isOk) { |
- throw new MojoInternalError("MojoHandleWatcher add failed: $res"); |
- } |
- } else { |
- var res = new MojoResult( |
- MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value)); |
- if (!res.isOk) { |
- throw new MojoInternalError("MojoHandleWatcher add failed: $res"); |
- } |
- } |
- } |
- |
bool get readyRead => _handle.readyRead; |
bool get readyWrite => _handle.readyWrite; |
+ MojoHandleSignals get signals => _signals; |
String toString() => "$_handle"; |
} |
-typedef void ErrorHandler(); |
+typedef void ErrorHandler(Object e); |
-class MojoEventStreamListener { |
- StreamSubscription subscription; |
+class MojoEventController { |
ErrorHandler onError; |
MojoMessagePipeEndpoint _endpoint; |
- MojoEventStream _eventStream; |
+ MojoEventHandler _eventHandler; |
bool _isOpen = false; |
bool _isInHandler = false; |
bool _isPeerClosed = false; |
- MojoEventStreamListener.fromEndpoint(MojoMessagePipeEndpoint endpoint) |
+ MojoEventController.fromEndpoint(MojoMessagePipeEndpoint endpoint) |
: _endpoint = endpoint, |
- _eventStream = new MojoEventStream(endpoint.handle) { |
- listen(); |
+ _eventHandler = new MojoEventHandler(endpoint.handle) { |
+ handleEvents(); |
} |
- MojoEventStreamListener.fromHandle(MojoHandle handle) { |
+ MojoEventController.fromHandle(MojoHandle handle) { |
_endpoint = new MojoMessagePipeEndpoint(handle); |
- _eventStream = new MojoEventStream(handle); |
- listen(); |
+ _eventHandler = new MojoEventHandler(handle); |
+ handleEvents(); |
} |
- MojoEventStreamListener.unbound(); |
+ MojoEventController.unbound(); |
void bind(MojoMessagePipeEndpoint endpoint) { |
if (isBound) { |
throw new MojoApiError("MojoEventStreamListener is already bound."); |
} |
_endpoint = endpoint; |
- _eventStream = new MojoEventStream(endpoint.handle); |
+ _eventHandler = new MojoEventHandler(endpoint.handle); |
_isOpen = false; |
_isInHandler = false; |
_isPeerClosed = false; |
@@ -180,67 +152,74 @@ class MojoEventStreamListener { |
throw new MojoApiError("MojoEventStreamListener is already bound."); |
} |
_endpoint = new MojoMessagePipeEndpoint(handle); |
- _eventStream = new MojoEventStream(handle); |
+ _eventHandler = new MojoEventHandler(handle); |
_isOpen = false; |
_isInHandler = false; |
_isPeerClosed = false; |
} |
- StreamSubscription<List<int>> listen() { |
+ void handleEvents() { |
if (!isBound) { |
throw new MojoApiError("MojoEventStreamListener is unbound."); |
} |
- if (subscription != null) { |
- throw new MojoApiError("Listen has already been called."); |
- } |
_isOpen = true; |
- subscription = _eventStream.listen((List<int> event) { |
- if (!_isOpen) { |
- // The actual close of the underlying stream happens asynchronously |
- // after the call to close. However, we start to ignore incoming events |
- // immediately. |
- return; |
- } |
- var signalsWatched = new MojoHandleSignals(event[0]); |
- var signalsReceived = new MojoHandleSignals(event[1]); |
- _isInHandler = true; |
- if (signalsReceived.isReadable) { |
- assert(_eventStream.readyRead); |
- handleRead(); |
- } |
- if (signalsReceived.isWritable) { |
- assert(_eventStream.readyWrite); |
- handleWrite(); |
- } |
- _isPeerClosed = signalsReceived.isPeerClosed || |
- !_eventStream.enableSignals(signalsWatched); |
- _isInHandler = false; |
- if (_isPeerClosed) { |
- close().then((_) { |
+ _eventHandler.handleEvents((List<int> event) { |
+ try { |
+ _handleEvent(event); |
+ } catch (e) { |
+ close(immediate: true).then((_) { |
if (onError != null) { |
- onError(); |
+ onError(e); |
} |
}); |
} |
- }, onDone: close); |
- return subscription; |
+ }); |
} |
Future close({bool immediate: false}) { |
var result; |
_isOpen = false; |
_endpoint = null; |
- subscription = null; |
- if (_eventStream != null) { |
- result = _eventStream |
+ if (_eventHandler != null) { |
+ result = _eventHandler |
._close(immediate: immediate, local: _isPeerClosed) |
.then((_) { |
- _eventStream = null; |
+ _eventHandler = null; |
}); |
} |
return result != null ? result : new Future.value(null); |
} |
+ void _handleEvent(List<int> event) { |
+ if (!_isOpen) { |
+ // The actual close of the underlying stream happens asynchronously |
+ // after the call to close. However, we start to ignore incoming events |
+ // immediately. |
+ return; |
+ } |
+ var signalsWatched = new MojoHandleSignals(event[0]); |
+ var signalsReceived = new MojoHandleSignals(event[1]); |
+ _isInHandler = true; |
+ if (signalsReceived.isReadable) { |
+ assert(_eventHandler.readyRead); |
+ handleRead(); |
+ } |
+ if (signalsReceived.isWritable) { |
+ assert(_eventHandler.readyWrite); |
+ handleWrite(); |
+ } |
+ _isPeerClosed = signalsReceived.isPeerClosed || |
+ !_eventHandler.enableSignals(signalsWatched); |
+ _isInHandler = false; |
+ if (_isPeerClosed) { |
+ close().then((_) { |
+ if (onError != null) { |
+ onError(null); |
+ } |
+ }); |
+ } |
+ } |
+ |
void handleRead() {} |
Cutch
2015/11/11 17:47:17
Maybe this class should be called 'MojoEventHandle
zra
2015/11/11 18:44:33
Done.
|
void handleWrite() {} |
@@ -250,6 +229,6 @@ class MojoEventStreamListener { |
bool get isBound => _endpoint != null; |
bool get isPeerClosed => _isPeerClosed; |
- String toString() => "MojoEventStreamListener(" |
+ String toString() => "MojoEventController(" |
"isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)"; |
} |