OLD | NEW |
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/message_pump_libevent.h" | 5 #include "base/message_pump_libevent.h" |
6 | 6 |
7 #include <errno.h> | 7 #include <errno.h> |
8 #include <fcntl.h> | 8 #include <fcntl.h> |
9 | 9 |
10 #include "eintr_wrapper.h" | |
11 #include "base/auto_reset.h" | 10 #include "base/auto_reset.h" |
| 11 #include "base/eintr_wrapper.h" |
12 #include "base/logging.h" | 12 #include "base/logging.h" |
| 13 #include "base/observer_list.h" |
13 #include "base/scoped_nsautorelease_pool.h" | 14 #include "base/scoped_nsautorelease_pool.h" |
14 #include "base/scoped_ptr.h" | 15 #include "base/scoped_ptr.h" |
15 #include "base/time.h" | 16 #include "base/time.h" |
16 #if defined(USE_SYSTEM_LIBEVENT) | 17 #if defined(USE_SYSTEM_LIBEVENT) |
17 #include <event.h> | 18 #include <event.h> |
18 #else | 19 #else |
19 #include "third_party/libevent/event.h" | 20 #include "third_party/libevent/event.h" |
20 #endif | 21 #endif |
21 | 22 |
22 // Lifecycle of struct event | 23 // Lifecycle of struct event |
(...skipping 20 matching lines...) Expand all Loading... |
43 // Too small a function to bother putting in a library? | 44 // Too small a function to bother putting in a library? |
44 static int SetNonBlocking(int fd) { | 45 static int SetNonBlocking(int fd) { |
45 int flags = fcntl(fd, F_GETFL, 0); | 46 int flags = fcntl(fd, F_GETFL, 0); |
46 if (flags == -1) | 47 if (flags == -1) |
47 flags = 0; | 48 flags = 0; |
48 return fcntl(fd, F_SETFL, flags | O_NONBLOCK); | 49 return fcntl(fd, F_SETFL, flags | O_NONBLOCK); |
49 } | 50 } |
50 | 51 |
51 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() | 52 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() |
52 : is_persistent_(false), | 53 : is_persistent_(false), |
53 event_(NULL) { | 54 event_(NULL), |
| 55 pump_(NULL), |
| 56 watcher_(NULL) { |
54 } | 57 } |
55 | 58 |
56 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { | 59 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { |
57 if (event_) { | 60 if (event_) { |
58 StopWatchingFileDescriptor(); | 61 StopWatchingFileDescriptor(); |
59 } | 62 } |
60 } | 63 } |
61 | 64 |
62 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, | 65 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, |
63 bool is_persistent) { | 66 bool is_persistent) { |
(...skipping 11 matching lines...) Expand all Loading... |
75 } | 78 } |
76 | 79 |
77 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { | 80 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { |
78 event* e = ReleaseEvent(); | 81 event* e = ReleaseEvent(); |
79 if (e == NULL) | 82 if (e == NULL) |
80 return true; | 83 return true; |
81 | 84 |
82 // event_del() is a no-op if the event isn't active. | 85 // event_del() is a no-op if the event isn't active. |
83 int rv = event_del(e); | 86 int rv = event_del(e); |
84 delete e; | 87 delete e; |
| 88 pump_ = NULL; |
| 89 watcher_ = NULL; |
85 return (rv == 0); | 90 return (rv == 0); |
86 } | 91 } |
87 | 92 |
| 93 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking( |
| 94 int fd, MessagePumpLibevent* pump) { |
| 95 pump->WillProcessIOEvent(); |
| 96 watcher_->OnFileCanReadWithoutBlocking(fd); |
| 97 pump->DidProcessIOEvent(); |
| 98 } |
| 99 |
| 100 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking( |
| 101 int fd, MessagePumpLibevent* pump) { |
| 102 pump->WillProcessIOEvent(); |
| 103 watcher_->OnFileCanWriteWithoutBlocking(fd); |
| 104 pump->DidProcessIOEvent(); |
| 105 } |
| 106 |
88 // Called if a byte is received on the wakeup pipe. | 107 // Called if a byte is received on the wakeup pipe. |
89 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { | 108 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { |
90 base::MessagePumpLibevent* that = | 109 base::MessagePumpLibevent* that = |
91 static_cast<base::MessagePumpLibevent*>(context); | 110 static_cast<base::MessagePumpLibevent*>(context); |
92 DCHECK(that->wakeup_pipe_out_ == socket); | 111 DCHECK(that->wakeup_pipe_out_ == socket); |
93 | 112 |
94 // Remove and discard the wakeup byte. | 113 // Remove and discard the wakeup byte. |
95 char buf; | 114 char buf; |
96 int nread = HANDLE_EINTR(read(socket, &buf, 1)); | 115 int nread = HANDLE_EINTR(read(socket, &buf, 1)); |
97 DCHECK_EQ(nread, 1); | 116 DCHECK_EQ(nread, 1); |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
135 return false; | 154 return false; |
136 return true; | 155 return true; |
137 } | 156 } |
138 | 157 |
139 MessagePumpLibevent::~MessagePumpLibevent() { | 158 MessagePumpLibevent::~MessagePumpLibevent() { |
140 DCHECK(wakeup_event_); | 159 DCHECK(wakeup_event_); |
141 DCHECK(event_base_); | 160 DCHECK(event_base_); |
142 event_del(wakeup_event_); | 161 event_del(wakeup_event_); |
143 delete wakeup_event_; | 162 delete wakeup_event_; |
144 if (wakeup_pipe_in_ >= 0) | 163 if (wakeup_pipe_in_ >= 0) |
145 close(wakeup_pipe_in_); | 164 HANDLE_EINTR(close(wakeup_pipe_in_)); |
146 if (wakeup_pipe_out_ >= 0) | 165 if (wakeup_pipe_out_ >= 0) |
147 close(wakeup_pipe_out_); | 166 HANDLE_EINTR(close(wakeup_pipe_out_)); |
148 event_base_free(event_base_); | 167 event_base_free(event_base_); |
149 } | 168 } |
150 | 169 |
151 bool MessagePumpLibevent::WatchFileDescriptor(int fd, | 170 bool MessagePumpLibevent::WatchFileDescriptor(int fd, |
152 bool persistent, | 171 bool persistent, |
153 Mode mode, | 172 Mode mode, |
154 FileDescriptorWatcher *controller, | 173 FileDescriptorWatcher *controller, |
155 Watcher *delegate) { | 174 Watcher *delegate) { |
156 DCHECK_GE(fd, 0); | 175 DCHECK_GE(fd, 0); |
157 DCHECK(controller); | 176 DCHECK(controller); |
(...skipping 25 matching lines...) Expand all Loading... |
183 | 202 |
184 // It's illegal to use this function to listen on 2 separate fds with the | 203 // It's illegal to use this function to listen on 2 separate fds with the |
185 // same |controller|. | 204 // same |controller|. |
186 if (EVENT_FD(evt.get()) != fd) { | 205 if (EVENT_FD(evt.get()) != fd) { |
187 NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd; | 206 NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd; |
188 return false; | 207 return false; |
189 } | 208 } |
190 } | 209 } |
191 | 210 |
192 // Set current interest mask and message pump for this event. | 211 // Set current interest mask and message pump for this event. |
193 event_set(evt.get(), fd, event_mask, OnLibeventNotification, delegate); | 212 event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller); |
194 | 213 |
195 // Tell libevent which message pump this socket will belong to when we add it. | 214 // Tell libevent which message pump this socket will belong to when we add it. |
196 if (event_base_set(event_base_, evt.get()) != 0) { | 215 if (event_base_set(event_base_, evt.get()) != 0) { |
197 return false; | 216 return false; |
198 } | 217 } |
199 | 218 |
200 // Add this socket to the list of monitored sockets. | 219 // Add this socket to the list of monitored sockets. |
201 if (event_add(evt.get(), NULL) != 0) { | 220 if (event_add(evt.get(), NULL) != 0) { |
202 return false; | 221 return false; |
203 } | 222 } |
204 | 223 |
205 // Transfer ownership of evt to controller. | 224 // Transfer ownership of evt to controller. |
206 controller->Init(evt.release(), persistent); | 225 controller->Init(evt.release(), persistent); |
| 226 |
| 227 controller->set_watcher(delegate); |
| 228 controller->set_pump(this); |
| 229 |
207 return true; | 230 return true; |
208 } | 231 } |
209 | 232 |
210 | |
211 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, | 233 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, |
212 void* context) { | 234 void* context) { |
213 Watcher* watcher = static_cast<Watcher*>(context); | 235 FileDescriptorWatcher* controller = |
| 236 static_cast<FileDescriptorWatcher*>(context); |
| 237 |
| 238 MessagePumpLibevent* pump = controller->pump(); |
214 | 239 |
215 if (flags & EV_WRITE) { | 240 if (flags & EV_WRITE) { |
216 watcher->OnFileCanWriteWithoutBlocking(fd); | 241 controller->OnFileCanWriteWithoutBlocking(fd, pump); |
217 } | 242 } |
218 if (flags & EV_READ) { | 243 if (flags & EV_READ) { |
219 watcher->OnFileCanReadWithoutBlocking(fd); | 244 controller->OnFileCanReadWithoutBlocking(fd, pump); |
220 } | 245 } |
221 } | 246 } |
222 | 247 |
223 // Tell libevent to break out of inner loop. | 248 // Tell libevent to break out of inner loop. |
224 static void timer_callback(int fd, short events, void *context) | 249 static void timer_callback(int fd, short events, void *context) |
225 { | 250 { |
226 event_base_loopbreak((struct event_base *)context); | 251 event_base_loopbreak((struct event_base *)context); |
227 } | 252 } |
228 | 253 |
229 // Reentrant! | 254 // Reentrant! |
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
297 << "[nwrite:" << nwrite << "] [errno:" << errno << "]"; | 322 << "[nwrite:" << nwrite << "] [errno:" << errno << "]"; |
298 } | 323 } |
299 | 324 |
300 void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) { | 325 void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) { |
301 // We know that we can't be blocked on Wait right now since this method can | 326 // We know that we can't be blocked on Wait right now since this method can |
302 // only be called on the same thread as Run, so we only need to update our | 327 // only be called on the same thread as Run, so we only need to update our |
303 // record of how long to sleep when we do sleep. | 328 // record of how long to sleep when we do sleep. |
304 delayed_work_time_ = delayed_work_time; | 329 delayed_work_time_ = delayed_work_time; |
305 } | 330 } |
306 | 331 |
| 332 void MessagePumpLibevent::AddIOObserver(IOObserver *obs) { |
| 333 io_observers_.AddObserver(obs); |
| 334 } |
| 335 |
| 336 void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) { |
| 337 io_observers_.RemoveObserver(obs); |
| 338 } |
| 339 |
| 340 void MessagePumpLibevent::WillProcessIOEvent() { |
| 341 FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent()); |
| 342 } |
| 343 |
| 344 void MessagePumpLibevent::DidProcessIOEvent() { |
| 345 FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent()); |
| 346 } |
| 347 |
307 } // namespace base | 348 } // namespace base |
OLD | NEW |