| OLD | NEW |
| 1 // Copyright (c) 2008 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2008 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/message_pump_libevent.h" | 5 #include "base/message_pump_libevent.h" |
| 6 | 6 |
| 7 #include <fcntl.h> | 7 #include <fcntl.h> |
| 8 | 8 |
| 9 #include "base/logging.h" | 9 #include "base/logging.h" |
| 10 #include "base/scoped_nsautorelease_pool.h" | 10 #include "base/scoped_nsautorelease_pool.h" |
| 11 #include "base/time.h" | 11 #include "base/time.h" |
| 12 #include "third_party/libevent/event.h" | 12 #include "third_party/libevent/event.h" |
| 13 | 13 |
| 14 namespace base { | 14 namespace base { |
| 15 | 15 |
| 16 // Return 0 on success | 16 // Return 0 on success |
| 17 // Too small a function to bother putting in a library? | 17 // Too small a function to bother putting in a library? |
| 18 static int SetNonBlocking(int fd) | 18 static int SetNonBlocking(int fd) { |
| 19 { | 19 int flags = fcntl(fd, F_GETFL, 0); |
| 20 int flags = fcntl(fd, F_GETFL, 0); | 20 if (flags == -1) |
| 21 if (-1 == flags) | 21 flags = 0; |
| 22 flags = 0; | 22 return fcntl(fd, F_SETFL, flags | O_NONBLOCK); |
| 23 return fcntl(fd, F_SETFL, flags | O_NONBLOCK); | 23 } |
| 24 |
| 25 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() |
| 26 : is_persistent_(false), |
| 27 event_(NULL) { |
| 28 } |
| 29 |
| 30 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { |
| 31 if (event_.get()) { |
| 32 StopWatchingFileDescriptor(); |
| 33 } |
| 34 } |
| 35 |
| 36 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, |
| 37 bool is_persistent) { |
| 38 DCHECK(e); |
| 39 DCHECK(event_.get() == NULL); |
| 40 |
| 41 is_persistent_ = is_persistent; |
| 42 event_.reset(e); |
| 43 } |
| 44 |
| 45 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() { |
| 46 return event_.release(); |
| 47 } |
| 48 |
| 49 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { |
| 50 if (event_.get() == NULL) { |
| 51 return true; |
| 52 } |
| 53 |
| 54 // event_del() is a no-op of the event isn't active. |
| 55 return (event_del(event_.get()) == 0); |
| 24 } | 56 } |
| 25 | 57 |
| 26 // Called if a byte is received on the wakeup pipe. | 58 // Called if a byte is received on the wakeup pipe. |
| 27 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { | 59 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { |
| 28 | |
| 29 base::MessagePumpLibevent* that = | 60 base::MessagePumpLibevent* that = |
| 30 static_cast<base::MessagePumpLibevent*>(context); | 61 static_cast<base::MessagePumpLibevent*>(context); |
| 31 DCHECK(that->wakeup_pipe_out_ == socket); | 62 DCHECK(that->wakeup_pipe_out_ == socket); |
| 32 | 63 |
| 33 // Remove and discard the wakeup byte. | 64 // Remove and discard the wakeup byte. |
| 34 char buf; | 65 char buf; |
| 35 int nread = read(socket, &buf, 1); | 66 int nread = read(socket, &buf, 1); |
| 36 DCHECK(nread == 1); | 67 DCHECK(nread == 1); |
| 37 // Tell libevent to break out of inner loop. | 68 // Tell libevent to break out of inner loop. |
| 38 event_base_loopbreak(that->event_base_); | 69 event_base_loopbreak(that->event_base_); |
| (...skipping 15 matching lines...) Expand all Loading... |
| 54 return false; | 85 return false; |
| 55 if (SetNonBlocking(fds[0])) | 86 if (SetNonBlocking(fds[0])) |
| 56 return false; | 87 return false; |
| 57 if (SetNonBlocking(fds[1])) | 88 if (SetNonBlocking(fds[1])) |
| 58 return false; | 89 return false; |
| 59 wakeup_pipe_out_ = fds[0]; | 90 wakeup_pipe_out_ = fds[0]; |
| 60 wakeup_pipe_in_ = fds[1]; | 91 wakeup_pipe_in_ = fds[1]; |
| 61 | 92 |
| 62 wakeup_event_ = new event; | 93 wakeup_event_ = new event; |
| 63 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, | 94 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, |
| 64 » OnWakeup, this); | 95 OnWakeup, this); |
| 65 event_base_set(event_base_, wakeup_event_); | 96 event_base_set(event_base_, wakeup_event_); |
| 66 | 97 |
| 67 if (event_add(wakeup_event_, 0)) | 98 if (event_add(wakeup_event_, 0)) |
| 68 return false; | 99 return false; |
| 69 return true; | 100 return true; |
| 70 } | 101 } |
| 71 | 102 |
| 72 MessagePumpLibevent::~MessagePumpLibevent() { | 103 MessagePumpLibevent::~MessagePumpLibevent() { |
| 73 DCHECK(wakeup_event_); | 104 DCHECK(wakeup_event_); |
| 74 DCHECK(event_base_); | 105 DCHECK(event_base_); |
| 75 event_del(wakeup_event_); | 106 event_del(wakeup_event_); |
| 76 delete wakeup_event_; | 107 delete wakeup_event_; |
| 77 event_base_free(event_base_); | 108 event_base_free(event_base_); |
| 78 } | 109 } |
| 79 | 110 |
| 80 void MessagePumpLibevent::WatchSocket(int socket, short interest_mask, | 111 bool MessagePumpLibevent::WatchFileDescriptor(int fd, |
| 81 event* e, Watcher* watcher) { | 112 bool persistent, |
| 113 Mode mode, |
| 114 FileDescriptorWatcher *controller, |
| 115 Watcher *delegate) { |
| 116 DCHECK(fd > 0); |
| 117 DCHECK(controller); |
| 118 DCHECK(delegate); |
| 119 DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); |
| 82 | 120 |
| 83 // Set current interest mask and message pump for this event | 121 int event_mask = persistent ? EV_PERSIST : 0; |
| 84 event_set(e, socket, interest_mask, OnReadinessNotification, watcher); | 122 if ((mode & WATCH_READ) != 0) { |
| 123 event_mask |= EV_READ; |
| 124 } |
| 125 if ((mode & WATCH_WRITE) != 0) { |
| 126 event_mask |= EV_WRITE; |
| 127 } |
| 128 |
| 129 // |should_delete_event| is true if we're modifying an event that's currently |
| 130 // active in |controller|. |
| 131 // If we're modifying an existing event and there's an error then we need to |
| 132 // tell libevent to clean it up via event_delete() before returning. |
| 133 bool should_delete_event = true; |
| 134 scoped_ptr<event> evt(controller->ReleaseEvent()); |
| 135 if (evt.get() == NULL) { |
| 136 should_delete_event = false; |
| 137 // Ownership is transferred to the controller. |
| 138 evt.reset(new event); |
| 139 } |
| 140 |
| 141 // Set current interest mask and message pump for this event. |
| 142 event_set(evt.get(), fd, event_mask, OnLibeventNotification, |
| 143 delegate); |
| 85 | 144 |
| 86 // Tell libevent which message pump this socket will belong to when we add it. | 145 // Tell libevent which message pump this socket will belong to when we add it. |
| 87 event_base_set(event_base_, e); | 146 if (event_base_set(event_base_, evt.get()) != 0) { |
| 147 if (should_delete_event) { |
| 148 event_del(evt.get()); |
| 149 } |
| 150 return false; |
| 151 } |
| 88 | 152 |
| 89 // Add this socket to the list of monitored sockets. | 153 // Add this socket to the list of monitored sockets. |
| 90 if (event_add(e, NULL)) | 154 if (event_add(evt.get(), NULL) != 0) { |
| 91 NOTREACHED(); | 155 if (should_delete_event) { |
| 156 event_del(evt.get()); |
| 157 } |
| 158 return false; |
| 159 } |
| 160 |
| 161 // Transfer ownership of e to controller. |
| 162 controller->Init(evt.release(), persistent); |
| 163 return true; |
| 92 } | 164 } |
| 93 | 165 |
| 94 void MessagePumpLibevent::WatchFileHandle(int fd, short interest_mask, | 166 |
| 95 event* e, FileWatcher* watcher) { | 167 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, |
| 96 // Set current interest mask and message pump for this event | 168 void* context) { |
| 97 if ((interest_mask & EV_READ) != 0) { | 169 Watcher* watcher = static_cast<Watcher*>(context); |
| 98 event_set(e, fd, interest_mask, OnFileReadReadinessNotification, watcher); | 170 |
| 99 } else { | 171 if (flags & EV_WRITE) { |
| 100 event_set(e, fd, interest_mask, OnFileWriteReadinessNotification, watcher); | 172 watcher->OnFileCanWriteWithoutBlocking(fd); |
| 101 } | 173 } |
| 102 | 174 if (flags & EV_READ) { |
| 103 // Tell libevent which message pump this fd will belong to when we add it. | 175 watcher->OnFileCanReadWithoutBlocking(fd); |
| 104 event_base_set(event_base_, e); | 176 } |
| 105 | |
| 106 // Add this fd to the list of monitored sockets. | |
| 107 if (event_add(e, NULL)) | |
| 108 NOTREACHED(); | |
| 109 } | |
| 110 | |
| 111 void MessagePumpLibevent::UnwatchSocket(event* e) { | |
| 112 // Remove this socket from the list of monitored sockets. | |
| 113 if (event_del(e)) | |
| 114 NOTREACHED(); | |
| 115 } | |
| 116 | |
| 117 void MessagePumpLibevent::UnwatchFileHandle(event* e) { | |
| 118 // Remove this fd from the list of monitored fds. | |
| 119 if (event_del(e)) | |
| 120 NOTREACHED(); | |
| 121 } | |
| 122 | |
| 123 void MessagePumpLibevent::OnReadinessNotification(int socket, short flags, | |
| 124 void* context) { | |
| 125 // The given socket is ready for I/O. | |
| 126 // Tell the owner what kind of I/O the socket is ready for. | |
| 127 Watcher* watcher = static_cast<Watcher*>(context); | |
| 128 watcher->OnSocketReady(flags); | |
| 129 } | |
| 130 | |
| 131 void MessagePumpLibevent::OnFileReadReadinessNotification(int fd, short flags, | |
| 132 void* context) { | |
| 133 FileWatcher* watcher = static_cast<FileWatcher*>(context); | |
| 134 watcher->OnFileReadReady(fd); | |
| 135 } | |
| 136 | |
| 137 void MessagePumpLibevent::OnFileWriteReadinessNotification(int fd, short flags, | |
| 138 void* context) { | |
| 139 FileWatcher* watcher = static_cast<FileWatcher*>(context); | |
| 140 watcher->OnFileWriteReady(fd); | |
| 141 } | 177 } |
| 142 | 178 |
| 143 // Reentrant! | 179 // Reentrant! |
| 144 void MessagePumpLibevent::Run(Delegate* delegate) { | 180 void MessagePumpLibevent::Run(Delegate* delegate) { |
| 145 DCHECK(keep_running_) << "Quit must have been called outside of Run!"; | 181 DCHECK(keep_running_) << "Quit must have been called outside of Run!"; |
| 146 | 182 |
| 147 bool old_in_run = in_run_; | 183 bool old_in_run = in_run_; |
| 148 in_run_ = true; | 184 in_run_ = true; |
| 149 | 185 |
| 150 for (;;) { | 186 for (;;) { |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 207 } | 243 } |
| 208 | 244 |
| 209 void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) { | 245 void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) { |
| 210 // We know that we can't be blocked on Wait right now since this method can | 246 // We know that we can't be blocked on Wait right now since this method can |
| 211 // only be called on the same thread as Run, so we only need to update our | 247 // only be called on the same thread as Run, so we only need to update our |
| 212 // record of how long to sleep when we do sleep. | 248 // record of how long to sleep when we do sleep. |
| 213 delayed_work_time_ = delayed_work_time; | 249 delayed_work_time_ = delayed_work_time; |
| 214 } | 250 } |
| 215 | 251 |
| 216 } // namespace base | 252 } // namespace base |
| OLD | NEW |