Index: base/message_pump_libevent.cc |
diff --git a/base/message_pump_libevent.cc b/base/message_pump_libevent.cc |
index 2ad1d97a4f58e87d8f31b266eb6bbae03ed514fe..c2390b46c8b7cbed638a561136ba37032bf813e0 100644 |
--- a/base/message_pump_libevent.cc |
+++ b/base/message_pump_libevent.cc |
@@ -7,9 +7,10 @@ |
#include <errno.h> |
#include <fcntl.h> |
-#include "eintr_wrapper.h" |
#include "base/auto_reset.h" |
+#include "base/eintr_wrapper.h" |
#include "base/logging.h" |
+#include "base/observer_list.h" |
#include "base/scoped_nsautorelease_pool.h" |
#include "base/scoped_ptr.h" |
#include "base/time.h" |
@@ -50,7 +51,9 @@ static int SetNonBlocking(int fd) { |
MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() |
: is_persistent_(false), |
- event_(NULL) { |
+ event_(NULL), |
+ pump_(NULL), |
+ watcher_(NULL) { |
} |
MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { |
@@ -82,9 +85,25 @@ bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { |
// event_del() is a no-op if the event isn't active. |
int rv = event_del(e); |
delete e; |
+ pump_ = NULL; |
+ watcher_ = NULL; |
return (rv == 0); |
} |
+void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking( |
+ int fd, MessagePumpLibevent* pump) { |
+ pump->WillProcessIOEvent(); |
+ watcher_->OnFileCanReadWithoutBlocking(fd); |
+ pump->DidProcessIOEvent(); |
+} |
+ |
+void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking( |
+ int fd, MessagePumpLibevent* pump) { |
+ pump->WillProcessIOEvent(); |
+ watcher_->OnFileCanWriteWithoutBlocking(fd); |
+ pump->DidProcessIOEvent(); |
+} |
+ |
// Called if a byte is received on the wakeup pipe. |
void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { |
base::MessagePumpLibevent* that = |
@@ -142,9 +161,9 @@ MessagePumpLibevent::~MessagePumpLibevent() { |
event_del(wakeup_event_); |
delete wakeup_event_; |
if (wakeup_pipe_in_ >= 0) |
- close(wakeup_pipe_in_); |
+ HANDLE_EINTR(close(wakeup_pipe_in_)); |
if (wakeup_pipe_out_ >= 0) |
- close(wakeup_pipe_out_); |
+ HANDLE_EINTR(close(wakeup_pipe_out_)); |
event_base_free(event_base_); |
} |
@@ -190,7 +209,7 @@ bool MessagePumpLibevent::WatchFileDescriptor(int fd, |
} |
// Set current interest mask and message pump for this event. |
- event_set(evt.get(), fd, event_mask, OnLibeventNotification, delegate); |
+ event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller); |
// Tell libevent which message pump this socket will belong to when we add it. |
if (event_base_set(event_base_, evt.get()) != 0) { |
@@ -204,19 +223,25 @@ bool MessagePumpLibevent::WatchFileDescriptor(int fd, |
// Transfer ownership of evt to controller. |
controller->Init(evt.release(), persistent); |
+ |
+ controller->set_watcher(delegate); |
+ controller->set_pump(this); |
+ |
return true; |
} |
- |
void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, |
void* context) { |
- Watcher* watcher = static_cast<Watcher*>(context); |
+ FileDescriptorWatcher* controller = |
+ static_cast<FileDescriptorWatcher*>(context); |
+ |
+ MessagePumpLibevent* pump = controller->pump(); |
if (flags & EV_WRITE) { |
- watcher->OnFileCanWriteWithoutBlocking(fd); |
+ controller->OnFileCanWriteWithoutBlocking(fd, pump); |
} |
if (flags & EV_READ) { |
- watcher->OnFileCanReadWithoutBlocking(fd); |
+ controller->OnFileCanReadWithoutBlocking(fd, pump); |
} |
} |
@@ -304,4 +329,20 @@ void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) { |
delayed_work_time_ = delayed_work_time; |
} |
+void MessagePumpLibevent::AddIOObserver(IOObserver *obs) { |
+ io_observers_.AddObserver(obs); |
+} |
+ |
+void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) { |
+ io_observers_.RemoveObserver(obs); |
+} |
+ |
+void MessagePumpLibevent::WillProcessIOEvent() { |
+ FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent()); |
+} |
+ |
+void MessagePumpLibevent::DidProcessIOEvent() { |
+ FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent()); |
+} |
+ |
} // namespace base |