Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(109)

Side by Side Diff: base/message_pump_libevent.cc

Issue 16897006: Move message_pump to base/message_loop. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « base/message_pump_libevent.h ('k') | base/message_pump_libevent_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_pump_libevent.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <unistd.h>
10
11 #include "base/auto_reset.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #if defined(OS_MACOSX)
15 #include "base/mac/scoped_nsautorelease_pool.h"
16 #endif
17 #include "base/memory/scoped_ptr.h"
18 #include "base/observer_list.h"
19 #include "base/posix/eintr_wrapper.h"
20 #include "base/time.h"
21 #include "third_party/libevent/event.h"
22
23 #if defined(OS_MACOSX)
24 #include "base/mac/scoped_nsautorelease_pool.h"
25 #endif
26
27 // Lifecycle of struct event
28 // Libevent uses two main data structures:
29 // struct event_base (of which there is one per message pump), and
30 // struct event (of which there is roughly one per socket).
31 // The socket's struct event is created in
32 // MessagePumpLibevent::WatchFileDescriptor(),
33 // is owned by the FileDescriptorWatcher, and is destroyed in
34 // StopWatchingFileDescriptor().
35 // It is moved into and out of lists in struct event_base by
36 // the libevent functions event_add() and event_del().
37 //
38 // TODO(dkegel):
39 // At the moment bad things happen if a FileDescriptorWatcher
40 // is active after its MessagePumpLibevent has been destroyed.
41 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
42 // Not clear yet whether that situation occurs in practice,
43 // but if it does, we need to fix it.
44
45 namespace base {
46
47 // Return 0 on success
48 // Too small a function to bother putting in a library?
49 static int SetNonBlocking(int fd) {
50 int flags = fcntl(fd, F_GETFL, 0);
51 if (flags == -1)
52 flags = 0;
53 return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
54 }
55
56 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
57 : event_(NULL),
58 pump_(NULL),
59 watcher_(NULL),
60 weak_factory_(this) {
61 }
62
63 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
64 if (event_) {
65 StopWatchingFileDescriptor();
66 }
67 }
68
69 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
70 event* e = ReleaseEvent();
71 if (e == NULL)
72 return true;
73
74 // event_del() is a no-op if the event isn't active.
75 int rv = event_del(e);
76 delete e;
77 pump_ = NULL;
78 watcher_ = NULL;
79 return (rv == 0);
80 }
81
82 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e) {
83 DCHECK(e);
84 DCHECK(!event_);
85
86 event_ = e;
87 }
88
89 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
90 struct event *e = event_;
91 event_ = NULL;
92 return e;
93 }
94
95 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking(
96 int fd, MessagePumpLibevent* pump) {
97 // Since OnFileCanWriteWithoutBlocking() gets called first, it can stop
98 // watching the file descriptor.
99 if (!watcher_)
100 return;
101 pump->WillProcessIOEvent();
102 watcher_->OnFileCanReadWithoutBlocking(fd);
103 pump->DidProcessIOEvent();
104 }
105
106 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking(
107 int fd, MessagePumpLibevent* pump) {
108 DCHECK(watcher_);
109 pump->WillProcessIOEvent();
110 watcher_->OnFileCanWriteWithoutBlocking(fd);
111 pump->DidProcessIOEvent();
112 }
113
114 MessagePumpLibevent::MessagePumpLibevent()
115 : keep_running_(true),
116 in_run_(false),
117 processed_io_events_(false),
118 event_base_(event_base_new()),
119 wakeup_pipe_in_(-1),
120 wakeup_pipe_out_(-1) {
121 if (!Init())
122 NOTREACHED();
123 }
124
125 MessagePumpLibevent::~MessagePumpLibevent() {
126 DCHECK(wakeup_event_);
127 DCHECK(event_base_);
128 event_del(wakeup_event_);
129 delete wakeup_event_;
130 if (wakeup_pipe_in_ >= 0) {
131 if (HANDLE_EINTR(close(wakeup_pipe_in_)) < 0)
132 DPLOG(ERROR) << "close";
133 }
134 if (wakeup_pipe_out_ >= 0) {
135 if (HANDLE_EINTR(close(wakeup_pipe_out_)) < 0)
136 DPLOG(ERROR) << "close";
137 }
138 event_base_free(event_base_);
139 }
140
141 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
142 bool persistent,
143 int mode,
144 FileDescriptorWatcher *controller,
145 Watcher *delegate) {
146 DCHECK_GE(fd, 0);
147 DCHECK(controller);
148 DCHECK(delegate);
149 DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
150 // WatchFileDescriptor should be called on the pump thread. It is not
151 // threadsafe, and your watcher may never be registered.
152 DCHECK(watch_file_descriptor_caller_checker_.CalledOnValidThread());
153
154 int event_mask = persistent ? EV_PERSIST : 0;
155 if (mode & WATCH_READ) {
156 event_mask |= EV_READ;
157 }
158 if (mode & WATCH_WRITE) {
159 event_mask |= EV_WRITE;
160 }
161
162 scoped_ptr<event> evt(controller->ReleaseEvent());
163 if (evt.get() == NULL) {
164 // Ownership is transferred to the controller.
165 evt.reset(new event);
166 } else {
167 // Make sure we don't pick up any funky internal libevent masks.
168 int old_interest_mask = evt.get()->ev_events &
169 (EV_READ | EV_WRITE | EV_PERSIST);
170
171 // Combine old/new event masks.
172 event_mask |= old_interest_mask;
173
174 // Must disarm the event before we can reuse it.
175 event_del(evt.get());
176
177 // It's illegal to use this function to listen on 2 separate fds with the
178 // same |controller|.
179 if (EVENT_FD(evt.get()) != fd) {
180 NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
181 return false;
182 }
183 }
184
185 // Set current interest mask and message pump for this event.
186 event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
187
188 // Tell libevent which message pump this socket will belong to when we add it.
189 if (event_base_set(event_base_, evt.get())) {
190 return false;
191 }
192
193 // Add this socket to the list of monitored sockets.
194 if (event_add(evt.get(), NULL)) {
195 return false;
196 }
197
198 // Transfer ownership of evt to controller.
199 controller->Init(evt.release());
200
201 controller->set_watcher(delegate);
202 controller->set_pump(this);
203
204 return true;
205 }
206
207 void MessagePumpLibevent::AddIOObserver(IOObserver *obs) {
208 io_observers_.AddObserver(obs);
209 }
210
211 void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) {
212 io_observers_.RemoveObserver(obs);
213 }
214
215 // Tell libevent to break out of inner loop.
216 static void timer_callback(int fd, short events, void *context)
217 {
218 event_base_loopbreak((struct event_base *)context);
219 }
220
221 // Reentrant!
222 void MessagePumpLibevent::Run(Delegate* delegate) {
223 DCHECK(keep_running_) << "Quit must have been called outside of Run!";
224 base::AutoReset<bool> auto_reset_in_run(&in_run_, true);
225
226 // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
227 // Instead, make our own timer and reuse it on each call to event_base_loop().
228 scoped_ptr<event> timer_event(new event);
229
230 for (;;) {
231 #if defined(OS_MACOSX)
232 mac::ScopedNSAutoreleasePool autorelease_pool;
233 #endif
234
235 bool did_work = delegate->DoWork();
236 if (!keep_running_)
237 break;
238
239 event_base_loop(event_base_, EVLOOP_NONBLOCK);
240 did_work |= processed_io_events_;
241 processed_io_events_ = false;
242 if (!keep_running_)
243 break;
244
245 did_work |= delegate->DoDelayedWork(&delayed_work_time_);
246 if (!keep_running_)
247 break;
248
249 if (did_work)
250 continue;
251
252 did_work = delegate->DoIdleWork();
253 if (!keep_running_)
254 break;
255
256 if (did_work)
257 continue;
258
259 // EVLOOP_ONCE tells libevent to only block once,
260 // but to service all pending events when it wakes up.
261 if (delayed_work_time_.is_null()) {
262 event_base_loop(event_base_, EVLOOP_ONCE);
263 } else {
264 TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
265 if (delay > TimeDelta()) {
266 struct timeval poll_tv;
267 poll_tv.tv_sec = delay.InSeconds();
268 poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
269 event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
270 event_base_set(event_base_, timer_event.get());
271 event_add(timer_event.get(), &poll_tv);
272 event_base_loop(event_base_, EVLOOP_ONCE);
273 event_del(timer_event.get());
274 } else {
275 // It looks like delayed_work_time_ indicates a time in the past, so we
276 // need to call DoDelayedWork now.
277 delayed_work_time_ = TimeTicks();
278 }
279 }
280 }
281
282 keep_running_ = true;
283 }
284
285 void MessagePumpLibevent::Quit() {
286 DCHECK(in_run_);
287 // Tell both libevent and Run that they should break out of their loops.
288 keep_running_ = false;
289 ScheduleWork();
290 }
291
292 void MessagePumpLibevent::ScheduleWork() {
293 // Tell libevent (in a threadsafe way) that it should break out of its loop.
294 char buf = 0;
295 int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
296 DCHECK(nwrite == 1 || errno == EAGAIN)
297 << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
298 }
299
300 void MessagePumpLibevent::ScheduleDelayedWork(
301 const TimeTicks& delayed_work_time) {
302 // We know that we can't be blocked on Wait right now since this method can
303 // only be called on the same thread as Run, so we only need to update our
304 // record of how long to sleep when we do sleep.
305 delayed_work_time_ = delayed_work_time;
306 }
307
308 void MessagePumpLibevent::WillProcessIOEvent() {
309 FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent());
310 }
311
312 void MessagePumpLibevent::DidProcessIOEvent() {
313 FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent());
314 }
315
316 bool MessagePumpLibevent::Init() {
317 int fds[2];
318 if (pipe(fds)) {
319 DLOG(ERROR) << "pipe() failed, errno: " << errno;
320 return false;
321 }
322 if (SetNonBlocking(fds[0])) {
323 DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
324 return false;
325 }
326 if (SetNonBlocking(fds[1])) {
327 DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
328 return false;
329 }
330 wakeup_pipe_out_ = fds[0];
331 wakeup_pipe_in_ = fds[1];
332
333 wakeup_event_ = new event;
334 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
335 OnWakeup, this);
336 event_base_set(event_base_, wakeup_event_);
337
338 if (event_add(wakeup_event_, 0))
339 return false;
340 return true;
341 }
342
343 // static
344 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
345 void* context) {
346 base::WeakPtr<FileDescriptorWatcher> controller =
347 static_cast<FileDescriptorWatcher*>(context)->weak_factory_.GetWeakPtr();
348 DCHECK(controller.get());
349
350 MessagePumpLibevent* pump = controller->pump();
351 pump->processed_io_events_ = true;
352
353 if (flags & EV_WRITE) {
354 controller->OnFileCanWriteWithoutBlocking(fd, pump);
355 }
356 // Check |controller| in case it's been deleted in
357 // controller->OnFileCanWriteWithoutBlocking().
358 if (controller.get() && flags & EV_READ) {
359 controller->OnFileCanReadWithoutBlocking(fd, pump);
360 }
361 }
362
363 // Called if a byte is received on the wakeup pipe.
364 // static
365 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
366 base::MessagePumpLibevent* that =
367 static_cast<base::MessagePumpLibevent*>(context);
368 DCHECK(that->wakeup_pipe_out_ == socket);
369
370 // Remove and discard the wakeup byte.
371 char buf;
372 int nread = HANDLE_EINTR(read(socket, &buf, 1));
373 DCHECK_EQ(nread, 1);
374 that->processed_io_events_ = true;
375 // Tell libevent to break out of inner loop.
376 event_base_loopbreak(that->event_base_);
377 }
378
379 } // namespace base
OLDNEW
« no previous file with comments | « base/message_pump_libevent.h ('k') | base/message_pump_libevent_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698