| Index: mojo/public/dart/mojo/lib/src/event_stream.dart
|
| diff --git a/mojo/public/dart/mojo/lib/src/event_stream.dart b/mojo/public/dart/mojo/lib/src/event_stream.dart
|
| index 0ff091ee571c34f31e256879970b417ce9fa797d..ceb0e0ea0d5042b54cec6f03c9f05105caf56bec 100644
|
| --- a/mojo/public/dart/mojo/lib/src/event_stream.dart
|
| +++ b/mojo/public/dart/mojo/lib/src/event_stream.dart
|
| @@ -4,33 +4,29 @@
|
|
|
| part of core;
|
|
|
| -class MojoEventStream extends Stream<List<int>> {
|
| +class MojoEventSubscription {
|
| // The underlying Mojo handle.
|
| MojoHandle _handle;
|
|
|
| - // Providing our own stream controller allows us to take custom actions when
|
| - // listeners pause/resume/etc. their StreamSubscription.
|
| - StreamController _controller;
|
| -
|
| // The send port that we give to the handle watcher to notify us of handle
|
| // events.
|
| SendPort _sendPort;
|
|
|
| // The receive port on which we listen and receive events from the handle
|
| // watcher.
|
| - ReceivePort _receivePort;
|
| + RawReceivePort _receivePort;
|
|
|
| // The signals on this handle that we're interested in.
|
| MojoHandleSignals _signals;
|
|
|
| - // Whether listen has been called.
|
| - bool _isListening;
|
| + // Whether subscribe() has been called.
|
| + bool _isSubscribed;
|
|
|
| - MojoEventStream(MojoHandle handle,
|
| + MojoEventSubscription(MojoHandle handle,
|
| [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE])
|
| : _handle = handle,
|
| _signals = signals,
|
| - _isListening = false {
|
| + _isSubscribed = false {
|
| MojoResult result = MojoHandle.registerFinalizer(this);
|
| if (!result.isOk) {
|
| throw new MojoInternalError(
|
| @@ -40,35 +36,26 @@ class MojoEventStream extends Stream<List<int>> {
|
|
|
| Future close({bool immediate: false}) => _close(immediate: immediate);
|
|
|
| - StreamSubscription<List<int>> listen(void onData(List event),
|
| - {Function onError, void onDone(), bool cancelOnError}) {
|
| - if (_isListening) {
|
| - throw new MojoApiError("Listen has already been called: $_handle.");
|
| + void subscribe(void handler(List<int> event)) {
|
| + if (_isSubscribed) {
|
| + throw new MojoApiError("subscribe() has already been called: $this.");
|
| }
|
| - _receivePort = new ReceivePort();
|
| + _receivePort = new RawReceivePort(handler);
|
| _sendPort = _receivePort.sendPort;
|
| - _controller = new StreamController(
|
| - sync: true,
|
| - onPause: _onPauseStateChange,
|
| - onResume: _onPauseStateChange);
|
| - _controller.addStream(_receivePort).whenComplete(_controller.close);
|
|
|
| if (_signals != MojoHandleSignals.NONE) {
|
| - var res = new MojoResult(
|
| - MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value));
|
| - if (!res.isOk) {
|
| + int res = MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value);
|
| + if (res != MojoResult.kOk) {
|
| throw new MojoInternalError("MojoHandleWatcher add failed: $res");
|
| }
|
| }
|
|
|
| - _isListening = true;
|
| - return _controller.stream.listen(onData,
|
| - onError: onError, onDone: onDone, cancelOnError: cancelOnError);
|
| + _isSubscribed = true;
|
| }
|
|
|
| bool enableSignals(MojoHandleSignals signals) {
|
| _signals = signals;
|
| - if (_isListening) {
|
| + if (_isSubscribed) {
|
| return MojoHandleWatcher.add(_handle.h, _sendPort, signals.value) ==
|
| MojoResult.kOk;
|
| }
|
| @@ -82,7 +69,7 @@ class MojoEventStream extends Stream<List<int>> {
|
|
|
| Future _close({bool immediate: false, bool local: false}) {
|
| if (_handle != null) {
|
| - if (_isListening && !local) {
|
| + if (_isSubscribed && !local) {
|
| return _handleWatcherClose(immediate: immediate).then((result) {
|
| // If the handle watcher is gone, then close the handle ourselves.
|
| if (!result.isOk) {
|
| @@ -117,59 +104,44 @@ class MojoEventStream extends Stream<List<int>> {
|
| }
|
| }
|
|
|
| - void _onPauseStateChange() {
|
| - if (_controller.isPaused) {
|
| - var res = new MojoResult(MojoHandleWatcher.remove(_handle.h));
|
| - if (!res.isOk) {
|
| - throw new MojoInternalError("MojoHandleWatcher add failed: $res");
|
| - }
|
| - } else {
|
| - var res = new MojoResult(
|
| - MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value));
|
| - if (!res.isOk) {
|
| - throw new MojoInternalError("MojoHandleWatcher add failed: $res");
|
| - }
|
| - }
|
| - }
|
| -
|
| bool get readyRead => _handle.readyRead;
|
| bool get readyWrite => _handle.readyWrite;
|
| + MojoHandleSignals get signals => _signals;
|
|
|
| String toString() => "$_handle";
|
| }
|
|
|
| -typedef void ErrorHandler();
|
| +typedef void ErrorHandler(Object e);
|
|
|
| -class MojoEventStreamListener {
|
| - StreamSubscription subscription;
|
| +class MojoEventHandler {
|
| ErrorHandler onError;
|
|
|
| MojoMessagePipeEndpoint _endpoint;
|
| - MojoEventStream _eventStream;
|
| + MojoEventSubscription _eventSubscription;
|
| bool _isOpen = false;
|
| bool _isInHandler = false;
|
| bool _isPeerClosed = false;
|
|
|
| - MojoEventStreamListener.fromEndpoint(MojoMessagePipeEndpoint endpoint)
|
| + MojoEventHandler.fromEndpoint(MojoMessagePipeEndpoint endpoint)
|
| : _endpoint = endpoint,
|
| - _eventStream = new MojoEventStream(endpoint.handle) {
|
| - listen();
|
| + _eventSubscription = new MojoEventSubscription(endpoint.handle) {
|
| + beginHandlingEvents();
|
| }
|
|
|
| - MojoEventStreamListener.fromHandle(MojoHandle handle) {
|
| + MojoEventHandler.fromHandle(MojoHandle handle) {
|
| _endpoint = new MojoMessagePipeEndpoint(handle);
|
| - _eventStream = new MojoEventStream(handle);
|
| - listen();
|
| + _eventSubscription = new MojoEventSubscription(handle);
|
| + beginHandlingEvents();
|
| }
|
|
|
| - MojoEventStreamListener.unbound();
|
| + MojoEventHandler.unbound();
|
|
|
| void bind(MojoMessagePipeEndpoint endpoint) {
|
| if (isBound) {
|
| throw new MojoApiError("MojoEventStreamListener is already bound.");
|
| }
|
| _endpoint = endpoint;
|
| - _eventStream = new MojoEventStream(endpoint.handle);
|
| + _eventSubscription = new MojoEventSubscription(endpoint.handle);
|
| _isOpen = false;
|
| _isInHandler = false;
|
| _isPeerClosed = false;
|
| @@ -180,67 +152,74 @@ class MojoEventStreamListener {
|
| throw new MojoApiError("MojoEventStreamListener is already bound.");
|
| }
|
| _endpoint = new MojoMessagePipeEndpoint(handle);
|
| - _eventStream = new MojoEventStream(handle);
|
| + _eventSubscription = new MojoEventSubscription(handle);
|
| _isOpen = false;
|
| _isInHandler = false;
|
| _isPeerClosed = false;
|
| }
|
|
|
| - StreamSubscription<List<int>> listen() {
|
| + void beginHandlingEvents() {
|
| if (!isBound) {
|
| - throw new MojoApiError("MojoEventStreamListener is unbound.");
|
| - }
|
| - if (subscription != null) {
|
| - throw new MojoApiError("Listen has already been called.");
|
| + throw new MojoApiError("MojoEventHandler is unbound.");
|
| }
|
| _isOpen = true;
|
| - subscription = _eventStream.listen((List<int> event) {
|
| - if (!_isOpen) {
|
| - // The actual close of the underlying stream happens asynchronously
|
| - // after the call to close. However, we start to ignore incoming events
|
| - // immediately.
|
| - return;
|
| - }
|
| - var signalsWatched = new MojoHandleSignals(event[0]);
|
| - var signalsReceived = new MojoHandleSignals(event[1]);
|
| - _isInHandler = true;
|
| - if (signalsReceived.isReadable) {
|
| - assert(_eventStream.readyRead);
|
| - handleRead();
|
| - }
|
| - if (signalsReceived.isWritable) {
|
| - assert(_eventStream.readyWrite);
|
| - handleWrite();
|
| - }
|
| - _isPeerClosed = signalsReceived.isPeerClosed ||
|
| - !_eventStream.enableSignals(signalsWatched);
|
| - _isInHandler = false;
|
| - if (_isPeerClosed) {
|
| - close().then((_) {
|
| + _eventSubscription.subscribe((List<int> event) {
|
| + try {
|
| + _handleEvent(event);
|
| + } catch (e) {
|
| + close(immediate: true).then((_) {
|
| if (onError != null) {
|
| - onError();
|
| + onError(e);
|
| }
|
| });
|
| }
|
| - }, onDone: close);
|
| - return subscription;
|
| + });
|
| }
|
|
|
| Future close({bool immediate: false}) {
|
| var result;
|
| _isOpen = false;
|
| _endpoint = null;
|
| - subscription = null;
|
| - if (_eventStream != null) {
|
| - result = _eventStream
|
| + if (_eventSubscription != null) {
|
| + result = _eventSubscription
|
| ._close(immediate: immediate, local: _isPeerClosed)
|
| .then((_) {
|
| - _eventStream = null;
|
| + _eventSubscription = null;
|
| });
|
| }
|
| return result != null ? result : new Future.value(null);
|
| }
|
|
|
| + void _handleEvent(List<int> event) {
|
| + if (!_isOpen) {
|
| + // The actual close of the underlying stream happens asynchronously
|
| + // after the call to close. However, we start to ignore incoming events
|
| + // immediately.
|
| + return;
|
| + }
|
| + var signalsWatched = new MojoHandleSignals(event[0]);
|
| + var signalsReceived = new MojoHandleSignals(event[1]);
|
| + _isInHandler = true;
|
| + if (signalsReceived.isReadable) {
|
| + assert(_eventSubscription.readyRead);
|
| + handleRead();
|
| + }
|
| + if (signalsReceived.isWritable) {
|
| + assert(_eventSubscription.readyWrite);
|
| + handleWrite();
|
| + }
|
| + _isPeerClosed = signalsReceived.isPeerClosed ||
|
| + !_eventSubscription.enableSignals(signalsWatched);
|
| + _isInHandler = false;
|
| + if (_isPeerClosed) {
|
| + close().then((_) {
|
| + if (onError != null) {
|
| + onError(null);
|
| + }
|
| + });
|
| + }
|
| + }
|
| +
|
| void handleRead() {}
|
| void handleWrite() {}
|
|
|
| @@ -250,6 +229,6 @@ class MojoEventStreamListener {
|
| bool get isBound => _endpoint != null;
|
| bool get isPeerClosed => _isPeerClosed;
|
|
|
| - String toString() => "MojoEventStreamListener("
|
| + String toString() => "MojoEventHandler("
|
| "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)";
|
| }
|
|
|