OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "base/message_pump_libevent.h" | |
6 | |
7 #include <errno.h> | |
8 #include <fcntl.h> | |
9 #include <unistd.h> | |
10 | |
11 #include "base/auto_reset.h" | |
12 #include "base/compiler_specific.h" | |
13 #include "base/logging.h" | |
14 #if defined(OS_MACOSX) | |
15 #include "base/mac/scoped_nsautorelease_pool.h" | |
16 #endif | |
17 #include "base/memory/scoped_ptr.h" | |
18 #include "base/observer_list.h" | |
19 #include "base/posix/eintr_wrapper.h" | |
20 #include "base/time.h" | |
21 #include "third_party/libevent/event.h" | |
22 | |
23 #if defined(OS_MACOSX) | |
24 #include "base/mac/scoped_nsautorelease_pool.h" | |
25 #endif | |
26 | |
27 // Lifecycle of struct event | |
28 // Libevent uses two main data structures: | |
29 // struct event_base (of which there is one per message pump), and | |
30 // struct event (of which there is roughly one per socket). | |
31 // The socket's struct event is created in | |
32 // MessagePumpLibevent::WatchFileDescriptor(), | |
33 // is owned by the FileDescriptorWatcher, and is destroyed in | |
34 // StopWatchingFileDescriptor(). | |
35 // It is moved into and out of lists in struct event_base by | |
36 // the libevent functions event_add() and event_del(). | |
37 // | |
38 // TODO(dkegel): | |
39 // At the moment bad things happen if a FileDescriptorWatcher | |
40 // is active after its MessagePumpLibevent has been destroyed. | |
41 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop | |
42 // Not clear yet whether that situation occurs in practice, | |
43 // but if it does, we need to fix it. | |
44 | |
45 namespace base { | |
46 | |
47 // Return 0 on success | |
48 // Too small a function to bother putting in a library? | |
49 static int SetNonBlocking(int fd) { | |
50 int flags = fcntl(fd, F_GETFL, 0); | |
51 if (flags == -1) | |
52 flags = 0; | |
53 return fcntl(fd, F_SETFL, flags | O_NONBLOCK); | |
54 } | |
55 | |
56 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() | |
57 : event_(NULL), | |
58 pump_(NULL), | |
59 watcher_(NULL), | |
60 weak_factory_(this) { | |
61 } | |
62 | |
63 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { | |
64 if (event_) { | |
65 StopWatchingFileDescriptor(); | |
66 } | |
67 } | |
68 | |
69 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { | |
70 event* e = ReleaseEvent(); | |
71 if (e == NULL) | |
72 return true; | |
73 | |
74 // event_del() is a no-op if the event isn't active. | |
75 int rv = event_del(e); | |
76 delete e; | |
77 pump_ = NULL; | |
78 watcher_ = NULL; | |
79 return (rv == 0); | |
80 } | |
81 | |
82 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e) { | |
83 DCHECK(e); | |
84 DCHECK(!event_); | |
85 | |
86 event_ = e; | |
87 } | |
88 | |
89 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() { | |
90 struct event *e = event_; | |
91 event_ = NULL; | |
92 return e; | |
93 } | |
94 | |
95 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking( | |
96 int fd, MessagePumpLibevent* pump) { | |
97 // Since OnFileCanWriteWithoutBlocking() gets called first, it can stop | |
98 // watching the file descriptor. | |
99 if (!watcher_) | |
100 return; | |
101 pump->WillProcessIOEvent(); | |
102 watcher_->OnFileCanReadWithoutBlocking(fd); | |
103 pump->DidProcessIOEvent(); | |
104 } | |
105 | |
106 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking( | |
107 int fd, MessagePumpLibevent* pump) { | |
108 DCHECK(watcher_); | |
109 pump->WillProcessIOEvent(); | |
110 watcher_->OnFileCanWriteWithoutBlocking(fd); | |
111 pump->DidProcessIOEvent(); | |
112 } | |
113 | |
114 MessagePumpLibevent::MessagePumpLibevent() | |
115 : keep_running_(true), | |
116 in_run_(false), | |
117 processed_io_events_(false), | |
118 event_base_(event_base_new()), | |
119 wakeup_pipe_in_(-1), | |
120 wakeup_pipe_out_(-1) { | |
121 if (!Init()) | |
122 NOTREACHED(); | |
123 } | |
124 | |
125 MessagePumpLibevent::~MessagePumpLibevent() { | |
126 DCHECK(wakeup_event_); | |
127 DCHECK(event_base_); | |
128 event_del(wakeup_event_); | |
129 delete wakeup_event_; | |
130 if (wakeup_pipe_in_ >= 0) { | |
131 if (HANDLE_EINTR(close(wakeup_pipe_in_)) < 0) | |
132 DPLOG(ERROR) << "close"; | |
133 } | |
134 if (wakeup_pipe_out_ >= 0) { | |
135 if (HANDLE_EINTR(close(wakeup_pipe_out_)) < 0) | |
136 DPLOG(ERROR) << "close"; | |
137 } | |
138 event_base_free(event_base_); | |
139 } | |
140 | |
141 bool MessagePumpLibevent::WatchFileDescriptor(int fd, | |
142 bool persistent, | |
143 int mode, | |
144 FileDescriptorWatcher *controller, | |
145 Watcher *delegate) { | |
146 DCHECK_GE(fd, 0); | |
147 DCHECK(controller); | |
148 DCHECK(delegate); | |
149 DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); | |
150 // WatchFileDescriptor should be called on the pump thread. It is not | |
151 // threadsafe, and your watcher may never be registered. | |
152 DCHECK(watch_file_descriptor_caller_checker_.CalledOnValidThread()); | |
153 | |
154 int event_mask = persistent ? EV_PERSIST : 0; | |
155 if (mode & WATCH_READ) { | |
156 event_mask |= EV_READ; | |
157 } | |
158 if (mode & WATCH_WRITE) { | |
159 event_mask |= EV_WRITE; | |
160 } | |
161 | |
162 scoped_ptr<event> evt(controller->ReleaseEvent()); | |
163 if (evt.get() == NULL) { | |
164 // Ownership is transferred to the controller. | |
165 evt.reset(new event); | |
166 } else { | |
167 // Make sure we don't pick up any funky internal libevent masks. | |
168 int old_interest_mask = evt.get()->ev_events & | |
169 (EV_READ | EV_WRITE | EV_PERSIST); | |
170 | |
171 // Combine old/new event masks. | |
172 event_mask |= old_interest_mask; | |
173 | |
174 // Must disarm the event before we can reuse it. | |
175 event_del(evt.get()); | |
176 | |
177 // It's illegal to use this function to listen on 2 separate fds with the | |
178 // same |controller|. | |
179 if (EVENT_FD(evt.get()) != fd) { | |
180 NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd; | |
181 return false; | |
182 } | |
183 } | |
184 | |
185 // Set current interest mask and message pump for this event. | |
186 event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller); | |
187 | |
188 // Tell libevent which message pump this socket will belong to when we add it. | |
189 if (event_base_set(event_base_, evt.get())) { | |
190 return false; | |
191 } | |
192 | |
193 // Add this socket to the list of monitored sockets. | |
194 if (event_add(evt.get(), NULL)) { | |
195 return false; | |
196 } | |
197 | |
198 // Transfer ownership of evt to controller. | |
199 controller->Init(evt.release()); | |
200 | |
201 controller->set_watcher(delegate); | |
202 controller->set_pump(this); | |
203 | |
204 return true; | |
205 } | |
206 | |
207 void MessagePumpLibevent::AddIOObserver(IOObserver *obs) { | |
208 io_observers_.AddObserver(obs); | |
209 } | |
210 | |
211 void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) { | |
212 io_observers_.RemoveObserver(obs); | |
213 } | |
214 | |
215 // Tell libevent to break out of inner loop. | |
216 static void timer_callback(int fd, short events, void *context) | |
217 { | |
218 event_base_loopbreak((struct event_base *)context); | |
219 } | |
220 | |
221 // Reentrant! | |
222 void MessagePumpLibevent::Run(Delegate* delegate) { | |
223 DCHECK(keep_running_) << "Quit must have been called outside of Run!"; | |
224 base::AutoReset<bool> auto_reset_in_run(&in_run_, true); | |
225 | |
226 // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641. | |
227 // Instead, make our own timer and reuse it on each call to event_base_loop(). | |
228 scoped_ptr<event> timer_event(new event); | |
229 | |
230 for (;;) { | |
231 #if defined(OS_MACOSX) | |
232 mac::ScopedNSAutoreleasePool autorelease_pool; | |
233 #endif | |
234 | |
235 bool did_work = delegate->DoWork(); | |
236 if (!keep_running_) | |
237 break; | |
238 | |
239 event_base_loop(event_base_, EVLOOP_NONBLOCK); | |
240 did_work |= processed_io_events_; | |
241 processed_io_events_ = false; | |
242 if (!keep_running_) | |
243 break; | |
244 | |
245 did_work |= delegate->DoDelayedWork(&delayed_work_time_); | |
246 if (!keep_running_) | |
247 break; | |
248 | |
249 if (did_work) | |
250 continue; | |
251 | |
252 did_work = delegate->DoIdleWork(); | |
253 if (!keep_running_) | |
254 break; | |
255 | |
256 if (did_work) | |
257 continue; | |
258 | |
259 // EVLOOP_ONCE tells libevent to only block once, | |
260 // but to service all pending events when it wakes up. | |
261 if (delayed_work_time_.is_null()) { | |
262 event_base_loop(event_base_, EVLOOP_ONCE); | |
263 } else { | |
264 TimeDelta delay = delayed_work_time_ - TimeTicks::Now(); | |
265 if (delay > TimeDelta()) { | |
266 struct timeval poll_tv; | |
267 poll_tv.tv_sec = delay.InSeconds(); | |
268 poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond; | |
269 event_set(timer_event.get(), -1, 0, timer_callback, event_base_); | |
270 event_base_set(event_base_, timer_event.get()); | |
271 event_add(timer_event.get(), &poll_tv); | |
272 event_base_loop(event_base_, EVLOOP_ONCE); | |
273 event_del(timer_event.get()); | |
274 } else { | |
275 // It looks like delayed_work_time_ indicates a time in the past, so we | |
276 // need to call DoDelayedWork now. | |
277 delayed_work_time_ = TimeTicks(); | |
278 } | |
279 } | |
280 } | |
281 | |
282 keep_running_ = true; | |
283 } | |
284 | |
285 void MessagePumpLibevent::Quit() { | |
286 DCHECK(in_run_); | |
287 // Tell both libevent and Run that they should break out of their loops. | |
288 keep_running_ = false; | |
289 ScheduleWork(); | |
290 } | |
291 | |
292 void MessagePumpLibevent::ScheduleWork() { | |
293 // Tell libevent (in a threadsafe way) that it should break out of its loop. | |
294 char buf = 0; | |
295 int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1)); | |
296 DCHECK(nwrite == 1 || errno == EAGAIN) | |
297 << "[nwrite:" << nwrite << "] [errno:" << errno << "]"; | |
298 } | |
299 | |
300 void MessagePumpLibevent::ScheduleDelayedWork( | |
301 const TimeTicks& delayed_work_time) { | |
302 // We know that we can't be blocked on Wait right now since this method can | |
303 // only be called on the same thread as Run, so we only need to update our | |
304 // record of how long to sleep when we do sleep. | |
305 delayed_work_time_ = delayed_work_time; | |
306 } | |
307 | |
308 void MessagePumpLibevent::WillProcessIOEvent() { | |
309 FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent()); | |
310 } | |
311 | |
312 void MessagePumpLibevent::DidProcessIOEvent() { | |
313 FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent()); | |
314 } | |
315 | |
316 bool MessagePumpLibevent::Init() { | |
317 int fds[2]; | |
318 if (pipe(fds)) { | |
319 DLOG(ERROR) << "pipe() failed, errno: " << errno; | |
320 return false; | |
321 } | |
322 if (SetNonBlocking(fds[0])) { | |
323 DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno; | |
324 return false; | |
325 } | |
326 if (SetNonBlocking(fds[1])) { | |
327 DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno; | |
328 return false; | |
329 } | |
330 wakeup_pipe_out_ = fds[0]; | |
331 wakeup_pipe_in_ = fds[1]; | |
332 | |
333 wakeup_event_ = new event; | |
334 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, | |
335 OnWakeup, this); | |
336 event_base_set(event_base_, wakeup_event_); | |
337 | |
338 if (event_add(wakeup_event_, 0)) | |
339 return false; | |
340 return true; | |
341 } | |
342 | |
343 // static | |
344 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, | |
345 void* context) { | |
346 base::WeakPtr<FileDescriptorWatcher> controller = | |
347 static_cast<FileDescriptorWatcher*>(context)->weak_factory_.GetWeakPtr(); | |
348 DCHECK(controller.get()); | |
349 | |
350 MessagePumpLibevent* pump = controller->pump(); | |
351 pump->processed_io_events_ = true; | |
352 | |
353 if (flags & EV_WRITE) { | |
354 controller->OnFileCanWriteWithoutBlocking(fd, pump); | |
355 } | |
356 // Check |controller| in case it's been deleted in | |
357 // controller->OnFileCanWriteWithoutBlocking(). | |
358 if (controller.get() && flags & EV_READ) { | |
359 controller->OnFileCanReadWithoutBlocking(fd, pump); | |
360 } | |
361 } | |
362 | |
363 // Called if a byte is received on the wakeup pipe. | |
364 // static | |
365 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { | |
366 base::MessagePumpLibevent* that = | |
367 static_cast<base::MessagePumpLibevent*>(context); | |
368 DCHECK(that->wakeup_pipe_out_ == socket); | |
369 | |
370 // Remove and discard the wakeup byte. | |
371 char buf; | |
372 int nread = HANDLE_EINTR(read(socket, &buf, 1)); | |
373 DCHECK_EQ(nread, 1); | |
374 that->processed_io_events_ = true; | |
375 // Tell libevent to break out of inner loop. | |
376 event_base_loopbreak(that->event_base_); | |
377 } | |
378 | |
379 } // namespace base | |
OLD | NEW |