Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(516)

Side by Side Diff: webrtc/rtc_base/task_queue_libevent.cc

Issue 2936213003: Test using a global, replacable TaskQueueImpl factory.
Patch Set: Added global factory. Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
11 #include "webrtc/rtc_base/task_queue.h" 11 #include "webrtc/rtc_base/task_queue.h"
12 12
13 #include <fcntl.h> 13 #include <fcntl.h>
14 #include <signal.h> 14 #include <signal.h>
15 #include <string.h> 15 #include <string.h>
16 #include <unistd.h> 16 #include <unistd.h>
17 17
18 #include "base/third_party/libevent/event.h" 18 #include "base/third_party/libevent/event.h"
19 #include "webrtc/rtc_base/checks.h" 19 #include "webrtc/rtc_base/checks.h"
20 #include "webrtc/rtc_base/logging.h" 20 #include "webrtc/rtc_base/logging.h"
21 #include "webrtc/rtc_base/platform_thread.h"
22
23 #include "webrtc/rtc_base/refcountedobject.h"
21 #include "webrtc/rtc_base/safe_conversions.h" 24 #include "webrtc/rtc_base/safe_conversions.h"
25 #include "webrtc/rtc_base/task_queue_impl.h"
26 #include "webrtc/rtc_base/task_queue_impl_factory.h"
22 #include "webrtc/rtc_base/task_queue_posix.h" 27 #include "webrtc/rtc_base/task_queue_posix.h"
23 #include "webrtc/rtc_base/timeutils.h" 28 #include "webrtc/rtc_base/timeutils.h"
24 29
25 namespace rtc { 30 namespace rtc {
26 using internal::GetQueuePtrTls; 31 using internal::GetQueuePtrTls;
27 using internal::AutoSetCurrentQueuePtr; 32 using internal::AutoSetCurrentQueuePtr;
28 33
29 namespace { 34 namespace {
30 static const char kQuit = 1; 35 static const char kQuit = 1;
31 static const char kRunTask = 2; 36 static const char kRunTask = 2;
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
97 case Priority::NORMAL: 102 case Priority::NORMAL:
98 return kNormalPriority; 103 return kNormalPriority;
99 default: 104 default:
100 RTC_NOTREACHED(); 105 RTC_NOTREACHED();
101 break; 106 break;
102 } 107 }
103 return kNormalPriority; 108 return kNormalPriority;
104 } 109 }
105 } // namespace 110 } // namespace
106 111
107 struct TaskQueue::QueueContext { 112 class TaskQueueLibEvent : public TaskQueueImpl {
108 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} 113 public:
109 TaskQueue* queue; 114 explicit TaskQueueLibEvent(const char* queue_name,
115 TaskQueue* queue,
116 Priority priority);
117 ~TaskQueueLibEvent() override;
118
119 static TaskQueueLibEvent* Current();
120 static TaskQueue* CurrentQueue();
121
122 // Used for DCHECKing the current queue.
123 static bool IsCurrent(const char* queue_name);
124 bool IsCurrent() const override;
125
126 void PostTask(std::unique_ptr<QueuedTask> task) override;
127 void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
128 std::unique_ptr<QueuedTask> reply,
129 TaskQueueImpl* reply_queue) override;
130
131 void PostDelayedTask(std::unique_ptr<QueuedTask> task,
132 uint32_t milliseconds) override;
133
134 private:
135 static void ThreadMain(void* context);
136 static void OnWakeup(int socket, short flags, void* context); // NOLINT
137 static void RunTask(int fd, short flags, void* context); // NOLINT
138 static void RunTimer(int fd, short flags, void* context); // NOLINT
139
140 class ReplyTaskOwner;
141 class PostAndReplyTask;
142 class SetTimerTask;
143
144 typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
145
146 void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
147
148 struct QueueContext;
149 TaskQueue* const queue_;
150 int wakeup_pipe_in_ = -1;
151 int wakeup_pipe_out_ = -1;
152 event_base* event_base_;
153 std::unique_ptr<event> wakeup_event_;
154 PlatformThread thread_;
155 rtc::CriticalSection pending_lock_;
156 std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
157 std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
158 GUARDED_BY(pending_lock_);
159 };
160
161 scoped_refptr<TaskQueueImpl> TaskQueueLibEventFactory::CreateImpl(
162 const char* queue_name,
163 TaskQueue* queue, // only used for GetCurrent()
164 TaskQueue::Priority priority) {
165 return new RefCountedObject<TaskQueueLibEvent>(queue_name, queue, priority);
166 }
167
168 TaskQueue* TaskQueueLibEventFactory::CurrentQueue() {
169 return TaskQueueLibEvent::CurrentQueue();
170 }
171
172 struct TaskQueueLibEvent::QueueContext {
173 explicit QueueContext(TaskQueueLibEvent* q) : queue(q), is_active(true) {}
174 TaskQueueLibEvent* queue;
110 bool is_active; 175 bool is_active;
111 // Holds a list of events pending timers for cleanup when the loop exits. 176 // Holds a list of events pending timers for cleanup when the loop exits.
112 std::list<TimerEvent*> pending_timers_; 177 std::list<TimerEvent*> pending_timers_;
113 }; 178 };
114 179
115 // Posting a reply task is tricky business. This class owns the reply task 180 // Posting a reply task is tricky business. This class owns the reply task
116 // and a reference to it is held by both the reply queue and the first task. 181 // and a reference to it is held by both the reply queue and the first task.
117 // Here's an outline of what happens when dealing with a reply task. 182 // Here's an outline of what happens when dealing with a reply task.
118 // * The ReplyTaskOwner owns the |reply_| task. 183 // * The ReplyTaskOwner owns the |reply_| task.
119 // * One ref owned by PostAndReplyTask 184 // * One ref owned by PostAndReplyTask
120 // * One ref owned by the reply TaskQueue 185 // * One ref owned by the reply TaskQueue
121 // * ReplyTaskOwner has a flag |run_task_| initially set to false. 186 // * ReplyTaskOwner has a flag |run_task_| initially set to false.
122 // * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject). 187 // * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
123 // * After successfully running the original |task_|, PostAndReplyTask() calls 188 // * After successfully running the original |task_|, PostAndReplyTask() calls
124 // set_should_run_task(). This sets |run_task_| to true. 189 // set_should_run_task(). This sets |run_task_| to true.
125 // * In PostAndReplyTask's dtor: 190 // * In PostAndReplyTask's dtor:
126 // * It releases its reference to ReplyTaskOwner (important to do this first). 191 // * It releases its reference to ReplyTaskOwner (important to do this first).
127 // * Sends (write()) a kRunReplyTask message to the reply queue's pipe. 192 // * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
128 // * PostAndReplyTask doesn't care if write() fails, but when it does: 193 // * PostAndReplyTask doesn't care if write() fails, but when it does:
129 // * The reply queue is gone. 194 // * The reply queue is gone.
130 // * ReplyTaskOwner has already been deleted and the reply task too. 195 // * ReplyTaskOwner has already been deleted and the reply task too.
131 // * If write() succeeds: 196 // * If write() succeeds:
132 // * ReplyQueue receives the kRunReplyTask message 197 // * ReplyQueue receives the kRunReplyTask message
133 // * Goes through all pending tasks, finding the first that HasOneRef() 198 // * Goes through all pending tasks, finding the first that HasOneRef()
134 // * Calls ReplyTaskOwner::Run() 199 // * Calls ReplyTaskOwner::Run()
135 // * if set_should_run_task() was called, the reply task will be run 200 // * if set_should_run_task() was called, the reply task will be run
136 // * Release the reference to ReplyTaskOwner 201 // * Release the reference to ReplyTaskOwner
137 // * ReplyTaskOwner and associated |reply_| are deleted. 202 // * ReplyTaskOwner and associated |reply_| are deleted.
138 class TaskQueue::ReplyTaskOwner { 203 class TaskQueueLibEvent::ReplyTaskOwner {
139 public: 204 public:
140 ReplyTaskOwner(std::unique_ptr<QueuedTask> reply) 205 ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
141 : reply_(std::move(reply)) {} 206 : reply_(std::move(reply)) {}
142 207
143 void Run() { 208 void Run() {
144 RTC_DCHECK(reply_); 209 RTC_DCHECK(reply_);
145 if (run_task_) { 210 if (run_task_) {
146 if (!reply_->Run()) 211 if (!reply_->Run())
147 reply_.release(); 212 reply_.release();
148 } 213 }
149 reply_.reset(); 214 reply_.reset();
150 } 215 }
151 216
152 void set_should_run_task() { 217 void set_should_run_task() {
153 RTC_DCHECK(!run_task_); 218 RTC_DCHECK(!run_task_);
154 run_task_ = true; 219 run_task_ = true;
155 } 220 }
156 221
157 private: 222 private:
158 std::unique_ptr<QueuedTask> reply_; 223 std::unique_ptr<QueuedTask> reply_;
159 bool run_task_ = false; 224 bool run_task_ = false;
160 }; 225 };
161 226
162 class TaskQueue::PostAndReplyTask : public QueuedTask { 227 class TaskQueueLibEvent::PostAndReplyTask : public QueuedTask {
163 public: 228 public:
164 PostAndReplyTask(std::unique_ptr<QueuedTask> task, 229 PostAndReplyTask(std::unique_ptr<QueuedTask> task,
165 std::unique_ptr<QueuedTask> reply, 230 std::unique_ptr<QueuedTask> reply,
166 TaskQueue* reply_queue, 231 TaskQueueLibEvent* reply_queue,
167 int reply_pipe) 232 int reply_pipe)
168 : task_(std::move(task)), 233 : task_(std::move(task)),
169 reply_pipe_(reply_pipe), 234 reply_pipe_(reply_pipe),
170 reply_task_owner_( 235 reply_task_owner_(
171 new RefCountedObject<ReplyTaskOwner>(std::move(reply))) { 236 new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
172 reply_queue->PrepareReplyTask(reply_task_owner_); 237 reply_queue->PrepareReplyTask(reply_task_owner_);
173 } 238 }
174 239
175 ~PostAndReplyTask() override { 240 ~PostAndReplyTask() override {
176 reply_task_owner_ = nullptr; 241 reply_task_owner_ = nullptr;
(...skipping 12 matching lines...) Expand all
189 task_.release(); 254 task_.release();
190 reply_task_owner_->set_should_run_task(); 255 reply_task_owner_->set_should_run_task();
191 return true; 256 return true;
192 } 257 }
193 258
194 std::unique_ptr<QueuedTask> task_; 259 std::unique_ptr<QueuedTask> task_;
195 int reply_pipe_; 260 int reply_pipe_;
196 scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_; 261 scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
197 }; 262 };
198 263
199 class TaskQueue::SetTimerTask : public QueuedTask { 264 class TaskQueueLibEvent::SetTimerTask : public QueuedTask {
200 public: 265 public:
201 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) 266 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
202 : task_(std::move(task)), 267 : task_(std::move(task)),
203 milliseconds_(milliseconds), 268 milliseconds_(milliseconds),
204 posted_(Time32()) {} 269 posted_(Time32()) {}
205 270
206 private: 271 private:
207 bool Run() override { 272 bool Run() override {
208 // Compensate for the time that has passed since construction 273 // Compensate for the time that has passed since construction
209 // and until we got here. 274 // and until we got here.
210 uint32_t post_time = Time32() - posted_; 275 uint32_t post_time = Time32() - posted_;
211 TaskQueue::Current()->PostDelayedTask( 276 TaskQueueLibEvent::Current()->PostDelayedTask(
212 std::move(task_), 277 std::move(task_),
213 post_time > milliseconds_ ? 0 : milliseconds_ - post_time); 278 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
214 return true; 279 return true;
215 } 280 }
216 281
217 std::unique_ptr<QueuedTask> task_; 282 std::unique_ptr<QueuedTask> task_;
218 const uint32_t milliseconds_; 283 const uint32_t milliseconds_;
219 const uint32_t posted_; 284 const uint32_t posted_;
220 }; 285 };
221 286
222 TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) 287 TaskQueueLibEvent::TaskQueueLibEvent(const char* queue_name,
223 : event_base_(event_base_new()), 288 TaskQueue* queue,
289 Priority priority /*= NORMAL*/)
290 : queue_(queue),
291 event_base_(event_base_new()),
224 wakeup_event_(new event()), 292 wakeup_event_(new event()),
225 thread_(&TaskQueue::ThreadMain, 293 thread_(&TaskQueueLibEvent::ThreadMain,
226 this, 294 this,
227 queue_name, 295 queue_name,
228 TaskQueuePriorityToThreadPriority(priority)) { 296 TaskQueuePriorityToThreadPriority(priority)) {
229 RTC_DCHECK(queue_name); 297 RTC_DCHECK(queue_name);
230 int fds[2]; 298 int fds[2];
231 RTC_CHECK(pipe(fds) == 0); 299 RTC_CHECK(pipe(fds) == 0);
232 SetNonBlocking(fds[0]); 300 SetNonBlocking(fds[0]);
233 SetNonBlocking(fds[1]); 301 SetNonBlocking(fds[1]);
234 wakeup_pipe_out_ = fds[0]; 302 wakeup_pipe_out_ = fds[0];
235 wakeup_pipe_in_ = fds[1]; 303 wakeup_pipe_in_ = fds[1];
236 304
237 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, 305 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
238 EV_READ | EV_PERSIST, OnWakeup, this); 306 EV_READ | EV_PERSIST, OnWakeup, this);
239 event_add(wakeup_event_.get(), 0); 307 event_add(wakeup_event_.get(), 0);
240 thread_.Start(); 308 thread_.Start();
241 } 309 }
242 310
243 TaskQueue::~TaskQueue() { 311 TaskQueueLibEvent::~TaskQueueLibEvent() {
244 RTC_DCHECK(!IsCurrent()); 312 RTC_DCHECK(!IsCurrent());
245 struct timespec ts; 313 struct timespec ts;
246 char message = kQuit; 314 char message = kQuit;
247 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { 315 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
248 // The queue is full, so we have no choice but to wait and retry. 316 // The queue is full, so we have no choice but to wait and retry.
249 RTC_CHECK_EQ(EAGAIN, errno); 317 RTC_CHECK_EQ(EAGAIN, errno);
250 ts.tv_sec = 0; 318 ts.tv_sec = 0;
251 ts.tv_nsec = 1000000; 319 ts.tv_nsec = 1000000;
252 nanosleep(&ts, nullptr); 320 nanosleep(&ts, nullptr);
253 } 321 }
254 322
255 thread_.Stop(); 323 thread_.Stop();
256 324
257 event_del(wakeup_event_.get()); 325 event_del(wakeup_event_.get());
258 326
259 IgnoreSigPipeSignalOnCurrentThread(); 327 IgnoreSigPipeSignalOnCurrentThread();
260 328
261 close(wakeup_pipe_in_); 329 close(wakeup_pipe_in_);
262 close(wakeup_pipe_out_); 330 close(wakeup_pipe_out_);
263 wakeup_pipe_in_ = -1; 331 wakeup_pipe_in_ = -1;
264 wakeup_pipe_out_ = -1; 332 wakeup_pipe_out_ = -1;
265 333
266 event_base_free(event_base_); 334 event_base_free(event_base_);
267 } 335 }
268 336
269 // static 337 // static
270 TaskQueue* TaskQueue::Current() { 338 TaskQueueLibEvent* TaskQueueLibEvent::Current() {
271 QueueContext* ctx = 339 QueueContext* ctx =
272 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); 340 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
273 return ctx ? ctx->queue : nullptr; 341 return ctx ? ctx->queue : nullptr;
274 } 342 }
275 343
276 // static 344 // static
277 bool TaskQueue::IsCurrent(const char* queue_name) { 345 TaskQueue* TaskQueueLibEvent::CurrentQueue() {
278 TaskQueue* current = Current(); 346 TaskQueueLibEvent* current = Current();
347 if (current) {
348 return current->queue_;
349 }
350 return nullptr;
351 }
352
353 // static
354 bool TaskQueueLibEvent::IsCurrent(const char* queue_name) {
355 TaskQueueLibEvent* current = Current();
279 return current && current->thread_.name().compare(queue_name) == 0; 356 return current && current->thread_.name().compare(queue_name) == 0;
280 } 357 }
281 358
282 bool TaskQueue::IsCurrent() const { 359 bool TaskQueueLibEvent::IsCurrent() const {
283 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); 360 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
284 } 361 }
285 362
286 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { 363 void TaskQueueLibEvent::PostTask(std::unique_ptr<QueuedTask> task) {
287 RTC_DCHECK(task.get()); 364 RTC_DCHECK(task.get());
288 // libevent isn't thread safe. This means that we can't use methods such 365 // libevent isn't thread safe. This means that we can't use methods such
289 // as event_base_once to post tasks to the worker thread from a different 366 // as event_base_once to post tasks to the worker thread from a different
290 // thread. However, we can use it when posting from the worker thread itself. 367 // thread. However, we can use it when posting from the worker thread itself.
291 if (IsCurrent()) { 368 if (IsCurrent()) {
292 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask, 369 if (event_base_once(event_base_, -1, EV_TIMEOUT,
293 task.get(), nullptr) == 0) { 370 &TaskQueueLibEvent::RunTask, task.get(),
371 nullptr) == 0) {
294 task.release(); 372 task.release();
295 } 373 }
296 } else { 374 } else {
297 QueuedTask* task_id = task.get(); // Only used for comparison. 375 QueuedTask* task_id = task.get(); // Only used for comparison.
298 { 376 {
299 CritScope lock(&pending_lock_); 377 CritScope lock(&pending_lock_);
300 pending_.push_back(std::move(task)); 378 pending_.push_back(std::move(task));
301 } 379 }
302 char message = kRunTask; 380 char message = kRunTask;
303 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { 381 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
304 LOG(WARNING) << "Failed to queue task."; 382 LOG(WARNING) << "Failed to queue task.";
305 CritScope lock(&pending_lock_); 383 CritScope lock(&pending_lock_);
306 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) { 384 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
307 return t.get() == task_id; 385 return t.get() == task_id;
308 }); 386 });
309 } 387 }
310 } 388 }
311 } 389 }
312 390
313 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, 391 void TaskQueueLibEvent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
314 uint32_t milliseconds) { 392 uint32_t milliseconds) {
315 if (IsCurrent()) { 393 if (IsCurrent()) {
316 TimerEvent* timer = new TimerEvent(std::move(task)); 394 TimerEvent* timer = new TimerEvent(std::move(task));
317 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer); 395 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibEvent::RunTimer,
396 timer);
318 QueueContext* ctx = 397 QueueContext* ctx =
319 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); 398 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
320 ctx->pending_timers_.push_back(timer); 399 ctx->pending_timers_.push_back(timer);
321 timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000), 400 timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000),
322 rtc::dchecked_cast<int>(milliseconds % 1000) * 1000}; 401 rtc::dchecked_cast<int>(milliseconds % 1000) * 1000};
323 event_add(&timer->ev, &tv); 402 event_add(&timer->ev, &tv);
324 } else { 403 } else {
325 PostTask(std::unique_ptr<QueuedTask>( 404 PostTask(std::unique_ptr<QueuedTask>(
326 new SetTimerTask(std::move(task), milliseconds))); 405 new SetTimerTask(std::move(task), milliseconds)));
327 } 406 }
328 } 407 }
329 408
330 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, 409 void TaskQueueLibEvent::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
331 std::unique_ptr<QueuedTask> reply, 410 std::unique_ptr<QueuedTask> reply,
332 TaskQueue* reply_queue) { 411 TaskQueueImpl* reply_queue) {
333 std::unique_ptr<QueuedTask> wrapper_task( 412 TaskQueueLibEvent* task_queue_lib_event =
334 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue, 413 static_cast<TaskQueueLibEvent*>(reply_queue);
335 reply_queue->wakeup_pipe_in_)); 414 std::unique_ptr<QueuedTask> wrapper_task(new PostAndReplyTask(
415 std::move(task), std::move(reply), task_queue_lib_event,
416 task_queue_lib_event->wakeup_pipe_in_));
336 PostTask(std::move(wrapper_task)); 417 PostTask(std::move(wrapper_task));
337 } 418 }
338 419
339 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
340 std::unique_ptr<QueuedTask> reply) {
341 return PostTaskAndReply(std::move(task), std::move(reply), Current());
342 }
343
344 // static 420 // static
345 void TaskQueue::ThreadMain(void* context) { 421 void TaskQueueLibEvent::ThreadMain(void* context) {
346 TaskQueue* me = static_cast<TaskQueue*>(context); 422 TaskQueueLibEvent* me = static_cast<TaskQueueLibEvent*>(context);
347 423
348 QueueContext queue_context(me); 424 QueueContext queue_context(me);
349 pthread_setspecific(GetQueuePtrTls(), &queue_context); 425 pthread_setspecific(GetQueuePtrTls(), &queue_context);
350 426
351 while (queue_context.is_active) 427 while (queue_context.is_active)
352 event_base_loop(me->event_base_, 0); 428 event_base_loop(me->event_base_, 0);
353 429
354 pthread_setspecific(GetQueuePtrTls(), nullptr); 430 pthread_setspecific(GetQueuePtrTls(), nullptr);
355 431
356 for (TimerEvent* timer : queue_context.pending_timers_) 432 for (TimerEvent* timer : queue_context.pending_timers_)
357 delete timer; 433 delete timer;
358 } 434 }
359 435
360 // static 436 // static
361 void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT 437 void TaskQueueLibEvent::OnWakeup(int socket,
438 short flags,
439 void* context) { // NOLINT
362 QueueContext* ctx = 440 QueueContext* ctx =
363 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); 441 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
364 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket); 442 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
365 char buf; 443 char buf;
366 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf))); 444 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
367 switch (buf) { 445 switch (buf) {
368 case kQuit: 446 case kQuit:
369 ctx->is_active = false; 447 ctx->is_active = false;
370 event_base_loopbreak(ctx->queue->event_base_); 448 event_base_loopbreak(ctx->queue->event_base_);
371 break; 449 break;
(...skipping 26 matching lines...) Expand all
398 reply_task->Run(); 476 reply_task->Run();
399 break; 477 break;
400 } 478 }
401 default: 479 default:
402 RTC_NOTREACHED(); 480 RTC_NOTREACHED();
403 break; 481 break;
404 } 482 }
405 } 483 }
406 484
407 // static 485 // static
408 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT 486 void TaskQueueLibEvent::RunTask(int fd, short flags, void* context) { // NOLINT
409 auto* task = static_cast<QueuedTask*>(context); 487 auto* task = static_cast<QueuedTask*>(context);
410 if (task->Run()) 488 if (task->Run())
411 delete task; 489 delete task;
412 } 490 }
413 491
414 // static 492 // static
415 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT 493 void TaskQueueLibEvent::RunTimer(int fd,
494 short flags,
495 void* context) { // NOLINT
416 TimerEvent* timer = static_cast<TimerEvent*>(context); 496 TimerEvent* timer = static_cast<TimerEvent*>(context);
417 if (!timer->task->Run()) 497 if (!timer->task->Run())
418 timer->task.release(); 498 timer->task.release();
419 QueueContext* ctx = 499 QueueContext* ctx =
420 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); 500 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
421 ctx->pending_timers_.remove(timer); 501 ctx->pending_timers_.remove(timer);
422 delete timer; 502 delete timer;
423 } 503 }
424 504
425 void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) { 505 void TaskQueueLibEvent::PrepareReplyTask(
506 scoped_refptr<ReplyTaskOwnerRef> reply_task) {
426 RTC_DCHECK(reply_task); 507 RTC_DCHECK(reply_task);
427 CritScope lock(&pending_lock_); 508 CritScope lock(&pending_lock_);
428 pending_replies_.push_back(std::move(reply_task)); 509 pending_replies_.push_back(std::move(reply_task));
429 } 510 }
430 511
512 TaskQueue::TaskQueue(const char* queue_name, Priority priority)
513 : impl_(
514 TaskQueueImplFactory::Get()->CreateImpl(queue_name, this, priority)) {
515 }
516
517 TaskQueue::~TaskQueue() {}
518
519 // static
520 TaskQueue* TaskQueue::Current() {
521 return TaskQueueImplFactory::Get()->CurrentQueue();
522 }
523
524 // static
525 // TODO(perkj): ! Now - should check |queue_name|.
526 bool TaskQueue::IsCurrent(const char* queue_name) {
527 TaskQueue* queue = TaskQueueImplFactory::Get()->CurrentQueue();
528 return queue ? queue->IsCurrent() : false;
529 }
530
531 bool TaskQueue::IsCurrent() const {
532 return impl_->IsCurrent();
533 }
534
535 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
536 return impl_->PostTask(std::move(task));
537 }
538
539 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
540 std::unique_ptr<QueuedTask> reply,
541 TaskQueue* reply_queue) {
542 return impl_->PostTaskAndReply(std::move(task), std::move(reply),
543 reply_queue->impl_.get());
544 }
545
546 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
547 std::unique_ptr<QueuedTask> reply) {
548 return impl_->PostTaskAndReply(std::move(task), std::move(reply),
549 impl_.get());
550 }
551
552 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
553 uint32_t milliseconds) {
554 return impl_->PostDelayedTask(std::move(task), milliseconds);
555 }
556
431 } // namespace rtc 557 } // namespace rtc
OLDNEW
« webrtc/rtc_base/task_queue_impl_factory.cc ('K') | « webrtc/rtc_base/task_queue_impl_factory.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698