OLD | NEW |
1 // Copyright (c) 2008 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2008 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/message_pump_libevent.h" | 5 #include "base/message_pump_libevent.h" |
6 | 6 |
7 #include <fcntl.h> | 7 #include <fcntl.h> |
8 | 8 |
9 #include "base/logging.h" | 9 #include "base/logging.h" |
10 #include "base/scoped_nsautorelease_pool.h" | 10 #include "base/scoped_nsautorelease_pool.h" |
11 #include "base/time.h" | 11 #include "base/time.h" |
12 #include "third_party/libevent/event.h" | 12 #include "third_party/libevent/event.h" |
13 | 13 |
14 namespace base { | 14 namespace base { |
15 | 15 |
16 // Return 0 on success | 16 // Return 0 on success |
17 // Too small a function to bother putting in a library? | 17 // Too small a function to bother putting in a library? |
18 static int SetNonBlocking(int fd) | 18 static int SetNonBlocking(int fd) { |
19 { | 19 int flags = fcntl(fd, F_GETFL, 0); |
20 int flags = fcntl(fd, F_GETFL, 0); | 20 if (flags == -1) |
21 if (-1 == flags) | 21 flags = 0; |
22 flags = 0; | 22 return fcntl(fd, F_SETFL, flags | O_NONBLOCK); |
23 return fcntl(fd, F_SETFL, flags | O_NONBLOCK); | 23 } |
| 24 |
| 25 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher() |
| 26 : is_persistent_(false), |
| 27 event_(NULL) { |
| 28 } |
| 29 |
| 30 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() { |
| 31 if (event_.get()) { |
| 32 StopWatchingFileDescriptor(); |
| 33 } |
| 34 } |
| 35 |
| 36 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e, |
| 37 bool is_persistent) { |
| 38 DCHECK(e); |
| 39 DCHECK(event_.get() == NULL); |
| 40 |
| 41 is_persistent_ = is_persistent; |
| 42 event_.reset(e); |
| 43 } |
| 44 |
| 45 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() { |
| 46 return event_.release(); |
| 47 } |
| 48 |
| 49 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() { |
| 50 if (event_.get() == NULL) { |
| 51 return true; |
| 52 } |
| 53 |
| 54 // event_del() is a no-op of the event isn't active. |
| 55 return (event_del(event_.get()) == 0); |
24 } | 56 } |
25 | 57 |
26 // Called if a byte is received on the wakeup pipe. | 58 // Called if a byte is received on the wakeup pipe. |
27 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { | 59 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { |
28 | |
29 base::MessagePumpLibevent* that = | 60 base::MessagePumpLibevent* that = |
30 static_cast<base::MessagePumpLibevent*>(context); | 61 static_cast<base::MessagePumpLibevent*>(context); |
31 DCHECK(that->wakeup_pipe_out_ == socket); | 62 DCHECK(that->wakeup_pipe_out_ == socket); |
32 | 63 |
33 // Remove and discard the wakeup byte. | 64 // Remove and discard the wakeup byte. |
34 char buf; | 65 char buf; |
35 int nread = read(socket, &buf, 1); | 66 int nread = read(socket, &buf, 1); |
36 DCHECK(nread == 1); | 67 DCHECK(nread == 1); |
37 // Tell libevent to break out of inner loop. | 68 // Tell libevent to break out of inner loop. |
38 event_base_loopbreak(that->event_base_); | 69 event_base_loopbreak(that->event_base_); |
(...skipping 15 matching lines...) Expand all Loading... |
54 return false; | 85 return false; |
55 if (SetNonBlocking(fds[0])) | 86 if (SetNonBlocking(fds[0])) |
56 return false; | 87 return false; |
57 if (SetNonBlocking(fds[1])) | 88 if (SetNonBlocking(fds[1])) |
58 return false; | 89 return false; |
59 wakeup_pipe_out_ = fds[0]; | 90 wakeup_pipe_out_ = fds[0]; |
60 wakeup_pipe_in_ = fds[1]; | 91 wakeup_pipe_in_ = fds[1]; |
61 | 92 |
62 wakeup_event_ = new event; | 93 wakeup_event_ = new event; |
63 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, | 94 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, |
64 » OnWakeup, this); | 95 OnWakeup, this); |
65 event_base_set(event_base_, wakeup_event_); | 96 event_base_set(event_base_, wakeup_event_); |
66 | 97 |
67 if (event_add(wakeup_event_, 0)) | 98 if (event_add(wakeup_event_, 0)) |
68 return false; | 99 return false; |
69 return true; | 100 return true; |
70 } | 101 } |
71 | 102 |
72 MessagePumpLibevent::~MessagePumpLibevent() { | 103 MessagePumpLibevent::~MessagePumpLibevent() { |
73 DCHECK(wakeup_event_); | 104 DCHECK(wakeup_event_); |
74 DCHECK(event_base_); | 105 DCHECK(event_base_); |
75 event_del(wakeup_event_); | 106 event_del(wakeup_event_); |
76 delete wakeup_event_; | 107 delete wakeup_event_; |
77 event_base_free(event_base_); | 108 event_base_free(event_base_); |
78 } | 109 } |
79 | 110 |
80 void MessagePumpLibevent::WatchSocket(int socket, short interest_mask, | 111 bool MessagePumpLibevent::WatchFileDescriptor(int fd, |
81 event* e, Watcher* watcher) { | 112 bool persistent, |
| 113 Mode mode, |
| 114 FileDescriptorWatcher *controller, |
| 115 Watcher *delegate) { |
| 116 DCHECK(fd > 0); |
| 117 DCHECK(controller); |
| 118 DCHECK(delegate); |
| 119 DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); |
82 | 120 |
83 // Set current interest mask and message pump for this event | 121 int event_mask = persistent ? EV_PERSIST : 0; |
84 event_set(e, socket, interest_mask, OnReadinessNotification, watcher); | 122 if ((mode & WATCH_READ) != 0) { |
| 123 event_mask |= EV_READ; |
| 124 } |
| 125 if ((mode & WATCH_WRITE) != 0) { |
| 126 event_mask |= EV_WRITE; |
| 127 } |
| 128 |
| 129 // |should_delete_event| is true if we're modifying an event that's currently |
| 130 // active in |controller|. |
| 131 // If we're modifying an existing event and there's an error then we need to |
| 132 // tell libevent to clean it up via event_delete() before returning. |
| 133 bool should_delete_event = true; |
| 134 scoped_ptr<event> evt(controller->ReleaseEvent()); |
| 135 if (evt.get() == NULL) { |
| 136 should_delete_event = false; |
| 137 // Ownership is transferred to the controller. |
| 138 evt.reset(new event); |
| 139 } |
| 140 |
| 141 // Set current interest mask and message pump for this event. |
| 142 event_set(evt.get(), fd, event_mask, OnLibeventNotification, |
| 143 delegate); |
85 | 144 |
86 // Tell libevent which message pump this socket will belong to when we add it. | 145 // Tell libevent which message pump this socket will belong to when we add it. |
87 event_base_set(event_base_, e); | 146 if (event_base_set(event_base_, evt.get()) != 0) { |
| 147 if (should_delete_event) { |
| 148 event_del(evt.get()); |
| 149 } |
| 150 return false; |
| 151 } |
88 | 152 |
89 // Add this socket to the list of monitored sockets. | 153 // Add this socket to the list of monitored sockets. |
90 if (event_add(e, NULL)) | 154 if (event_add(evt.get(), NULL) != 0) { |
91 NOTREACHED(); | 155 if (should_delete_event) { |
| 156 event_del(evt.get()); |
| 157 } |
| 158 return false; |
| 159 } |
| 160 |
| 161 // Transfer ownership of e to controller. |
| 162 controller->Init(evt.release(), persistent); |
| 163 return true; |
92 } | 164 } |
93 | 165 |
94 void MessagePumpLibevent::WatchFileHandle(int fd, short interest_mask, | 166 |
95 event* e, FileWatcher* watcher) { | 167 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags, |
96 // Set current interest mask and message pump for this event | 168 void* context) { |
97 if ((interest_mask & EV_READ) != 0) { | 169 Watcher* watcher = static_cast<Watcher*>(context); |
98 event_set(e, fd, interest_mask, OnFileReadReadinessNotification, watcher); | 170 |
99 } else { | 171 if (flags & EV_WRITE) { |
100 event_set(e, fd, interest_mask, OnFileWriteReadinessNotification, watcher); | 172 watcher->OnFileCanWriteWithoutBlocking(fd); |
101 } | 173 } |
102 | 174 if (flags & EV_READ) { |
103 // Tell libevent which message pump this fd will belong to when we add it. | 175 watcher->OnFileCanReadWithoutBlocking(fd); |
104 event_base_set(event_base_, e); | 176 } |
105 | |
106 // Add this fd to the list of monitored sockets. | |
107 if (event_add(e, NULL)) | |
108 NOTREACHED(); | |
109 } | |
110 | |
111 void MessagePumpLibevent::UnwatchSocket(event* e) { | |
112 // Remove this socket from the list of monitored sockets. | |
113 if (event_del(e)) | |
114 NOTREACHED(); | |
115 } | |
116 | |
117 void MessagePumpLibevent::UnwatchFileHandle(event* e) { | |
118 // Remove this fd from the list of monitored fds. | |
119 if (event_del(e)) | |
120 NOTREACHED(); | |
121 } | |
122 | |
123 void MessagePumpLibevent::OnReadinessNotification(int socket, short flags, | |
124 void* context) { | |
125 // The given socket is ready for I/O. | |
126 // Tell the owner what kind of I/O the socket is ready for. | |
127 Watcher* watcher = static_cast<Watcher*>(context); | |
128 watcher->OnSocketReady(flags); | |
129 } | |
130 | |
131 void MessagePumpLibevent::OnFileReadReadinessNotification(int fd, short flags, | |
132 void* context) { | |
133 FileWatcher* watcher = static_cast<FileWatcher*>(context); | |
134 watcher->OnFileReadReady(fd); | |
135 } | |
136 | |
137 void MessagePumpLibevent::OnFileWriteReadinessNotification(int fd, short flags, | |
138 void* context) { | |
139 FileWatcher* watcher = static_cast<FileWatcher*>(context); | |
140 watcher->OnFileWriteReady(fd); | |
141 } | 177 } |
142 | 178 |
143 // Reentrant! | 179 // Reentrant! |
144 void MessagePumpLibevent::Run(Delegate* delegate) { | 180 void MessagePumpLibevent::Run(Delegate* delegate) { |
145 DCHECK(keep_running_) << "Quit must have been called outside of Run!"; | 181 DCHECK(keep_running_) << "Quit must have been called outside of Run!"; |
146 | 182 |
147 bool old_in_run = in_run_; | 183 bool old_in_run = in_run_; |
148 in_run_ = true; | 184 in_run_ = true; |
149 | 185 |
150 for (;;) { | 186 for (;;) { |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
207 } | 243 } |
208 | 244 |
209 void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) { | 245 void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) { |
210 // We know that we can't be blocked on Wait right now since this method can | 246 // We know that we can't be blocked on Wait right now since this method can |
211 // only be called on the same thread as Run, so we only need to update our | 247 // only be called on the same thread as Run, so we only need to update our |
212 // record of how long to sleep when we do sleep. | 248 // record of how long to sleep when we do sleep. |
213 delayed_work_time_ = delayed_work_time; | 249 delayed_work_time_ = delayed_work_time; |
214 } | 250 } |
215 | 251 |
216 } // namespace base | 252 } // namespace base |
OLD | NEW |