Chromium Code Reviews| Index: mojo/public/dart/mojo/lib/src/event_stream.dart |
| diff --git a/mojo/public/dart/mojo/lib/src/event_stream.dart b/mojo/public/dart/mojo/lib/src/event_stream.dart |
| index 0ff091ee571c34f31e256879970b417ce9fa797d..5b816fa05603f4279fce54cd3e93cd626e9996a9 100644 |
| --- a/mojo/public/dart/mojo/lib/src/event_stream.dart |
| +++ b/mojo/public/dart/mojo/lib/src/event_stream.dart |
| @@ -4,33 +4,29 @@ |
| part of core; |
| -class MojoEventStream extends Stream<List<int>> { |
| +class MojoEventSubscription { |
|
Cutch
2015/11/11 19:08:16
Consider 'MojoHandleEventSubscription'
zra
2015/11/11 20:43:26
Since Handles are the only source of events, and s
|
| // The underlying Mojo handle. |
| MojoHandle _handle; |
| - // Providing our own stream controller allows us to take custom actions when |
| - // listeners pause/resume/etc. their StreamSubscription. |
| - StreamController _controller; |
| - |
| // The send port that we give to the handle watcher to notify us of handle |
| // events. |
| SendPort _sendPort; |
| // The receive port on which we listen and receive events from the handle |
| // watcher. |
| - ReceivePort _receivePort; |
| + RawReceivePort _receivePort; |
| // The signals on this handle that we're interested in. |
| MojoHandleSignals _signals; |
| // Whether listen has been called. |
|
Cutch
2015/11/11 19:08:15
s/listen/subscribe
zra
2015/11/11 20:43:26
Done.
|
| - bool _isListening; |
| + bool _isSubscribed; |
| - MojoEventStream(MojoHandle handle, |
| + MojoEventSubscription(MojoHandle handle, |
| [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE]) |
| : _handle = handle, |
| _signals = signals, |
| - _isListening = false { |
| + _isSubscribed = false { |
| MojoResult result = MojoHandle.registerFinalizer(this); |
| if (!result.isOk) { |
| throw new MojoInternalError( |
| @@ -40,35 +36,26 @@ class MojoEventStream extends Stream<List<int>> { |
| Future close({bool immediate: false}) => _close(immediate: immediate); |
| - StreamSubscription<List<int>> listen(void onData(List event), |
| - {Function onError, void onDone(), bool cancelOnError}) { |
| - if (_isListening) { |
| - throw new MojoApiError("Listen has already been called: $_handle."); |
| + void subscribe(void handler(List<int> event)) { |
| + if (_isSubscribed) { |
| + throw new MojoApiError("subscribe() has already been called: $this."); |
| } |
| - _receivePort = new ReceivePort(); |
| + _receivePort = new RawReceivePort(handler); |
| _sendPort = _receivePort.sendPort; |
| - _controller = new StreamController( |
| - sync: true, |
| - onPause: _onPauseStateChange, |
| - onResume: _onPauseStateChange); |
| - _controller.addStream(_receivePort).whenComplete(_controller.close); |
| if (_signals != MojoHandleSignals.NONE) { |
| - var res = new MojoResult( |
| - MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value)); |
| - if (!res.isOk) { |
| + int res = MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value); |
| + if (res != MojoResult.kOk) { |
| throw new MojoInternalError("MojoHandleWatcher add failed: $res"); |
| } |
| } |
| - _isListening = true; |
| - return _controller.stream.listen(onData, |
| - onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
| + _isSubscribed = true; |
| } |
| bool enableSignals(MojoHandleSignals signals) { |
| _signals = signals; |
| - if (_isListening) { |
| + if (_isSubscribed) { |
| return MojoHandleWatcher.add(_handle.h, _sendPort, signals.value) == |
| MojoResult.kOk; |
| } |
| @@ -82,7 +69,7 @@ class MojoEventStream extends Stream<List<int>> { |
| Future _close({bool immediate: false, bool local: false}) { |
| if (_handle != null) { |
| - if (_isListening && !local) { |
| + if (_isSubscribed && !local) { |
| return _handleWatcherClose(immediate: immediate).then((result) { |
| // If the handle watcher is gone, then close the handle ourselves. |
| if (!result.isOk) { |
| @@ -117,59 +104,44 @@ class MojoEventStream extends Stream<List<int>> { |
| } |
| } |
| - void _onPauseStateChange() { |
| - if (_controller.isPaused) { |
| - var res = new MojoResult(MojoHandleWatcher.remove(_handle.h)); |
| - if (!res.isOk) { |
| - throw new MojoInternalError("MojoHandleWatcher add failed: $res"); |
| - } |
| - } else { |
| - var res = new MojoResult( |
| - MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value)); |
| - if (!res.isOk) { |
| - throw new MojoInternalError("MojoHandleWatcher add failed: $res"); |
| - } |
| - } |
| - } |
| - |
| bool get readyRead => _handle.readyRead; |
| bool get readyWrite => _handle.readyWrite; |
| + MojoHandleSignals get signals => _signals; |
| String toString() => "$_handle"; |
| } |
| -typedef void ErrorHandler(); |
| +typedef void ErrorHandler(Object e); |
| -class MojoEventStreamListener { |
| - StreamSubscription subscription; |
| +class MojoEventHandler { |
|
Cutch
2015/11/11 19:08:16
Consider 'MojoHandleEventHandler'
zra
2015/11/11 20:43:26
Like above, my feeling is that this is too verbose
|
| ErrorHandler onError; |
| MojoMessagePipeEndpoint _endpoint; |
| - MojoEventStream _eventStream; |
| + MojoEventSubscription _eventSubscription; |
| bool _isOpen = false; |
| bool _isInHandler = false; |
| bool _isPeerClosed = false; |
| - MojoEventStreamListener.fromEndpoint(MojoMessagePipeEndpoint endpoint) |
| + MojoEventHandler.fromEndpoint(MojoMessagePipeEndpoint endpoint) |
| : _endpoint = endpoint, |
| - _eventStream = new MojoEventStream(endpoint.handle) { |
| - listen(); |
| + _eventSubscription = new MojoEventSubscription(endpoint.handle) { |
| + handleEvents(); |
| } |
| - MojoEventStreamListener.fromHandle(MojoHandle handle) { |
| + MojoEventHandler.fromHandle(MojoHandle handle) { |
| _endpoint = new MojoMessagePipeEndpoint(handle); |
| - _eventStream = new MojoEventStream(handle); |
| - listen(); |
| + _eventSubscription = new MojoEventSubscription(handle); |
| + handleEvents(); |
| } |
| - MojoEventStreamListener.unbound(); |
| + MojoEventHandler.unbound(); |
| void bind(MojoMessagePipeEndpoint endpoint) { |
| if (isBound) { |
| throw new MojoApiError("MojoEventStreamListener is already bound."); |
| } |
| _endpoint = endpoint; |
| - _eventStream = new MojoEventStream(endpoint.handle); |
| + _eventSubscription = new MojoEventSubscription(endpoint.handle); |
| _isOpen = false; |
| _isInHandler = false; |
| _isPeerClosed = false; |
| @@ -180,67 +152,74 @@ class MojoEventStreamListener { |
| throw new MojoApiError("MojoEventStreamListener is already bound."); |
| } |
| _endpoint = new MojoMessagePipeEndpoint(handle); |
| - _eventStream = new MojoEventStream(handle); |
| + _eventSubscription = new MojoEventSubscription(handle); |
| _isOpen = false; |
| _isInHandler = false; |
| _isPeerClosed = false; |
| } |
| - StreamSubscription<List<int>> listen() { |
| + void handleEvents() { |
|
Cutch
2015/11/11 19:08:16
handleEvents -> beginHandlingEvents()
zra
2015/11/11 20:43:26
Done.
|
| if (!isBound) { |
| throw new MojoApiError("MojoEventStreamListener is unbound."); |
| } |
| - if (subscription != null) { |
| - throw new MojoApiError("Listen has already been called."); |
| - } |
| _isOpen = true; |
| - subscription = _eventStream.listen((List<int> event) { |
| - if (!_isOpen) { |
| - // The actual close of the underlying stream happens asynchronously |
| - // after the call to close. However, we start to ignore incoming events |
| - // immediately. |
| - return; |
| - } |
| - var signalsWatched = new MojoHandleSignals(event[0]); |
| - var signalsReceived = new MojoHandleSignals(event[1]); |
| - _isInHandler = true; |
| - if (signalsReceived.isReadable) { |
| - assert(_eventStream.readyRead); |
| - handleRead(); |
| - } |
| - if (signalsReceived.isWritable) { |
| - assert(_eventStream.readyWrite); |
| - handleWrite(); |
| - } |
| - _isPeerClosed = signalsReceived.isPeerClosed || |
| - !_eventStream.enableSignals(signalsWatched); |
| - _isInHandler = false; |
| - if (_isPeerClosed) { |
| - close().then((_) { |
| + _eventSubscription.subscribe((List<int> event) { |
| + try { |
| + _handleEvent(event); |
| + } catch (e) { |
| + close(immediate: true).then((_) { |
| if (onError != null) { |
| - onError(); |
| + onError(e); |
| } |
| }); |
| } |
| - }, onDone: close); |
| - return subscription; |
| + }); |
| } |
| Future close({bool immediate: false}) { |
| var result; |
| _isOpen = false; |
| _endpoint = null; |
| - subscription = null; |
| - if (_eventStream != null) { |
| - result = _eventStream |
| + if (_eventSubscription != null) { |
| + result = _eventSubscription |
| ._close(immediate: immediate, local: _isPeerClosed) |
| .then((_) { |
| - _eventStream = null; |
| + _eventSubscription = null; |
| }); |
| } |
| return result != null ? result : new Future.value(null); |
| } |
| + void _handleEvent(List<int> event) { |
| + if (!_isOpen) { |
| + // The actual close of the underlying stream happens asynchronously |
| + // after the call to close. However, we start to ignore incoming events |
| + // immediately. |
| + return; |
| + } |
| + var signalsWatched = new MojoHandleSignals(event[0]); |
| + var signalsReceived = new MojoHandleSignals(event[1]); |
| + _isInHandler = true; |
| + if (signalsReceived.isReadable) { |
| + assert(_eventSubscription.readyRead); |
| + handleRead(); |
| + } |
| + if (signalsReceived.isWritable) { |
| + assert(_eventSubscription.readyWrite); |
| + handleWrite(); |
| + } |
| + _isPeerClosed = signalsReceived.isPeerClosed || |
| + !_eventSubscription.enableSignals(signalsWatched); |
| + _isInHandler = false; |
| + if (_isPeerClosed) { |
| + close().then((_) { |
| + if (onError != null) { |
| + onError(null); |
| + } |
| + }); |
| + } |
| + } |
| + |
| void handleRead() {} |
| void handleWrite() {} |
| @@ -250,6 +229,6 @@ class MojoEventStreamListener { |
| bool get isBound => _endpoint != null; |
| bool get isPeerClosed => _isPeerClosed; |
| - String toString() => "MojoEventStreamListener(" |
| + String toString() => "MojoEventHandler(" |
| "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)"; |
| } |