| OLD | NEW |
| 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/message_pump_libevent.h" | 5 #include "base/message_pump_libevent.h" |
| 6 | 6 |
| 7 #include <errno.h> | 7 #include <errno.h> |
| 8 #include <fcntl.h> | 8 #include <fcntl.h> |
| 9 | 9 |
| 10 #include "base/auto_reset.h" | 10 #include "base/auto_reset.h" |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 55 pump_(NULL), | 55 pump_(NULL), |
| 56 watcher_(NULL) { | 56 watcher_(NULL) { |
| 57 } | 57 } |
| 58 | 58 |
| 59 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { | 59 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { |
| 60 if (event_) { | 60 if (event_) { |
| 61 StopWatchingFileDescriptor(); | 61 StopWatchingFileDescriptor(); |
| 62 } | 62 } |
| 63 } | 63 } |
| 64 | 64 |
| 65 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { |
| 66 event* e = ReleaseEvent(); |
| 67 if (e == NULL) |
| 68 return true; |
| 69 |
| 70 // event_del() is a no-op if the event isn't active. |
| 71 int rv = event_del(e); |
| 72 delete e; |
| 73 pump_ = NULL; |
| 74 watcher_ = NULL; |
| 75 return (rv == 0); |
| 76 } |
| 77 |
| 65 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, | 78 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, |
| 66 bool is_persistent) { | 79 bool is_persistent) { |
| 67 DCHECK(e); | 80 DCHECK(e); |
| 68 DCHECK(event_ == NULL); | 81 DCHECK(event_ == NULL); |
| 69 | 82 |
| 70 is_persistent_ = is_persistent; | 83 is_persistent_ = is_persistent; |
| 71 event_ = e; | 84 event_ = e; |
| 72 } | 85 } |
| 73 | 86 |
| 74 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() { | 87 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() { |
| 75 struct event *e = event_; | 88 struct event *e = event_; |
| 76 event_ = NULL; | 89 event_ = NULL; |
| 77 return e; | 90 return e; |
| 78 } | 91 } |
| 79 | 92 |
| 80 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { | |
| 81 event* e = ReleaseEvent(); | |
| 82 if (e == NULL) | |
| 83 return true; | |
| 84 | |
| 85 // event_del() is a no-op if the event isn't active. | |
| 86 int rv = event_del(e); | |
| 87 delete e; | |
| 88 pump_ = NULL; | |
| 89 watcher_ = NULL; | |
| 90 return (rv == 0); | |
| 91 } | |
| 92 | |
| 93 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking( | 93 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking( |
| 94 int fd, MessagePumpLibevent* pump) { | 94 int fd, MessagePumpLibevent* pump) { |
| 95 pump->WillProcessIOEvent(); | 95 pump->WillProcessIOEvent(); |
| 96 watcher_->OnFileCanReadWithoutBlocking(fd); | 96 watcher_->OnFileCanReadWithoutBlocking(fd); |
| 97 pump->DidProcessIOEvent(); | 97 pump->DidProcessIOEvent(); |
| 98 } | 98 } |
| 99 | 99 |
| 100 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking( | 100 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking( |
| 101 int fd, MessagePumpLibevent* pump) { | 101 int fd, MessagePumpLibevent* pump) { |
| 102 pump->WillProcessIOEvent(); | 102 pump->WillProcessIOEvent(); |
| 103 watcher_->OnFileCanWriteWithoutBlocking(fd); | 103 watcher_->OnFileCanWriteWithoutBlocking(fd); |
| 104 pump->DidProcessIOEvent(); | 104 pump->DidProcessIOEvent(); |
| 105 } | 105 } |
| 106 | 106 |
| 107 // Called if a byte is received on the wakeup pipe. | |
| 108 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { | |
| 109 base::MessagePumpLibevent* that = | |
| 110 static_cast<base::MessagePumpLibevent*>(context); | |
| 111 DCHECK(that->wakeup_pipe_out_ == socket); | |
| 112 | |
| 113 // Remove and discard the wakeup byte. | |
| 114 char buf; | |
| 115 int nread = HANDLE_EINTR(read(socket, &buf, 1)); | |
| 116 DCHECK_EQ(nread, 1); | |
| 117 // Tell libevent to break out of inner loop. | |
| 118 event_base_loopbreak(that->event_base_); | |
| 119 } | |
| 120 | |
| 121 MessagePumpLibevent::MessagePumpLibevent() | 107 MessagePumpLibevent::MessagePumpLibevent() |
| 122 : keep_running_(true), | 108 : keep_running_(true), |
| 123 in_run_(false), | 109 in_run_(false), |
| 124 event_base_(event_base_new()), | 110 event_base_(event_base_new()), |
| 125 wakeup_pipe_in_(-1), | 111 wakeup_pipe_in_(-1), |
| 126 wakeup_pipe_out_(-1) { | 112 wakeup_pipe_out_(-1) { |
| 127 if (!Init()) | 113 if (!Init()) |
| 128 NOTREACHED(); | 114 NOTREACHED(); |
| 129 } | 115 } |
| 130 | 116 |
| 131 bool MessagePumpLibevent::Init() { | |
| 132 int fds[2]; | |
| 133 if (pipe(fds)) { | |
| 134 DLOG(ERROR) << "pipe() failed, errno: " << errno; | |
| 135 return false; | |
| 136 } | |
| 137 if (SetNonBlocking(fds[0])) { | |
| 138 DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno; | |
| 139 return false; | |
| 140 } | |
| 141 if (SetNonBlocking(fds[1])) { | |
| 142 DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno; | |
| 143 return false; | |
| 144 } | |
| 145 wakeup_pipe_out_ = fds[0]; | |
| 146 wakeup_pipe_in_ = fds[1]; | |
| 147 | |
| 148 wakeup_event_ = new event; | |
| 149 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, | |
| 150 OnWakeup, this); | |
| 151 event_base_set(event_base_, wakeup_event_); | |
| 152 | |
| 153 if (event_add(wakeup_event_, 0)) | |
| 154 return false; | |
| 155 return true; | |
| 156 } | |
| 157 | |
| 158 MessagePumpLibevent::~MessagePumpLibevent() { | 117 MessagePumpLibevent::~MessagePumpLibevent() { |
| 159 DCHECK(wakeup_event_); | 118 DCHECK(wakeup_event_); |
| 160 DCHECK(event_base_); | 119 DCHECK(event_base_); |
| 161 event_del(wakeup_event_); | 120 event_del(wakeup_event_); |
| 162 delete wakeup_event_; | 121 delete wakeup_event_; |
| 163 if (wakeup_pipe_in_ >= 0) { | 122 if (wakeup_pipe_in_ >= 0) { |
| 164 if (HANDLE_EINTR(close(wakeup_pipe_in_)) < 0) | 123 if (HANDLE_EINTR(close(wakeup_pipe_in_)) < 0) |
| 165 PLOG(ERROR) << "close"; | 124 PLOG(ERROR) << "close"; |
| 166 } | 125 } |
| 167 if (wakeup_pipe_out_ >= 0) { | 126 if (wakeup_pipe_out_ >= 0) { |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 227 | 186 |
| 228 // Transfer ownership of evt to controller. | 187 // Transfer ownership of evt to controller. |
| 229 controller->Init(evt.release(), persistent); | 188 controller->Init(evt.release(), persistent); |
| 230 | 189 |
| 231 controller->set_watcher(delegate); | 190 controller->set_watcher(delegate); |
| 232 controller->set_pump(this); | 191 controller->set_pump(this); |
| 233 | 192 |
| 234 return true; | 193 return true; |
| 235 } | 194 } |
| 236 | 195 |
| 237 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, | 196 void MessagePumpLibevent::AddIOObserver(IOObserver *obs) { |
| 238 void* context) { | 197 io_observers_.AddObserver(obs); |
| 239 FileDescriptorWatcher* controller = | 198 } |
| 240 static_cast<FileDescriptorWatcher*>(context); | |
| 241 | 199 |
| 242 MessagePumpLibevent* pump = controller->pump(); | 200 void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) { |
| 243 | 201 io_observers_.RemoveObserver(obs); |
| 244 if (flags & EV_WRITE) { | |
| 245 controller->OnFileCanWriteWithoutBlocking(fd, pump); | |
| 246 } | |
| 247 if (flags & EV_READ) { | |
| 248 controller->OnFileCanReadWithoutBlocking(fd, pump); | |
| 249 } | |
| 250 } | 202 } |
| 251 | 203 |
| 252 // Tell libevent to break out of inner loop. | 204 // Tell libevent to break out of inner loop. |
| 253 static void timer_callback(int fd, short events, void *context) | 205 static void timer_callback(int fd, short events, void *context) |
| 254 { | 206 { |
| 255 event_base_loopbreak((struct event_base *)context); | 207 event_base_loopbreak((struct event_base *)context); |
| 256 } | 208 } |
| 257 | 209 |
| 258 // Reentrant! | 210 // Reentrant! |
| 259 void MessagePumpLibevent::Run(Delegate* delegate) { | 211 void MessagePumpLibevent::Run(Delegate* delegate) { |
| (...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 327 } | 279 } |
| 328 | 280 |
| 329 void MessagePumpLibevent::ScheduleDelayedWork( | 281 void MessagePumpLibevent::ScheduleDelayedWork( |
| 330 const TimeTicks& delayed_work_time) { | 282 const TimeTicks& delayed_work_time) { |
| 331 // We know that we can't be blocked on Wait right now since this method can | 283 // We know that we can't be blocked on Wait right now since this method can |
| 332 // only be called on the same thread as Run, so we only need to update our | 284 // only be called on the same thread as Run, so we only need to update our |
| 333 // record of how long to sleep when we do sleep. | 285 // record of how long to sleep when we do sleep. |
| 334 delayed_work_time_ = delayed_work_time; | 286 delayed_work_time_ = delayed_work_time; |
| 335 } | 287 } |
| 336 | 288 |
| 337 void MessagePumpLibevent::AddIOObserver(IOObserver *obs) { | |
| 338 io_observers_.AddObserver(obs); | |
| 339 } | |
| 340 | |
| 341 void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) { | |
| 342 io_observers_.RemoveObserver(obs); | |
| 343 } | |
| 344 | |
| 345 void MessagePumpLibevent::WillProcessIOEvent() { | 289 void MessagePumpLibevent::WillProcessIOEvent() { |
| 346 FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent()); | 290 FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent()); |
| 347 } | 291 } |
| 348 | 292 |
| 349 void MessagePumpLibevent::DidProcessIOEvent() { | 293 void MessagePumpLibevent::DidProcessIOEvent() { |
| 350 FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent()); | 294 FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent()); |
| 351 } | 295 } |
| 352 | 296 |
| 297 bool MessagePumpLibevent::Init() { |
| 298 int fds[2]; |
| 299 if (pipe(fds)) { |
| 300 DLOG(ERROR) << "pipe() failed, errno: " << errno; |
| 301 return false; |
| 302 } |
| 303 if (SetNonBlocking(fds[0])) { |
| 304 DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno; |
| 305 return false; |
| 306 } |
| 307 if (SetNonBlocking(fds[1])) { |
| 308 DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno; |
| 309 return false; |
| 310 } |
| 311 wakeup_pipe_out_ = fds[0]; |
| 312 wakeup_pipe_in_ = fds[1]; |
| 313 |
| 314 wakeup_event_ = new event; |
| 315 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, |
| 316 OnWakeup, this); |
| 317 event_base_set(event_base_, wakeup_event_); |
| 318 |
| 319 if (event_add(wakeup_event_, 0)) |
| 320 return false; |
| 321 return true; |
| 322 } |
| 323 |
| 324 // static |
| 325 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, |
| 326 void* context) { |
| 327 FileDescriptorWatcher* controller = |
| 328 static_cast<FileDescriptorWatcher*>(context); |
| 329 |
| 330 MessagePumpLibevent* pump = controller->pump(); |
| 331 |
| 332 if (flags & EV_WRITE) { |
| 333 controller->OnFileCanWriteWithoutBlocking(fd, pump); |
| 334 } |
| 335 if (flags & EV_READ) { |
| 336 controller->OnFileCanReadWithoutBlocking(fd, pump); |
| 337 } |
| 338 } |
| 339 |
| 340 // Called if a byte is received on the wakeup pipe. |
| 341 // static |
| 342 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { |
| 343 base::MessagePumpLibevent* that = |
| 344 static_cast<base::MessagePumpLibevent*>(context); |
| 345 DCHECK(that->wakeup_pipe_out_ == socket); |
| 346 |
| 347 // Remove and discard the wakeup byte. |
| 348 char buf; |
| 349 int nread = HANDLE_EINTR(read(socket, &buf, 1)); |
| 350 DCHECK_EQ(nread, 1); |
| 351 // Tell libevent to break out of inner loop. |
| 352 event_base_loopbreak(that->event_base_); |
| 353 } |
| 354 |
| 353 } // namespace base | 355 } // namespace base |
| OLD | NEW |