Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6282)

Unified Diff: chrome/common/net/notifier/listener/mediator_thread_impl.cc

Issue 2471006: Refactored MediatorThread to use Chrome threads primarily. (Closed)
Patch Set: Created 10 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: chrome/common/net/notifier/listener/mediator_thread_impl.cc
diff --git a/chrome/common/net/notifier/listener/mediator_thread_impl.cc b/chrome/common/net/notifier/listener/mediator_thread_impl.cc
index f13dd258220fd4520292783e8466be1af7871363..ac88983c410c6965ba69f048fae892900cae949e 100644
--- a/chrome/common/net/notifier/listener/mediator_thread_impl.cc
+++ b/chrome/common/net/notifier/listener/mediator_thread_impl.cc
@@ -6,7 +6,7 @@
#include "base/logging.h"
#include "base/message_loop.h"
-#include "base/platform_thread.h"
+#include "base/task.h"
#include "chrome/common/net/network_change_notifier_proxy.h"
#include "chrome/common/net/notifier/base/task_pump.h"
#include "chrome/common/net/notifier/communicator/connection_options.h"
@@ -17,11 +17,17 @@
#include "chrome/common/net/notifier/listener/subscribe_task.h"
#include "net/base/host_port_pair.h"
#include "net/base/host_resolver.h"
+#include "talk/base/physicalsocketserver.h"
#include "talk/base/thread.h"
#include "talk/xmpp/xmppclient.h"
#include "talk/xmpp/xmppclientsettings.h"
-using std::string;
+// We manage the lifetime of notifier::MediatorThreadImpl ourselves.
+template <>
+struct RunnableMethodTraits<notifier::MediatorThreadImpl> {
+ void RetainCallee(notifier::MediatorThreadImpl*) {}
+ void ReleaseCallee(notifier::MediatorThreadImpl*) {}
+};
namespace notifier {
@@ -29,111 +35,123 @@ MediatorThreadImpl::MediatorThreadImpl(
chrome_common_net::NetworkChangeNotifierThread*
network_change_notifier_thread)
: delegate_(NULL),
- network_change_notifier_thread_(network_change_notifier_thread) {
+ parent_message_loop_(MessageLoop::current()),
+ network_change_notifier_thread_(network_change_notifier_thread),
+ worker_thread_("MediatorThread worker thread") {
+ DCHECK(parent_message_loop_);
DCHECK(network_change_notifier_thread_);
}
MediatorThreadImpl::~MediatorThreadImpl() {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
}
void MediatorThreadImpl::SetDelegate(Delegate* delegate) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
delegate_ = delegate;
}
void MediatorThreadImpl::Start() {
- talk_base::Thread::Start();
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ // We create the worker thread as an IO thread in preparation for
+ // making this use Chrome sockets.
+ const base::Thread::Options options(MessageLoop::TYPE_IO, 0);
+ // TODO(akalin): Make this function return a bool and remove this
+ // CHECK().
+ CHECK(worker_thread_.StartWithOptions(options));
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::StartLibjingleThread));
}
-void MediatorThreadImpl::Run() {
- PlatformThread::SetName("Notifier_MediatorThread");
- MessageLoop message_loop;
- Post(this, CMD_PUMP_AUXILIARY_LOOPS);
- ProcessMessages(talk_base::kForever);
+void MediatorThreadImpl::StartLibjingleThread() {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ socket_server_.reset(new talk_base::PhysicalSocketServer());
+ libjingle_thread_.reset(new talk_base::Thread());
+ talk_base::ThreadManager::SetCurrent(libjingle_thread_.get());
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::PumpLibjingleLoop));
}
-void MediatorThreadImpl::PumpAuxiliaryLoops() {
- MessageLoop::current()->RunAllPending();
- // We want to pump auxiliary loops every 100ms until this thread is stopped,
- // at which point this call will do nothing.
- PostDelayed(100, this, CMD_PUMP_AUXILIARY_LOOPS);
+void MediatorThreadImpl::StopLibjingleThread() {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ talk_base::ThreadManager::SetCurrent(NULL);
+ libjingle_thread_.reset();
+ socket_server_.reset();
}
-void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) {
- Post(this, CMD_LOGIN, new LoginData(settings));
+void MediatorThreadImpl::PumpLibjingleLoop() {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ // Pump the libjingle message loop 100ms at a time.
+ if (!libjingle_thread_.get()) {
+ // StopLibjingleThread() was called.
+ return;
+ }
+ libjingle_thread_->ProcessMessages(100);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::PumpLibjingleLoop));
}
-void MediatorThreadImpl::Stop() {
- Thread::Stop();
- CHECK(!login_.get() && !network_change_notifier_.get() && !pump_.get())
- << "Logout should be called prior to message queue exit.";
+void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::DoLogin, settings));
}
void MediatorThreadImpl::Logout() {
- CHECK(!IsQuitting())
- << "Logout should be called prior to message queue exit.";
- Post(this, CMD_DISCONNECT);
- Stop();
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::DoDisconnect));
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::StopLibjingleThread));
+ // TODO(akalin): Decomp this into a separate stop method.
+ worker_thread_.Stop();
+ // Process any messages the worker thread may be posted on our
+ // thread.
+ bool old_state = parent_message_loop_->NestableTasksAllowed();
+ parent_message_loop_->SetNestableTasksAllowed(true);
+ parent_message_loop_->RunAllPending();
+ parent_message_loop_->SetNestableTasksAllowed(old_state);
+ // worker_thread_ should have cleaned all this up.
+ CHECK(!login_.get());
+ CHECK(!network_change_notifier_.get());
+ CHECK(!pump_.get());
}
void MediatorThreadImpl::ListenForUpdates() {
- Post(this, CMD_LISTEN_FOR_UPDATES);
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::DoListenForUpdates));
}
void MediatorThreadImpl::SubscribeForUpdates(
const std::vector<std::string>& subscribed_services_list) {
- Post(this, CMD_SUBSCRIBE_FOR_UPDATES,
- new SubscriptionData(subscribed_services_list));
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::DoSubscribeForUpdates,
+ subscribed_services_list));
}
void MediatorThreadImpl::SendNotification(
const OutgoingNotificationData& data) {
- Post(this, CMD_SEND_NOTIFICATION, new OutgoingNotificationMessageData(data));
-}
-
-void MediatorThreadImpl::ProcessMessages(int milliseconds) {
- talk_base::Thread::ProcessMessages(milliseconds);
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
+ worker_message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &MediatorThreadImpl::DoSendNotification,
+ data));
}
-void MediatorThreadImpl::OnMessage(talk_base::Message* msg) {
- scoped_ptr<LoginData> data;
- switch (msg->message_id) {
- case CMD_LOGIN:
- DCHECK(msg->pdata);
- data.reset(reinterpret_cast<LoginData*>(msg->pdata));
- DoLogin(data.get());
- break;
- case CMD_DISCONNECT:
- DoDisconnect();
- break;
- case CMD_LISTEN_FOR_UPDATES:
- DoListenForUpdates();
- break;
- case CMD_SEND_NOTIFICATION: {
- DCHECK(msg->pdata);
- scoped_ptr<OutgoingNotificationMessageData> data(
- reinterpret_cast<OutgoingNotificationMessageData*>(msg->pdata));
- DoSendNotification(*data);
- break;
- }
- case CMD_SUBSCRIBE_FOR_UPDATES: {
- DCHECK(msg->pdata);
- scoped_ptr<SubscriptionData> subscription_data(
- reinterpret_cast<SubscriptionData*>(msg->pdata));
- DoSubscribeForUpdates(*subscription_data);
- break;
- }
- case CMD_PUMP_AUXILIARY_LOOPS:
- PumpAuxiliaryLoops();
- break;
- default:
- LOG(ERROR) << "P2P: Someone passed a bad message to the thread.";
- break;
- }
-}
-
-void MediatorThreadImpl::DoLogin(LoginData* login_data) {
+void MediatorThreadImpl::DoLogin(
+ const buzz::XmppClientSettings& settings) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
LOG(INFO) << "P2P: Thread logging into talk network.";
- buzz::XmppClientSettings& user_settings = login_data->user_settings;
network_change_notifier_.reset(
new chrome_common_net::NetworkChangeNotifierProxy(
@@ -164,7 +182,7 @@ void MediatorThreadImpl::DoLogin(LoginData* login_data) {
// Language is not used in the stanza so we default to |en|.
std::string lang = "en";
login_.reset(new notifier::Login(pump_.get(),
- user_settings,
+ settings,
options,
lang,
host_resolver_.get(),
@@ -189,17 +207,8 @@ void MediatorThreadImpl::DoLogin(LoginData* login_data) {
login_->StartConnection();
}
-void MediatorThreadImpl::OnInputDebug(const char* msg, int length) {
- string output(msg, length);
- LOG(INFO) << "P2P: OnInputDebug:" << output << ".";
-}
-
-void MediatorThreadImpl::OnOutputDebug(const char* msg, int length) {
- string output(msg, length);
- LOG(INFO) << "P2P: OnOutputDebug:" << output << ".";
-}
-
void MediatorThreadImpl::DoDisconnect() {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
LOG(INFO) << "P2P: Thread logging out of talk network.";
login_.reset();
// Delete the old pump while on the thread to ensure that everything is
@@ -211,14 +220,10 @@ void MediatorThreadImpl::DoDisconnect() {
}
void MediatorThreadImpl::DoSubscribeForUpdates(
- const SubscriptionData& subscription_data) {
- buzz::XmppClient* client = xmpp_client();
- // If there isn't an active xmpp client, return.
- if (!client) {
- return;
- }
+ const std::vector<std::string>& subscribed_services_list) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
SubscribeTask* subscription =
- new SubscribeTask(client, subscription_data.subscribed_services_list);
+ new SubscribeTask(xmpp_client(), subscribed_services_list);
subscription->SignalStatusUpdate.connect(
this,
&MediatorThreadImpl::OnSubscriptionStateChange);
@@ -226,40 +231,56 @@ void MediatorThreadImpl::DoSubscribeForUpdates(
}
void MediatorThreadImpl::DoListenForUpdates() {
- buzz::XmppClient* client = xmpp_client();
- // If there isn't an active xmpp client, return.
- if (!client) {
- return;
- }
- ListenTask* listener = new ListenTask(client);
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ ListenTask* listener = new ListenTask(xmpp_client());
listener->SignalUpdateAvailable.connect(
this,
- &MediatorThreadImpl::OnUpdateListenerMessage);
+ &MediatorThreadImpl::OnIncomingNotification);
listener->Start();
}
void MediatorThreadImpl::DoSendNotification(
- const OutgoingNotificationMessageData& data) {
- buzz::XmppClient* client = xmpp_client();
- // If there isn't an active xmpp client, return.
- if (!client) {
- return;
- }
- SendUpdateTask* task = new SendUpdateTask(client, data.notification_data);
+ const OutgoingNotificationData& data) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ SendUpdateTask* task = new SendUpdateTask(xmpp_client(), data);
task->SignalStatusUpdate.connect(
this,
- &MediatorThreadImpl::OnUpdateNotificationSent);
+ &MediatorThreadImpl::OnOutgoingNotification);
task->Start();
}
-void MediatorThreadImpl::OnUpdateListenerMessage(
+void MediatorThreadImpl::OnIncomingNotification(
const IncomingNotificationData& notification_data) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ parent_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &MediatorThreadImpl::OnIncomingNotificationOnParentThread,
+ notification_data));
+}
+
+void MediatorThreadImpl::OnIncomingNotificationOnParentThread(
+ const IncomingNotificationData& notification_data) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
if (delegate_) {
delegate_->OnIncomingNotification(notification_data);
}
}
-void MediatorThreadImpl::OnUpdateNotificationSent(bool success) {
+void MediatorThreadImpl::OnOutgoingNotification(bool success) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ parent_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &MediatorThreadImpl::OnOutgoingNotificationOnParentThread,
+ success));
+}
+
+void MediatorThreadImpl::OnOutgoingNotificationOnParentThread(
+ bool success) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
if (delegate_ && success) {
delegate_->OnOutgoingNotification();
}
@@ -267,6 +288,18 @@ void MediatorThreadImpl::OnUpdateNotificationSent(bool success) {
void MediatorThreadImpl::OnLoginFailureMessage(
const notifier::LoginFailure& failure) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ parent_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &MediatorThreadImpl::OnLoginFailureMessageOnParentThread,
+ failure));
+}
+
+void MediatorThreadImpl::OnLoginFailureMessageOnParentThread(
+ const notifier::LoginFailure& failure) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
if (delegate_) {
delegate_->OnConnectionStateChange(false);
}
@@ -274,6 +307,18 @@ void MediatorThreadImpl::OnLoginFailureMessage(
void MediatorThreadImpl::OnClientStateChangeMessage(
LoginConnectionState state) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ parent_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &MediatorThreadImpl::OnClientStateChangeMessageOnParentThread,
+ state));
+}
+
+void MediatorThreadImpl::OnClientStateChangeMessageOnParentThread(
+ LoginConnectionState state) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
switch (state) {
case STATE_CLOSED:
if (delegate_) {
@@ -301,16 +346,39 @@ void MediatorThreadImpl::OnClientStateChangeMessage(
}
void MediatorThreadImpl::OnSubscriptionStateChange(bool success) {
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ parent_message_loop_->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(
+ this,
+ &MediatorThreadImpl::OnSubscriptionStateChangeOnParentThread,
+ success));
+}
+
+void MediatorThreadImpl::OnSubscriptionStateChangeOnParentThread(
+ bool success) {
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
if (delegate_) {
delegate_->OnSubscriptionStateChange(success);
}
}
+MessageLoop* MediatorThreadImpl::worker_message_loop() {
+ MessageLoop* current_message_loop = MessageLoop::current();
+ DCHECK(current_message_loop);
+ MessageLoop* worker_message_loop = worker_thread_.message_loop();
+ DCHECK(worker_message_loop);
+ DCHECK(current_message_loop == parent_message_loop_ ||
+ current_message_loop == worker_message_loop);
+ return worker_message_loop;
+}
+
buzz::XmppClient* MediatorThreadImpl::xmpp_client() {
- if (!login_.get()) {
- return NULL;
- }
- return login_->xmpp_client();
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop());
+ DCHECK(login_.get());
+ buzz::XmppClient* xmpp_client = login_->xmpp_client();
+ DCHECK(xmpp_client);
+ return xmpp_client;
}
} // namespace notifier
« no previous file with comments | « chrome/common/net/notifier/listener/mediator_thread_impl.h ('k') | chrome/common/net/notifier/listener/talk_mediator_impl.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698