Index: webrtc/rtc_base/task_queue_libevent.cc |
diff --git a/webrtc/rtc_base/task_queue_libevent.cc b/webrtc/rtc_base/task_queue_libevent.cc |
index db267dc80be893aa7715f5dea97ed2a57a93f47c..2f07d1402d8aa6f9546be2b60f00571f19a10e90 100644 |
--- a/webrtc/rtc_base/task_queue_libevent.cc |
+++ b/webrtc/rtc_base/task_queue_libevent.cc |
@@ -18,7 +18,12 @@ |
#include "base/third_party/libevent/event.h" |
#include "webrtc/rtc_base/checks.h" |
#include "webrtc/rtc_base/logging.h" |
+#include "webrtc/rtc_base/platform_thread.h" |
+ |
+#include "webrtc/rtc_base/refcountedobject.h" |
#include "webrtc/rtc_base/safe_conversions.h" |
+#include "webrtc/rtc_base/task_queue_impl.h" |
+#include "webrtc/rtc_base/task_queue_impl_factory.h" |
#include "webrtc/rtc_base/task_queue_posix.h" |
#include "webrtc/rtc_base/timeutils.h" |
@@ -104,9 +109,69 @@ ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { |
} |
} // namespace |
-struct TaskQueue::QueueContext { |
- explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} |
- TaskQueue* queue; |
+class TaskQueueLibEvent : public TaskQueueImpl { |
+ public: |
+ explicit TaskQueueLibEvent(const char* queue_name, |
+ TaskQueue* queue, |
+ Priority priority); |
+ ~TaskQueueLibEvent() override; |
+ |
+ static TaskQueueLibEvent* Current(); |
+ static TaskQueue* CurrentQueue(); |
+ |
+ // Used for DCHECKing the current queue. |
+ static bool IsCurrent(const char* queue_name); |
+ bool IsCurrent() const override; |
+ |
+ void PostTask(std::unique_ptr<QueuedTask> task) override; |
+ void PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
+ std::unique_ptr<QueuedTask> reply, |
+ TaskQueueImpl* reply_queue) override; |
+ |
+ void PostDelayedTask(std::unique_ptr<QueuedTask> task, |
+ uint32_t milliseconds) override; |
+ |
+ private: |
+ static void ThreadMain(void* context); |
+ static void OnWakeup(int socket, short flags, void* context); // NOLINT |
+ static void RunTask(int fd, short flags, void* context); // NOLINT |
+ static void RunTimer(int fd, short flags, void* context); // NOLINT |
+ |
+ class ReplyTaskOwner; |
+ class PostAndReplyTask; |
+ class SetTimerTask; |
+ |
+ typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef; |
+ |
+ void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task); |
+ |
+ struct QueueContext; |
+ TaskQueue* const queue_; |
+ int wakeup_pipe_in_ = -1; |
+ int wakeup_pipe_out_ = -1; |
+ event_base* event_base_; |
+ std::unique_ptr<event> wakeup_event_; |
+ PlatformThread thread_; |
+ rtc::CriticalSection pending_lock_; |
+ std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_); |
+ std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_ |
+ GUARDED_BY(pending_lock_); |
+}; |
+ |
+scoped_refptr<TaskQueueImpl> TaskQueueLibEventFactory::CreateImpl( |
+ const char* queue_name, |
+ TaskQueue* queue, // only used for GetCurrent() |
+ TaskQueue::Priority priority) { |
+ return new RefCountedObject<TaskQueueLibEvent>(queue_name, queue, priority); |
+} |
+ |
+TaskQueue* TaskQueueLibEventFactory::CurrentQueue() { |
+ return TaskQueueLibEvent::CurrentQueue(); |
+} |
+ |
+struct TaskQueueLibEvent::QueueContext { |
+ explicit QueueContext(TaskQueueLibEvent* q) : queue(q), is_active(true) {} |
+ TaskQueueLibEvent* queue; |
bool is_active; |
// Holds a list of events pending timers for cleanup when the loop exits. |
std::list<TimerEvent*> pending_timers_; |
@@ -135,7 +200,7 @@ struct TaskQueue::QueueContext { |
// * if set_should_run_task() was called, the reply task will be run |
// * Release the reference to ReplyTaskOwner |
// * ReplyTaskOwner and associated |reply_| are deleted. |
-class TaskQueue::ReplyTaskOwner { |
+class TaskQueueLibEvent::ReplyTaskOwner { |
public: |
ReplyTaskOwner(std::unique_ptr<QueuedTask> reply) |
: reply_(std::move(reply)) {} |
@@ -159,11 +224,11 @@ class TaskQueue::ReplyTaskOwner { |
bool run_task_ = false; |
}; |
-class TaskQueue::PostAndReplyTask : public QueuedTask { |
+class TaskQueueLibEvent::PostAndReplyTask : public QueuedTask { |
public: |
PostAndReplyTask(std::unique_ptr<QueuedTask> task, |
std::unique_ptr<QueuedTask> reply, |
- TaskQueue* reply_queue, |
+ TaskQueueLibEvent* reply_queue, |
int reply_pipe) |
: task_(std::move(task)), |
reply_pipe_(reply_pipe), |
@@ -196,7 +261,7 @@ class TaskQueue::PostAndReplyTask : public QueuedTask { |
scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_; |
}; |
-class TaskQueue::SetTimerTask : public QueuedTask { |
+class TaskQueueLibEvent::SetTimerTask : public QueuedTask { |
public: |
SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) |
: task_(std::move(task)), |
@@ -208,7 +273,7 @@ class TaskQueue::SetTimerTask : public QueuedTask { |
// Compensate for the time that has passed since construction |
// and until we got here. |
uint32_t post_time = Time32() - posted_; |
- TaskQueue::Current()->PostDelayedTask( |
+ TaskQueueLibEvent::Current()->PostDelayedTask( |
std::move(task_), |
post_time > milliseconds_ ? 0 : milliseconds_ - post_time); |
return true; |
@@ -219,10 +284,13 @@ class TaskQueue::SetTimerTask : public QueuedTask { |
const uint32_t posted_; |
}; |
-TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) |
- : event_base_(event_base_new()), |
+TaskQueueLibEvent::TaskQueueLibEvent(const char* queue_name, |
+ TaskQueue* queue, |
+ Priority priority /*= NORMAL*/) |
+ : queue_(queue), |
+ event_base_(event_base_new()), |
wakeup_event_(new event()), |
- thread_(&TaskQueue::ThreadMain, |
+ thread_(&TaskQueueLibEvent::ThreadMain, |
this, |
queue_name, |
TaskQueuePriorityToThreadPriority(priority)) { |
@@ -240,7 +308,7 @@ TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) |
thread_.Start(); |
} |
-TaskQueue::~TaskQueue() { |
+TaskQueueLibEvent::~TaskQueueLibEvent() { |
RTC_DCHECK(!IsCurrent()); |
struct timespec ts; |
char message = kQuit; |
@@ -267,30 +335,40 @@ TaskQueue::~TaskQueue() { |
} |
// static |
-TaskQueue* TaskQueue::Current() { |
+TaskQueueLibEvent* TaskQueueLibEvent::Current() { |
QueueContext* ctx = |
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
return ctx ? ctx->queue : nullptr; |
} |
// static |
-bool TaskQueue::IsCurrent(const char* queue_name) { |
- TaskQueue* current = Current(); |
+TaskQueue* TaskQueueLibEvent::CurrentQueue() { |
+ TaskQueueLibEvent* current = Current(); |
+ if (current) { |
+ return current->queue_; |
+ } |
+ return nullptr; |
+} |
+ |
+// static |
+bool TaskQueueLibEvent::IsCurrent(const char* queue_name) { |
+ TaskQueueLibEvent* current = Current(); |
return current && current->thread_.name().compare(queue_name) == 0; |
} |
-bool TaskQueue::IsCurrent() const { |
+bool TaskQueueLibEvent::IsCurrent() const { |
return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); |
} |
-void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
+void TaskQueueLibEvent::PostTask(std::unique_ptr<QueuedTask> task) { |
RTC_DCHECK(task.get()); |
// libevent isn't thread safe. This means that we can't use methods such |
// as event_base_once to post tasks to the worker thread from a different |
// thread. However, we can use it when posting from the worker thread itself. |
if (IsCurrent()) { |
- if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask, |
- task.get(), nullptr) == 0) { |
+ if (event_base_once(event_base_, -1, EV_TIMEOUT, |
+ &TaskQueueLibEvent::RunTask, task.get(), |
+ nullptr) == 0) { |
task.release(); |
} |
} else { |
@@ -310,11 +388,12 @@ void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
} |
} |
-void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
- uint32_t milliseconds) { |
+void TaskQueueLibEvent::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
+ uint32_t milliseconds) { |
if (IsCurrent()) { |
TimerEvent* timer = new TimerEvent(std::move(task)); |
- EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer); |
+ EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibEvent::RunTimer, |
+ timer); |
QueueContext* ctx = |
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
ctx->pending_timers_.push_back(timer); |
@@ -327,23 +406,20 @@ void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
} |
} |
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
- std::unique_ptr<QueuedTask> reply, |
- TaskQueue* reply_queue) { |
- std::unique_ptr<QueuedTask> wrapper_task( |
- new PostAndReplyTask(std::move(task), std::move(reply), reply_queue, |
- reply_queue->wakeup_pipe_in_)); |
+void TaskQueueLibEvent::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
+ std::unique_ptr<QueuedTask> reply, |
+ TaskQueueImpl* reply_queue) { |
+ TaskQueueLibEvent* task_queue_lib_event = |
+ static_cast<TaskQueueLibEvent*>(reply_queue); |
+ std::unique_ptr<QueuedTask> wrapper_task(new PostAndReplyTask( |
+ std::move(task), std::move(reply), task_queue_lib_event, |
+ task_queue_lib_event->wakeup_pipe_in_)); |
PostTask(std::move(wrapper_task)); |
} |
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
- std::unique_ptr<QueuedTask> reply) { |
- return PostTaskAndReply(std::move(task), std::move(reply), Current()); |
-} |
- |
// static |
-void TaskQueue::ThreadMain(void* context) { |
- TaskQueue* me = static_cast<TaskQueue*>(context); |
+void TaskQueueLibEvent::ThreadMain(void* context) { |
+ TaskQueueLibEvent* me = static_cast<TaskQueueLibEvent*>(context); |
QueueContext queue_context(me); |
pthread_setspecific(GetQueuePtrTls(), &queue_context); |
@@ -358,7 +434,9 @@ void TaskQueue::ThreadMain(void* context) { |
} |
// static |
-void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT |
+void TaskQueueLibEvent::OnWakeup(int socket, |
+ short flags, |
+ void* context) { // NOLINT |
QueueContext* ctx = |
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket); |
@@ -405,14 +483,16 @@ void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT |
} |
// static |
-void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT |
+void TaskQueueLibEvent::RunTask(int fd, short flags, void* context) { // NOLINT |
auto* task = static_cast<QueuedTask*>(context); |
if (task->Run()) |
delete task; |
} |
// static |
-void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT |
+void TaskQueueLibEvent::RunTimer(int fd, |
+ short flags, |
+ void* context) { // NOLINT |
TimerEvent* timer = static_cast<TimerEvent*>(context); |
if (!timer->task->Run()) |
timer->task.release(); |
@@ -422,10 +502,56 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT |
delete timer; |
} |
-void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) { |
+void TaskQueueLibEvent::PrepareReplyTask( |
+ scoped_refptr<ReplyTaskOwnerRef> reply_task) { |
RTC_DCHECK(reply_task); |
CritScope lock(&pending_lock_); |
pending_replies_.push_back(std::move(reply_task)); |
} |
+TaskQueue::TaskQueue(const char* queue_name, Priority priority) |
+ : impl_( |
+ TaskQueueImplFactory::Get()->CreateImpl(queue_name, this, priority)) { |
+} |
+ |
+TaskQueue::~TaskQueue() {} |
+ |
+// static |
+TaskQueue* TaskQueue::Current() { |
+ return TaskQueueImplFactory::Get()->CurrentQueue(); |
+} |
+ |
+// static |
+// TODO(perkj): ! Now - should check |queue_name|. |
+bool TaskQueue::IsCurrent(const char* queue_name) { |
+ TaskQueue* queue = TaskQueueImplFactory::Get()->CurrentQueue(); |
+ return queue ? queue->IsCurrent() : false; |
+} |
+ |
+bool TaskQueue::IsCurrent() const { |
+ return impl_->IsCurrent(); |
+} |
+ |
+void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
+ return impl_->PostTask(std::move(task)); |
+} |
+ |
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
+ std::unique_ptr<QueuedTask> reply, |
+ TaskQueue* reply_queue) { |
+ return impl_->PostTaskAndReply(std::move(task), std::move(reply), |
+ reply_queue->impl_.get()); |
+} |
+ |
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
+ std::unique_ptr<QueuedTask> reply) { |
+ return impl_->PostTaskAndReply(std::move(task), std::move(reply), |
+ impl_.get()); |
+} |
+ |
+void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
+ uint32_t milliseconds) { |
+ return impl_->PostDelayedTask(std::move(task), milliseconds); |
+} |
+ |
} // namespace rtc |