| Index: base/message_pump_libevent.cc
|
| diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc
|
| index a281980c36e33b8e72dd50ccea18eba41c2c00c2..f31c01b68ebd11151ab12e8171002ea2d77e4819 100644
|
| --- a/base/message_pump_libevent.cc
|
| +++ b/base/message_pump_libevent.cc
|
| @@ -15,17 +15,48 @@ namespace base {
|
|
|
| // Return 0 on success
|
| // Too small a function to bother putting in a library?
|
| -static int SetNonBlocking(int fd)
|
| -{
|
| - int flags = fcntl(fd, F_GETFL, 0);
|
| - if (-1 == flags)
|
| - flags = 0;
|
| - return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
| +static int SetNonBlocking(int fd) {
|
| + int flags = fcntl(fd, F_GETFL, 0);
|
| + if (flags == -1)
|
| + flags = 0;
|
| + return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
| +}
|
| +
|
| +MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
|
| + : is_persistent_(false),
|
| + event_(NULL) {
|
| +}
|
| +
|
| +MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
|
| + if (event_.get()) {
|
| + StopWatchingFileDescriptor();
|
| + }
|
| +}
|
| +
|
| +void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e,
|
| + bool is_persistent) {
|
| + DCHECK(e);
|
| + DCHECK(event_.get() == NULL);
|
| +
|
| + is_persistent_ = is_persistent;
|
| + event_.reset(e);
|
| +}
|
| +
|
| +event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
|
| + return event_.release();
|
| +}
|
| +
|
| +bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
|
| + if (event_.get() == NULL) {
|
| + return true;
|
| + }
|
| +
|
| + // event_del() is a no-op of the event isn't active.
|
| + return (event_del(event_.get()) == 0);
|
| }
|
|
|
| // Called if a byte is received on the wakeup pipe.
|
| void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
|
| -
|
| base::MessagePumpLibevent* that =
|
| static_cast<base::MessagePumpLibevent*>(context);
|
| DCHECK(that->wakeup_pipe_out_ == socket);
|
| @@ -61,7 +92,7 @@ bool MessagePumpLibevent::Init() {
|
|
|
| wakeup_event_ = new event;
|
| event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
|
| - OnWakeup, this);
|
| + OnWakeup, this);
|
| event_base_set(event_base_, wakeup_event_);
|
|
|
| if (event_add(wakeup_event_, 0))
|
| @@ -77,67 +108,72 @@ MessagePumpLibevent::~MessagePumpLibevent() {
|
| event_base_free(event_base_);
|
| }
|
|
|
| -void MessagePumpLibevent::WatchSocket(int socket, short interest_mask,
|
| - event* e, Watcher* watcher) {
|
| +bool MessagePumpLibevent::WatchFileDescriptor(int fd,
|
| + bool persistent,
|
| + Mode mode,
|
| + FileDescriptorWatcher *controller,
|
| + Watcher *delegate) {
|
| + DCHECK(fd > 0);
|
| + DCHECK(controller);
|
| + DCHECK(delegate);
|
| + DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
|
|
|
| - // Set current interest mask and message pump for this event
|
| - event_set(e, socket, interest_mask, OnReadinessNotification, watcher);
|
| + int event_mask = persistent ? EV_PERSIST : 0;
|
| + if ((mode & WATCH_READ) != 0) {
|
| + event_mask |= EV_READ;
|
| + }
|
| + if ((mode & WATCH_WRITE) != 0) {
|
| + event_mask |= EV_WRITE;
|
| + }
|
|
|
| - // Tell libevent which message pump this socket will belong to when we add it.
|
| - event_base_set(event_base_, e);
|
| + // |should_delete_event| is true if we're modifying an event that's currently
|
| + // active in |controller|.
|
| + // If we're modifying an existing event and there's an error then we need to
|
| + // tell libevent to clean it up via event_delete() before returning.
|
| + bool should_delete_event = true;
|
| + scoped_ptr<event> evt(controller->ReleaseEvent());
|
| + if (evt.get() == NULL) {
|
| + should_delete_event = false;
|
| + // Ownership is transferred to the controller.
|
| + evt.reset(new event);
|
| + }
|
|
|
| - // Add this socket to the list of monitored sockets.
|
| - if (event_add(e, NULL))
|
| - NOTREACHED();
|
| -}
|
| + // Set current interest mask and message pump for this event.
|
| + event_set(evt.get(), fd, event_mask, OnLibeventNotification,
|
| + delegate);
|
|
|
| -void MessagePumpLibevent::WatchFileHandle(int fd, short interest_mask,
|
| - event* e, FileWatcher* watcher) {
|
| - // Set current interest mask and message pump for this event
|
| - if ((interest_mask & EV_READ) != 0) {
|
| - event_set(e, fd, interest_mask, OnFileReadReadinessNotification, watcher);
|
| - } else {
|
| - event_set(e, fd, interest_mask, OnFileWriteReadinessNotification, watcher);
|
| + // Tell libevent which message pump this socket will belong to when we add it.
|
| + if (event_base_set(event_base_, evt.get()) != 0) {
|
| + if (should_delete_event) {
|
| + event_del(evt.get());
|
| + }
|
| + return false;
|
| }
|
|
|
| - // Tell libevent which message pump this fd will belong to when we add it.
|
| - event_base_set(event_base_, e);
|
| -
|
| - // Add this fd to the list of monitored sockets.
|
| - if (event_add(e, NULL))
|
| - NOTREACHED();
|
| -}
|
| + // Add this socket to the list of monitored sockets.
|
| + if (event_add(evt.get(), NULL) != 0) {
|
| + if (should_delete_event) {
|
| + event_del(evt.get());
|
| + }
|
| + return false;
|
| + }
|
|
|
| -void MessagePumpLibevent::UnwatchSocket(event* e) {
|
| - // Remove this socket from the list of monitored sockets.
|
| - if (event_del(e))
|
| - NOTREACHED();
|
| + // Transfer ownership of e to controller.
|
| + controller->Init(evt.release(), persistent);
|
| + return true;
|
| }
|
|
|
| -void MessagePumpLibevent::UnwatchFileHandle(event* e) {
|
| - // Remove this fd from the list of monitored fds.
|
| - if (event_del(e))
|
| - NOTREACHED();
|
| -}
|
|
|
| -void MessagePumpLibevent::OnReadinessNotification(int socket, short flags,
|
| - void* context) {
|
| - // The given socket is ready for I/O.
|
| - // Tell the owner what kind of I/O the socket is ready for.
|
| +void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
|
| + void* context) {
|
| Watcher* watcher = static_cast<Watcher*>(context);
|
| - watcher->OnSocketReady(flags);
|
| -}
|
|
|
| -void MessagePumpLibevent::OnFileReadReadinessNotification(int fd, short flags,
|
| - void* context) {
|
| - FileWatcher* watcher = static_cast<FileWatcher*>(context);
|
| - watcher->OnFileReadReady(fd);
|
| -}
|
| -
|
| -void MessagePumpLibevent::OnFileWriteReadinessNotification(int fd, short flags,
|
| - void* context) {
|
| - FileWatcher* watcher = static_cast<FileWatcher*>(context);
|
| - watcher->OnFileWriteReady(fd);
|
| + if (flags & EV_WRITE) {
|
| + watcher->OnFileCanWriteWithoutBlocking(fd);
|
| + }
|
| + if (flags & EV_READ) {
|
| + watcher->OnFileCanReadWithoutBlocking(fd);
|
| + }
|
| }
|
|
|
| // Reentrant!
|
|
|