Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(864)

Unified Diff: webrtc/rtc_base/task_queue_libevent.cc

Issue 2936213003: Test using a global, replacable TaskQueueImpl factory.
Patch Set: Added global factory. Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: webrtc/rtc_base/task_queue_libevent.cc
diff --git a/webrtc/rtc_base/task_queue_libevent.cc b/webrtc/rtc_base/task_queue_libevent.cc
index db267dc80be893aa7715f5dea97ed2a57a93f47c..2f07d1402d8aa6f9546be2b60f00571f19a10e90 100644
--- a/webrtc/rtc_base/task_queue_libevent.cc
+++ b/webrtc/rtc_base/task_queue_libevent.cc
@@ -18,7 +18,12 @@
#include "base/third_party/libevent/event.h"
#include "webrtc/rtc_base/checks.h"
#include "webrtc/rtc_base/logging.h"
+#include "webrtc/rtc_base/platform_thread.h"
+
+#include "webrtc/rtc_base/refcountedobject.h"
#include "webrtc/rtc_base/safe_conversions.h"
+#include "webrtc/rtc_base/task_queue_impl.h"
+#include "webrtc/rtc_base/task_queue_impl_factory.h"
#include "webrtc/rtc_base/task_queue_posix.h"
#include "webrtc/rtc_base/timeutils.h"
@@ -104,9 +109,69 @@ ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
}
} // namespace
-struct TaskQueue::QueueContext {
- explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
- TaskQueue* queue;
+class TaskQueueLibEvent : public TaskQueueImpl {
+ public:
+ explicit TaskQueueLibEvent(const char* queue_name,
+ TaskQueue* queue,
+ Priority priority);
+ ~TaskQueueLibEvent() override;
+
+ static TaskQueueLibEvent* Current();
+ static TaskQueue* CurrentQueue();
+
+ // Used for DCHECKing the current queue.
+ static bool IsCurrent(const char* queue_name);
+ bool IsCurrent() const override;
+
+ void PostTask(std::unique_ptr<QueuedTask> task) override;
+ void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+ std::unique_ptr<QueuedTask> reply,
+ TaskQueueImpl* reply_queue) override;
+
+ void PostDelayedTask(std::unique_ptr<QueuedTask> task,
+ uint32_t milliseconds) override;
+
+ private:
+ static void ThreadMain(void* context);
+ static void OnWakeup(int socket, short flags, void* context); // NOLINT
+ static void RunTask(int fd, short flags, void* context); // NOLINT
+ static void RunTimer(int fd, short flags, void* context); // NOLINT
+
+ class ReplyTaskOwner;
+ class PostAndReplyTask;
+ class SetTimerTask;
+
+ typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
+
+ void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
+
+ struct QueueContext;
+ TaskQueue* const queue_;
+ int wakeup_pipe_in_ = -1;
+ int wakeup_pipe_out_ = -1;
+ event_base* event_base_;
+ std::unique_ptr<event> wakeup_event_;
+ PlatformThread thread_;
+ rtc::CriticalSection pending_lock_;
+ std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
+ std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
+ GUARDED_BY(pending_lock_);
+};
+
+scoped_refptr<TaskQueueImpl> TaskQueueLibEventFactory::CreateImpl(
+ const char* queue_name,
+ TaskQueue* queue, // only used for GetCurrent()
+ TaskQueue::Priority priority) {
+ return new RefCountedObject<TaskQueueLibEvent>(queue_name, queue, priority);
+}
+
+TaskQueue* TaskQueueLibEventFactory::CurrentQueue() {
+ return TaskQueueLibEvent::CurrentQueue();
+}
+
+struct TaskQueueLibEvent::QueueContext {
+ explicit QueueContext(TaskQueueLibEvent* q) : queue(q), is_active(true) {}
+ TaskQueueLibEvent* queue;
bool is_active;
// Holds a list of events pending timers for cleanup when the loop exits.
std::list<TimerEvent*> pending_timers_;
@@ -135,7 +200,7 @@ struct TaskQueue::QueueContext {
// * if set_should_run_task() was called, the reply task will be run
// * Release the reference to ReplyTaskOwner
// * ReplyTaskOwner and associated |reply_| are deleted.
-class TaskQueue::ReplyTaskOwner {
+class TaskQueueLibEvent::ReplyTaskOwner {
public:
ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
: reply_(std::move(reply)) {}
@@ -159,11 +224,11 @@ class TaskQueue::ReplyTaskOwner {
bool run_task_ = false;
};
-class TaskQueue::PostAndReplyTask : public QueuedTask {
+class TaskQueueLibEvent::PostAndReplyTask : public QueuedTask {
public:
PostAndReplyTask(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
- TaskQueue* reply_queue,
+ TaskQueueLibEvent* reply_queue,
int reply_pipe)
: task_(std::move(task)),
reply_pipe_(reply_pipe),
@@ -196,7 +261,7 @@ class TaskQueue::PostAndReplyTask : public QueuedTask {
scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
};
-class TaskQueue::SetTimerTask : public QueuedTask {
+class TaskQueueLibEvent::SetTimerTask : public QueuedTask {
public:
SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
: task_(std::move(task)),
@@ -208,7 +273,7 @@ class TaskQueue::SetTimerTask : public QueuedTask {
// Compensate for the time that has passed since construction
// and until we got here.
uint32_t post_time = Time32() - posted_;
- TaskQueue::Current()->PostDelayedTask(
+ TaskQueueLibEvent::Current()->PostDelayedTask(
std::move(task_),
post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
return true;
@@ -219,10 +284,13 @@ class TaskQueue::SetTimerTask : public QueuedTask {
const uint32_t posted_;
};
-TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
- : event_base_(event_base_new()),
+TaskQueueLibEvent::TaskQueueLibEvent(const char* queue_name,
+ TaskQueue* queue,
+ Priority priority /*= NORMAL*/)
+ : queue_(queue),
+ event_base_(event_base_new()),
wakeup_event_(new event()),
- thread_(&TaskQueue::ThreadMain,
+ thread_(&TaskQueueLibEvent::ThreadMain,
this,
queue_name,
TaskQueuePriorityToThreadPriority(priority)) {
@@ -240,7 +308,7 @@ TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
thread_.Start();
}
-TaskQueue::~TaskQueue() {
+TaskQueueLibEvent::~TaskQueueLibEvent() {
RTC_DCHECK(!IsCurrent());
struct timespec ts;
char message = kQuit;
@@ -267,30 +335,40 @@ TaskQueue::~TaskQueue() {
}
// static
-TaskQueue* TaskQueue::Current() {
+TaskQueueLibEvent* TaskQueueLibEvent::Current() {
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
return ctx ? ctx->queue : nullptr;
}
// static
-bool TaskQueue::IsCurrent(const char* queue_name) {
- TaskQueue* current = Current();
+TaskQueue* TaskQueueLibEvent::CurrentQueue() {
+ TaskQueueLibEvent* current = Current();
+ if (current) {
+ return current->queue_;
+ }
+ return nullptr;
+}
+
+// static
+bool TaskQueueLibEvent::IsCurrent(const char* queue_name) {
+ TaskQueueLibEvent* current = Current();
return current && current->thread_.name().compare(queue_name) == 0;
}
-bool TaskQueue::IsCurrent() const {
+bool TaskQueueLibEvent::IsCurrent() const {
return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
}
-void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
+void TaskQueueLibEvent::PostTask(std::unique_ptr<QueuedTask> task) {
RTC_DCHECK(task.get());
// libevent isn't thread safe. This means that we can't use methods such
// as event_base_once to post tasks to the worker thread from a different
// thread. However, we can use it when posting from the worker thread itself.
if (IsCurrent()) {
- if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
- task.get(), nullptr) == 0) {
+ if (event_base_once(event_base_, -1, EV_TIMEOUT,
+ &TaskQueueLibEvent::RunTask, task.get(),
+ nullptr) == 0) {
task.release();
}
} else {
@@ -310,11 +388,12 @@ void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
}
}
-void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) {
+void TaskQueueLibEvent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+ uint32_t milliseconds) {
if (IsCurrent()) {
TimerEvent* timer = new TimerEvent(std::move(task));
- EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer);
+ EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibEvent::RunTimer,
+ timer);
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
ctx->pending_timers_.push_back(timer);
@@ -327,23 +406,20 @@ void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
}
}
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
- std::unique_ptr<QueuedTask> reply,
- TaskQueue* reply_queue) {
- std::unique_ptr<QueuedTask> wrapper_task(
- new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
- reply_queue->wakeup_pipe_in_));
+void TaskQueueLibEvent::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+ std::unique_ptr<QueuedTask> reply,
+ TaskQueueImpl* reply_queue) {
+ TaskQueueLibEvent* task_queue_lib_event =
+ static_cast<TaskQueueLibEvent*>(reply_queue);
+ std::unique_ptr<QueuedTask> wrapper_task(new PostAndReplyTask(
+ std::move(task), std::move(reply), task_queue_lib_event,
+ task_queue_lib_event->wakeup_pipe_in_));
PostTask(std::move(wrapper_task));
}
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
- std::unique_ptr<QueuedTask> reply) {
- return PostTaskAndReply(std::move(task), std::move(reply), Current());
-}
-
// static
-void TaskQueue::ThreadMain(void* context) {
- TaskQueue* me = static_cast<TaskQueue*>(context);
+void TaskQueueLibEvent::ThreadMain(void* context) {
+ TaskQueueLibEvent* me = static_cast<TaskQueueLibEvent*>(context);
QueueContext queue_context(me);
pthread_setspecific(GetQueuePtrTls(), &queue_context);
@@ -358,7 +434,9 @@ void TaskQueue::ThreadMain(void* context) {
}
// static
-void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
+void TaskQueueLibEvent::OnWakeup(int socket,
+ short flags,
+ void* context) { // NOLINT
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
@@ -405,14 +483,16 @@ void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
}
// static
-void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
+void TaskQueueLibEvent::RunTask(int fd, short flags, void* context) { // NOLINT
auto* task = static_cast<QueuedTask*>(context);
if (task->Run())
delete task;
}
// static
-void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
+void TaskQueueLibEvent::RunTimer(int fd,
+ short flags,
+ void* context) { // NOLINT
TimerEvent* timer = static_cast<TimerEvent*>(context);
if (!timer->task->Run())
timer->task.release();
@@ -422,10 +502,56 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
delete timer;
}
-void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
+void TaskQueueLibEvent::PrepareReplyTask(
+ scoped_refptr<ReplyTaskOwnerRef> reply_task) {
RTC_DCHECK(reply_task);
CritScope lock(&pending_lock_);
pending_replies_.push_back(std::move(reply_task));
}
+TaskQueue::TaskQueue(const char* queue_name, Priority priority)
+ : impl_(
+ TaskQueueImplFactory::Get()->CreateImpl(queue_name, this, priority)) {
+}
+
+TaskQueue::~TaskQueue() {}
+
+// static
+TaskQueue* TaskQueue::Current() {
+ return TaskQueueImplFactory::Get()->CurrentQueue();
+}
+
+// static
+// TODO(perkj): ! Now - should check |queue_name|.
+bool TaskQueue::IsCurrent(const char* queue_name) {
+ TaskQueue* queue = TaskQueueImplFactory::Get()->CurrentQueue();
+ return queue ? queue->IsCurrent() : false;
+}
+
+bool TaskQueue::IsCurrent() const {
+ return impl_->IsCurrent();
+}
+
+void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
+ return impl_->PostTask(std::move(task));
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+ std::unique_ptr<QueuedTask> reply,
+ TaskQueue* reply_queue) {
+ return impl_->PostTaskAndReply(std::move(task), std::move(reply),
+ reply_queue->impl_.get());
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+ std::unique_ptr<QueuedTask> reply) {
+ return impl_->PostTaskAndReply(std::move(task), std::move(reply),
+ impl_.get());
+}
+
+void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+ uint32_t milliseconds) {
+ return impl_->PostDelayedTask(std::move(task), milliseconds);
+}
+
} // namespace rtc
« webrtc/rtc_base/task_queue_impl_factory.cc ('K') | « webrtc/rtc_base/task_queue_impl_factory.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698