OLD | NEW |
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/message_pump_libevent.h" | 5 #include "base/message_pump_libevent.h" |
6 | 6 |
7 #include <errno.h> | 7 #include <errno.h> |
8 #include <fcntl.h> | 8 #include <fcntl.h> |
9 | 9 |
10 #include "base/auto_reset.h" | 10 #include "base/auto_reset.h" |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
55 pump_(NULL), | 55 pump_(NULL), |
56 watcher_(NULL) { | 56 watcher_(NULL) { |
57 } | 57 } |
58 | 58 |
59 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { | 59 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { |
60 if (event_) { | 60 if (event_) { |
61 StopWatchingFileDescriptor(); | 61 StopWatchingFileDescriptor(); |
62 } | 62 } |
63 } | 63 } |
64 | 64 |
| 65 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { |
| 66 event* e = ReleaseEvent(); |
| 67 if (e == NULL) |
| 68 return true; |
| 69 |
| 70 // event_del() is a no-op if the event isn't active. |
| 71 int rv = event_del(e); |
| 72 delete e; |
| 73 pump_ = NULL; |
| 74 watcher_ = NULL; |
| 75 return (rv == 0); |
| 76 } |
| 77 |
65 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, | 78 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, |
66 bool is_persistent) { | 79 bool is_persistent) { |
67 DCHECK(e); | 80 DCHECK(e); |
68 DCHECK(event_ == NULL); | 81 DCHECK(event_ == NULL); |
69 | 82 |
70 is_persistent_ = is_persistent; | 83 is_persistent_ = is_persistent; |
71 event_ = e; | 84 event_ = e; |
72 } | 85 } |
73 | 86 |
74 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() { | 87 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() { |
75 struct event *e = event_; | 88 struct event *e = event_; |
76 event_ = NULL; | 89 event_ = NULL; |
77 return e; | 90 return e; |
78 } | 91 } |
79 | 92 |
80 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { | |
81 event* e = ReleaseEvent(); | |
82 if (e == NULL) | |
83 return true; | |
84 | |
85 // event_del() is a no-op if the event isn't active. | |
86 int rv = event_del(e); | |
87 delete e; | |
88 pump_ = NULL; | |
89 watcher_ = NULL; | |
90 return (rv == 0); | |
91 } | |
92 | |
93 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking( | 93 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking( |
94 int fd, MessagePumpLibevent* pump) { | 94 int fd, MessagePumpLibevent* pump) { |
95 pump->WillProcessIOEvent(); | 95 pump->WillProcessIOEvent(); |
96 watcher_->OnFileCanReadWithoutBlocking(fd); | 96 watcher_->OnFileCanReadWithoutBlocking(fd); |
97 pump->DidProcessIOEvent(); | 97 pump->DidProcessIOEvent(); |
98 } | 98 } |
99 | 99 |
100 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking( | 100 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking( |
101 int fd, MessagePumpLibevent* pump) { | 101 int fd, MessagePumpLibevent* pump) { |
102 pump->WillProcessIOEvent(); | 102 pump->WillProcessIOEvent(); |
103 watcher_->OnFileCanWriteWithoutBlocking(fd); | 103 watcher_->OnFileCanWriteWithoutBlocking(fd); |
104 pump->DidProcessIOEvent(); | 104 pump->DidProcessIOEvent(); |
105 } | 105 } |
106 | 106 |
107 // Called if a byte is received on the wakeup pipe. | |
108 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { | |
109 base::MessagePumpLibevent* that = | |
110 static_cast<base::MessagePumpLibevent*>(context); | |
111 DCHECK(that->wakeup_pipe_out_ == socket); | |
112 | |
113 // Remove and discard the wakeup byte. | |
114 char buf; | |
115 int nread = HANDLE_EINTR(read(socket, &buf, 1)); | |
116 DCHECK_EQ(nread, 1); | |
117 // Tell libevent to break out of inner loop. | |
118 event_base_loopbreak(that->event_base_); | |
119 } | |
120 | |
121 MessagePumpLibevent::MessagePumpLibevent() | 107 MessagePumpLibevent::MessagePumpLibevent() |
122 : keep_running_(true), | 108 : keep_running_(true), |
123 in_run_(false), | 109 in_run_(false), |
124 event_base_(event_base_new()), | 110 event_base_(event_base_new()), |
125 wakeup_pipe_in_(-1), | 111 wakeup_pipe_in_(-1), |
126 wakeup_pipe_out_(-1) { | 112 wakeup_pipe_out_(-1) { |
127 if (!Init()) | 113 if (!Init()) |
128 NOTREACHED(); | 114 NOTREACHED(); |
129 } | 115 } |
130 | 116 |
131 bool MessagePumpLibevent::Init() { | |
132 int fds[2]; | |
133 if (pipe(fds)) { | |
134 DLOG(ERROR) << "pipe() failed, errno: " << errno; | |
135 return false; | |
136 } | |
137 if (SetNonBlocking(fds[0])) { | |
138 DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno; | |
139 return false; | |
140 } | |
141 if (SetNonBlocking(fds[1])) { | |
142 DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno; | |
143 return false; | |
144 } | |
145 wakeup_pipe_out_ = fds[0]; | |
146 wakeup_pipe_in_ = fds[1]; | |
147 | |
148 wakeup_event_ = new event; | |
149 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, | |
150 OnWakeup, this); | |
151 event_base_set(event_base_, wakeup_event_); | |
152 | |
153 if (event_add(wakeup_event_, 0)) | |
154 return false; | |
155 return true; | |
156 } | |
157 | |
158 MessagePumpLibevent::~MessagePumpLibevent() { | 117 MessagePumpLibevent::~MessagePumpLibevent() { |
159 DCHECK(wakeup_event_); | 118 DCHECK(wakeup_event_); |
160 DCHECK(event_base_); | 119 DCHECK(event_base_); |
161 event_del(wakeup_event_); | 120 event_del(wakeup_event_); |
162 delete wakeup_event_; | 121 delete wakeup_event_; |
163 if (wakeup_pipe_in_ >= 0) { | 122 if (wakeup_pipe_in_ >= 0) { |
164 if (HANDLE_EINTR(close(wakeup_pipe_in_)) < 0) | 123 if (HANDLE_EINTR(close(wakeup_pipe_in_)) < 0) |
165 PLOG(ERROR) << "close"; | 124 PLOG(ERROR) << "close"; |
166 } | 125 } |
167 if (wakeup_pipe_out_ >= 0) { | 126 if (wakeup_pipe_out_ >= 0) { |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
227 | 186 |
228 // Transfer ownership of evt to controller. | 187 // Transfer ownership of evt to controller. |
229 controller->Init(evt.release(), persistent); | 188 controller->Init(evt.release(), persistent); |
230 | 189 |
231 controller->set_watcher(delegate); | 190 controller->set_watcher(delegate); |
232 controller->set_pump(this); | 191 controller->set_pump(this); |
233 | 192 |
234 return true; | 193 return true; |
235 } | 194 } |
236 | 195 |
237 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, | 196 void MessagePumpLibevent::AddIOObserver(IOObserver *obs) { |
238 void* context) { | 197 io_observers_.AddObserver(obs); |
239 FileDescriptorWatcher* controller = | 198 } |
240 static_cast<FileDescriptorWatcher*>(context); | |
241 | 199 |
242 MessagePumpLibevent* pump = controller->pump(); | 200 void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) { |
243 | 201 io_observers_.RemoveObserver(obs); |
244 if (flags & EV_WRITE) { | |
245 controller->OnFileCanWriteWithoutBlocking(fd, pump); | |
246 } | |
247 if (flags & EV_READ) { | |
248 controller->OnFileCanReadWithoutBlocking(fd, pump); | |
249 } | |
250 } | 202 } |
251 | 203 |
252 // Tell libevent to break out of inner loop. | 204 // Tell libevent to break out of inner loop. |
253 static void timer_callback(int fd, short events, void *context) | 205 static void timer_callback(int fd, short events, void *context) |
254 { | 206 { |
255 event_base_loopbreak((struct event_base *)context); | 207 event_base_loopbreak((struct event_base *)context); |
256 } | 208 } |
257 | 209 |
258 // Reentrant! | 210 // Reentrant! |
259 void MessagePumpLibevent::Run(Delegate* delegate) { | 211 void MessagePumpLibevent::Run(Delegate* delegate) { |
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
327 } | 279 } |
328 | 280 |
329 void MessagePumpLibevent::ScheduleDelayedWork( | 281 void MessagePumpLibevent::ScheduleDelayedWork( |
330 const TimeTicks& delayed_work_time) { | 282 const TimeTicks& delayed_work_time) { |
331 // We know that we can't be blocked on Wait right now since this method can | 283 // We know that we can't be blocked on Wait right now since this method can |
332 // only be called on the same thread as Run, so we only need to update our | 284 // only be called on the same thread as Run, so we only need to update our |
333 // record of how long to sleep when we do sleep. | 285 // record of how long to sleep when we do sleep. |
334 delayed_work_time_ = delayed_work_time; | 286 delayed_work_time_ = delayed_work_time; |
335 } | 287 } |
336 | 288 |
337 void MessagePumpLibevent::AddIOObserver(IOObserver *obs) { | |
338 io_observers_.AddObserver(obs); | |
339 } | |
340 | |
341 void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) { | |
342 io_observers_.RemoveObserver(obs); | |
343 } | |
344 | |
345 void MessagePumpLibevent::WillProcessIOEvent() { | 289 void MessagePumpLibevent::WillProcessIOEvent() { |
346 FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent()); | 290 FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent()); |
347 } | 291 } |
348 | 292 |
349 void MessagePumpLibevent::DidProcessIOEvent() { | 293 void MessagePumpLibevent::DidProcessIOEvent() { |
350 FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent()); | 294 FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent()); |
351 } | 295 } |
352 | 296 |
| 297 bool MessagePumpLibevent::Init() { |
| 298 int fds[2]; |
| 299 if (pipe(fds)) { |
| 300 DLOG(ERROR) << "pipe() failed, errno: " << errno; |
| 301 return false; |
| 302 } |
| 303 if (SetNonBlocking(fds[0])) { |
| 304 DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno; |
| 305 return false; |
| 306 } |
| 307 if (SetNonBlocking(fds[1])) { |
| 308 DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno; |
| 309 return false; |
| 310 } |
| 311 wakeup_pipe_out_ = fds[0]; |
| 312 wakeup_pipe_in_ = fds[1]; |
| 313 |
| 314 wakeup_event_ = new event; |
| 315 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, |
| 316 OnWakeup, this); |
| 317 event_base_set(event_base_, wakeup_event_); |
| 318 |
| 319 if (event_add(wakeup_event_, 0)) |
| 320 return false; |
| 321 return true; |
| 322 } |
| 323 |
| 324 // static |
| 325 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, |
| 326 void* context) { |
| 327 FileDescriptorWatcher* controller = |
| 328 static_cast<FileDescriptorWatcher*>(context); |
| 329 |
| 330 MessagePumpLibevent* pump = controller->pump(); |
| 331 |
| 332 if (flags & EV_WRITE) { |
| 333 controller->OnFileCanWriteWithoutBlocking(fd, pump); |
| 334 } |
| 335 if (flags & EV_READ) { |
| 336 controller->OnFileCanReadWithoutBlocking(fd, pump); |
| 337 } |
| 338 } |
| 339 |
| 340 // Called if a byte is received on the wakeup pipe. |
| 341 // static |
| 342 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { |
| 343 base::MessagePumpLibevent* that = |
| 344 static_cast<base::MessagePumpLibevent*>(context); |
| 345 DCHECK(that->wakeup_pipe_out_ == socket); |
| 346 |
| 347 // Remove and discard the wakeup byte. |
| 348 char buf; |
| 349 int nread = HANDLE_EINTR(read(socket, &buf, 1)); |
| 350 DCHECK_EQ(nread, 1); |
| 351 // Tell libevent to break out of inner loop. |
| 352 event_base_loopbreak(that->event_base_); |
| 353 } |
| 354 |
353 } // namespace base | 355 } // namespace base |
OLD | NEW |