| Index: webrtc/rtc_base/task_queue_libevent.cc
|
| diff --git a/webrtc/rtc_base/task_queue_libevent.cc b/webrtc/rtc_base/task_queue_libevent.cc
|
| index db267dc80be893aa7715f5dea97ed2a57a93f47c..2f07d1402d8aa6f9546be2b60f00571f19a10e90 100644
|
| --- a/webrtc/rtc_base/task_queue_libevent.cc
|
| +++ b/webrtc/rtc_base/task_queue_libevent.cc
|
| @@ -18,7 +18,12 @@
|
| #include "base/third_party/libevent/event.h"
|
| #include "webrtc/rtc_base/checks.h"
|
| #include "webrtc/rtc_base/logging.h"
|
| +#include "webrtc/rtc_base/platform_thread.h"
|
| +
|
| +#include "webrtc/rtc_base/refcountedobject.h"
|
| #include "webrtc/rtc_base/safe_conversions.h"
|
| +#include "webrtc/rtc_base/task_queue_impl.h"
|
| +#include "webrtc/rtc_base/task_queue_impl_factory.h"
|
| #include "webrtc/rtc_base/task_queue_posix.h"
|
| #include "webrtc/rtc_base/timeutils.h"
|
|
|
| @@ -104,9 +109,69 @@ ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
|
| }
|
| } // namespace
|
|
|
| -struct TaskQueue::QueueContext {
|
| - explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
|
| - TaskQueue* queue;
|
| +class TaskQueueLibEvent : public TaskQueueImpl {
|
| + public:
|
| + explicit TaskQueueLibEvent(const char* queue_name,
|
| + TaskQueue* queue,
|
| + Priority priority);
|
| + ~TaskQueueLibEvent() override;
|
| +
|
| + static TaskQueueLibEvent* Current();
|
| + static TaskQueue* CurrentQueue();
|
| +
|
| + // Used for DCHECKing the current queue.
|
| + static bool IsCurrent(const char* queue_name);
|
| + bool IsCurrent() const override;
|
| +
|
| + void PostTask(std::unique_ptr<QueuedTask> task) override;
|
| + void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| + std::unique_ptr<QueuedTask> reply,
|
| + TaskQueueImpl* reply_queue) override;
|
| +
|
| + void PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
| + uint32_t milliseconds) override;
|
| +
|
| + private:
|
| + static void ThreadMain(void* context);
|
| + static void OnWakeup(int socket, short flags, void* context); // NOLINT
|
| + static void RunTask(int fd, short flags, void* context); // NOLINT
|
| + static void RunTimer(int fd, short flags, void* context); // NOLINT
|
| +
|
| + class ReplyTaskOwner;
|
| + class PostAndReplyTask;
|
| + class SetTimerTask;
|
| +
|
| + typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
|
| +
|
| + void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
|
| +
|
| + struct QueueContext;
|
| + TaskQueue* const queue_;
|
| + int wakeup_pipe_in_ = -1;
|
| + int wakeup_pipe_out_ = -1;
|
| + event_base* event_base_;
|
| + std::unique_ptr<event> wakeup_event_;
|
| + PlatformThread thread_;
|
| + rtc::CriticalSection pending_lock_;
|
| + std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
|
| + std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
|
| + GUARDED_BY(pending_lock_);
|
| +};
|
| +
|
| +scoped_refptr<TaskQueueImpl> TaskQueueLibEventFactory::CreateImpl(
|
| + const char* queue_name,
|
| + TaskQueue* queue, // only used for GetCurrent()
|
| + TaskQueue::Priority priority) {
|
| + return new RefCountedObject<TaskQueueLibEvent>(queue_name, queue, priority);
|
| +}
|
| +
|
| +TaskQueue* TaskQueueLibEventFactory::CurrentQueue() {
|
| + return TaskQueueLibEvent::CurrentQueue();
|
| +}
|
| +
|
| +struct TaskQueueLibEvent::QueueContext {
|
| + explicit QueueContext(TaskQueueLibEvent* q) : queue(q), is_active(true) {}
|
| + TaskQueueLibEvent* queue;
|
| bool is_active;
|
| // Holds a list of events pending timers for cleanup when the loop exits.
|
| std::list<TimerEvent*> pending_timers_;
|
| @@ -135,7 +200,7 @@ struct TaskQueue::QueueContext {
|
| // * if set_should_run_task() was called, the reply task will be run
|
| // * Release the reference to ReplyTaskOwner
|
| // * ReplyTaskOwner and associated |reply_| are deleted.
|
| -class TaskQueue::ReplyTaskOwner {
|
| +class TaskQueueLibEvent::ReplyTaskOwner {
|
| public:
|
| ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
|
| : reply_(std::move(reply)) {}
|
| @@ -159,11 +224,11 @@ class TaskQueue::ReplyTaskOwner {
|
| bool run_task_ = false;
|
| };
|
|
|
| -class TaskQueue::PostAndReplyTask : public QueuedTask {
|
| +class TaskQueueLibEvent::PostAndReplyTask : public QueuedTask {
|
| public:
|
| PostAndReplyTask(std::unique_ptr<QueuedTask> task,
|
| std::unique_ptr<QueuedTask> reply,
|
| - TaskQueue* reply_queue,
|
| + TaskQueueLibEvent* reply_queue,
|
| int reply_pipe)
|
| : task_(std::move(task)),
|
| reply_pipe_(reply_pipe),
|
| @@ -196,7 +261,7 @@ class TaskQueue::PostAndReplyTask : public QueuedTask {
|
| scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
|
| };
|
|
|
| -class TaskQueue::SetTimerTask : public QueuedTask {
|
| +class TaskQueueLibEvent::SetTimerTask : public QueuedTask {
|
| public:
|
| SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
|
| : task_(std::move(task)),
|
| @@ -208,7 +273,7 @@ class TaskQueue::SetTimerTask : public QueuedTask {
|
| // Compensate for the time that has passed since construction
|
| // and until we got here.
|
| uint32_t post_time = Time32() - posted_;
|
| - TaskQueue::Current()->PostDelayedTask(
|
| + TaskQueueLibEvent::Current()->PostDelayedTask(
|
| std::move(task_),
|
| post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
|
| return true;
|
| @@ -219,10 +284,13 @@ class TaskQueue::SetTimerTask : public QueuedTask {
|
| const uint32_t posted_;
|
| };
|
|
|
| -TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
|
| - : event_base_(event_base_new()),
|
| +TaskQueueLibEvent::TaskQueueLibEvent(const char* queue_name,
|
| + TaskQueue* queue,
|
| + Priority priority /*= NORMAL*/)
|
| + : queue_(queue),
|
| + event_base_(event_base_new()),
|
| wakeup_event_(new event()),
|
| - thread_(&TaskQueue::ThreadMain,
|
| + thread_(&TaskQueueLibEvent::ThreadMain,
|
| this,
|
| queue_name,
|
| TaskQueuePriorityToThreadPriority(priority)) {
|
| @@ -240,7 +308,7 @@ TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
|
| thread_.Start();
|
| }
|
|
|
| -TaskQueue::~TaskQueue() {
|
| +TaskQueueLibEvent::~TaskQueueLibEvent() {
|
| RTC_DCHECK(!IsCurrent());
|
| struct timespec ts;
|
| char message = kQuit;
|
| @@ -267,30 +335,40 @@ TaskQueue::~TaskQueue() {
|
| }
|
|
|
| // static
|
| -TaskQueue* TaskQueue::Current() {
|
| +TaskQueueLibEvent* TaskQueueLibEvent::Current() {
|
| QueueContext* ctx =
|
| static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
|
| return ctx ? ctx->queue : nullptr;
|
| }
|
|
|
| // static
|
| -bool TaskQueue::IsCurrent(const char* queue_name) {
|
| - TaskQueue* current = Current();
|
| +TaskQueue* TaskQueueLibEvent::CurrentQueue() {
|
| + TaskQueueLibEvent* current = Current();
|
| + if (current) {
|
| + return current->queue_;
|
| + }
|
| + return nullptr;
|
| +}
|
| +
|
| +// static
|
| +bool TaskQueueLibEvent::IsCurrent(const char* queue_name) {
|
| + TaskQueueLibEvent* current = Current();
|
| return current && current->thread_.name().compare(queue_name) == 0;
|
| }
|
|
|
| -bool TaskQueue::IsCurrent() const {
|
| +bool TaskQueueLibEvent::IsCurrent() const {
|
| return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
|
| }
|
|
|
| -void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
|
| +void TaskQueueLibEvent::PostTask(std::unique_ptr<QueuedTask> task) {
|
| RTC_DCHECK(task.get());
|
| // libevent isn't thread safe. This means that we can't use methods such
|
| // as event_base_once to post tasks to the worker thread from a different
|
| // thread. However, we can use it when posting from the worker thread itself.
|
| if (IsCurrent()) {
|
| - if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
|
| - task.get(), nullptr) == 0) {
|
| + if (event_base_once(event_base_, -1, EV_TIMEOUT,
|
| + &TaskQueueLibEvent::RunTask, task.get(),
|
| + nullptr) == 0) {
|
| task.release();
|
| }
|
| } else {
|
| @@ -310,11 +388,12 @@ void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
|
| }
|
| }
|
|
|
| -void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
| - uint32_t milliseconds) {
|
| +void TaskQueueLibEvent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
| + uint32_t milliseconds) {
|
| if (IsCurrent()) {
|
| TimerEvent* timer = new TimerEvent(std::move(task));
|
| - EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer);
|
| + EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibEvent::RunTimer,
|
| + timer);
|
| QueueContext* ctx =
|
| static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
|
| ctx->pending_timers_.push_back(timer);
|
| @@ -327,23 +406,20 @@ void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
| }
|
| }
|
|
|
| -void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| - std::unique_ptr<QueuedTask> reply,
|
| - TaskQueue* reply_queue) {
|
| - std::unique_ptr<QueuedTask> wrapper_task(
|
| - new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
|
| - reply_queue->wakeup_pipe_in_));
|
| +void TaskQueueLibEvent::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| + std::unique_ptr<QueuedTask> reply,
|
| + TaskQueueImpl* reply_queue) {
|
| + TaskQueueLibEvent* task_queue_lib_event =
|
| + static_cast<TaskQueueLibEvent*>(reply_queue);
|
| + std::unique_ptr<QueuedTask> wrapper_task(new PostAndReplyTask(
|
| + std::move(task), std::move(reply), task_queue_lib_event,
|
| + task_queue_lib_event->wakeup_pipe_in_));
|
| PostTask(std::move(wrapper_task));
|
| }
|
|
|
| -void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| - std::unique_ptr<QueuedTask> reply) {
|
| - return PostTaskAndReply(std::move(task), std::move(reply), Current());
|
| -}
|
| -
|
| // static
|
| -void TaskQueue::ThreadMain(void* context) {
|
| - TaskQueue* me = static_cast<TaskQueue*>(context);
|
| +void TaskQueueLibEvent::ThreadMain(void* context) {
|
| + TaskQueueLibEvent* me = static_cast<TaskQueueLibEvent*>(context);
|
|
|
| QueueContext queue_context(me);
|
| pthread_setspecific(GetQueuePtrTls(), &queue_context);
|
| @@ -358,7 +434,9 @@ void TaskQueue::ThreadMain(void* context) {
|
| }
|
|
|
| // static
|
| -void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
|
| +void TaskQueueLibEvent::OnWakeup(int socket,
|
| + short flags,
|
| + void* context) { // NOLINT
|
| QueueContext* ctx =
|
| static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
|
| RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
|
| @@ -405,14 +483,16 @@ void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
|
| }
|
|
|
| // static
|
| -void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
|
| +void TaskQueueLibEvent::RunTask(int fd, short flags, void* context) { // NOLINT
|
| auto* task = static_cast<QueuedTask*>(context);
|
| if (task->Run())
|
| delete task;
|
| }
|
|
|
| // static
|
| -void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
|
| +void TaskQueueLibEvent::RunTimer(int fd,
|
| + short flags,
|
| + void* context) { // NOLINT
|
| TimerEvent* timer = static_cast<TimerEvent*>(context);
|
| if (!timer->task->Run())
|
| timer->task.release();
|
| @@ -422,10 +502,56 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
|
| delete timer;
|
| }
|
|
|
| -void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
|
| +void TaskQueueLibEvent::PrepareReplyTask(
|
| + scoped_refptr<ReplyTaskOwnerRef> reply_task) {
|
| RTC_DCHECK(reply_task);
|
| CritScope lock(&pending_lock_);
|
| pending_replies_.push_back(std::move(reply_task));
|
| }
|
|
|
| +TaskQueue::TaskQueue(const char* queue_name, Priority priority)
|
| + : impl_(
|
| + TaskQueueImplFactory::Get()->CreateImpl(queue_name, this, priority)) {
|
| +}
|
| +
|
| +TaskQueue::~TaskQueue() {}
|
| +
|
| +// static
|
| +TaskQueue* TaskQueue::Current() {
|
| + return TaskQueueImplFactory::Get()->CurrentQueue();
|
| +}
|
| +
|
| +// static
|
| +// TODO(perkj): ! Now - should check |queue_name|.
|
| +bool TaskQueue::IsCurrent(const char* queue_name) {
|
| + TaskQueue* queue = TaskQueueImplFactory::Get()->CurrentQueue();
|
| + return queue ? queue->IsCurrent() : false;
|
| +}
|
| +
|
| +bool TaskQueue::IsCurrent() const {
|
| + return impl_->IsCurrent();
|
| +}
|
| +
|
| +void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
|
| + return impl_->PostTask(std::move(task));
|
| +}
|
| +
|
| +void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| + std::unique_ptr<QueuedTask> reply,
|
| + TaskQueue* reply_queue) {
|
| + return impl_->PostTaskAndReply(std::move(task), std::move(reply),
|
| + reply_queue->impl_.get());
|
| +}
|
| +
|
| +void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| + std::unique_ptr<QueuedTask> reply) {
|
| + return impl_->PostTaskAndReply(std::move(task), std::move(reply),
|
| + impl_.get());
|
| +}
|
| +
|
| +void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
| + uint32_t milliseconds) {
|
| + return impl_->PostDelayedTask(std::move(task), milliseconds);
|
| +}
|
| +
|
| } // namespace rtc
|
|
|