Index: chrome/common/net/notifier/listener/mediator_thread_impl.cc |
diff --git a/chrome/common/net/notifier/listener/mediator_thread_impl.cc b/chrome/common/net/notifier/listener/mediator_thread_impl.cc |
index f13dd258220fd4520292783e8466be1af7871363..ac88983c410c6965ba69f048fae892900cae949e 100644 |
--- a/chrome/common/net/notifier/listener/mediator_thread_impl.cc |
+++ b/chrome/common/net/notifier/listener/mediator_thread_impl.cc |
@@ -6,7 +6,7 @@ |
#include "base/logging.h" |
#include "base/message_loop.h" |
-#include "base/platform_thread.h" |
+#include "base/task.h" |
#include "chrome/common/net/network_change_notifier_proxy.h" |
#include "chrome/common/net/notifier/base/task_pump.h" |
#include "chrome/common/net/notifier/communicator/connection_options.h" |
@@ -17,11 +17,17 @@ |
#include "chrome/common/net/notifier/listener/subscribe_task.h" |
#include "net/base/host_port_pair.h" |
#include "net/base/host_resolver.h" |
+#include "talk/base/physicalsocketserver.h" |
#include "talk/base/thread.h" |
#include "talk/xmpp/xmppclient.h" |
#include "talk/xmpp/xmppclientsettings.h" |
-using std::string; |
+// We manage the lifetime of notifier::MediatorThreadImpl ourselves. |
+template <> |
+struct RunnableMethodTraits<notifier::MediatorThreadImpl> { |
+ void RetainCallee(notifier::MediatorThreadImpl*) {} |
+ void ReleaseCallee(notifier::MediatorThreadImpl*) {} |
+}; |
namespace notifier { |
@@ -29,111 +35,123 @@ MediatorThreadImpl::MediatorThreadImpl( |
chrome_common_net::NetworkChangeNotifierThread* |
network_change_notifier_thread) |
: delegate_(NULL), |
- network_change_notifier_thread_(network_change_notifier_thread) { |
+ parent_message_loop_(MessageLoop::current()), |
+ network_change_notifier_thread_(network_change_notifier_thread), |
+ worker_thread_("MediatorThread worker thread") { |
+ DCHECK(parent_message_loop_); |
DCHECK(network_change_notifier_thread_); |
} |
MediatorThreadImpl::~MediatorThreadImpl() { |
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_); |
} |
void MediatorThreadImpl::SetDelegate(Delegate* delegate) { |
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_); |
delegate_ = delegate; |
} |
void MediatorThreadImpl::Start() { |
- talk_base::Thread::Start(); |
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_); |
+ // We create the worker thread as an IO thread in preparation for |
+ // making this use Chrome sockets. |
+ const base::Thread::Options options(MessageLoop::TYPE_IO, 0); |
+ // TODO(akalin): Make this function return a bool and remove this |
+ // CHECK(). |
+ CHECK(worker_thread_.StartWithOptions(options)); |
+ worker_message_loop()->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod(this, &MediatorThreadImpl::StartLibjingleThread)); |
} |
-void MediatorThreadImpl::Run() { |
- PlatformThread::SetName("Notifier_MediatorThread"); |
- MessageLoop message_loop; |
- Post(this, CMD_PUMP_AUXILIARY_LOOPS); |
- ProcessMessages(talk_base::kForever); |
+void MediatorThreadImpl::StartLibjingleThread() { |
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop()); |
+ socket_server_.reset(new talk_base::PhysicalSocketServer()); |
+ libjingle_thread_.reset(new talk_base::Thread()); |
+ talk_base::ThreadManager::SetCurrent(libjingle_thread_.get()); |
+ worker_message_loop()->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod(this, &MediatorThreadImpl::PumpLibjingleLoop)); |
} |
-void MediatorThreadImpl::PumpAuxiliaryLoops() { |
- MessageLoop::current()->RunAllPending(); |
- // We want to pump auxiliary loops every 100ms until this thread is stopped, |
- // at which point this call will do nothing. |
- PostDelayed(100, this, CMD_PUMP_AUXILIARY_LOOPS); |
+void MediatorThreadImpl::StopLibjingleThread() { |
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop()); |
+ talk_base::ThreadManager::SetCurrent(NULL); |
+ libjingle_thread_.reset(); |
+ socket_server_.reset(); |
} |
-void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) { |
- Post(this, CMD_LOGIN, new LoginData(settings)); |
+void MediatorThreadImpl::PumpLibjingleLoop() { |
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop()); |
+ // Pump the libjingle message loop 100ms at a time. |
+ if (!libjingle_thread_.get()) { |
+ // StopLibjingleThread() was called. |
+ return; |
+ } |
+ libjingle_thread_->ProcessMessages(100); |
+ worker_message_loop()->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod(this, &MediatorThreadImpl::PumpLibjingleLoop)); |
} |
-void MediatorThreadImpl::Stop() { |
- Thread::Stop(); |
- CHECK(!login_.get() && !network_change_notifier_.get() && !pump_.get()) |
- << "Logout should be called prior to message queue exit."; |
+void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) { |
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_); |
+ worker_message_loop()->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod(this, &MediatorThreadImpl::DoLogin, settings)); |
} |
void MediatorThreadImpl::Logout() { |
- CHECK(!IsQuitting()) |
- << "Logout should be called prior to message queue exit."; |
- Post(this, CMD_DISCONNECT); |
- Stop(); |
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_); |
+ worker_message_loop()->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod(this, &MediatorThreadImpl::DoDisconnect)); |
+ worker_message_loop()->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod(this, &MediatorThreadImpl::StopLibjingleThread)); |
+ // TODO(akalin): Decomp this into a separate stop method. |
+ worker_thread_.Stop(); |
+ // Process any messages the worker thread may be posted on our |
+ // thread. |
+ bool old_state = parent_message_loop_->NestableTasksAllowed(); |
+ parent_message_loop_->SetNestableTasksAllowed(true); |
+ parent_message_loop_->RunAllPending(); |
+ parent_message_loop_->SetNestableTasksAllowed(old_state); |
+ // worker_thread_ should have cleaned all this up. |
+ CHECK(!login_.get()); |
+ CHECK(!network_change_notifier_.get()); |
+ CHECK(!pump_.get()); |
} |
void MediatorThreadImpl::ListenForUpdates() { |
- Post(this, CMD_LISTEN_FOR_UPDATES); |
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_); |
+ worker_message_loop()->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod(this, &MediatorThreadImpl::DoListenForUpdates)); |
} |
void MediatorThreadImpl::SubscribeForUpdates( |
const std::vector<std::string>& subscribed_services_list) { |
- Post(this, CMD_SUBSCRIBE_FOR_UPDATES, |
- new SubscriptionData(subscribed_services_list)); |
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_); |
+ worker_message_loop()->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod(this, &MediatorThreadImpl::DoSubscribeForUpdates, |
+ subscribed_services_list)); |
} |
void MediatorThreadImpl::SendNotification( |
const OutgoingNotificationData& data) { |
- Post(this, CMD_SEND_NOTIFICATION, new OutgoingNotificationMessageData(data)); |
-} |
- |
-void MediatorThreadImpl::ProcessMessages(int milliseconds) { |
- talk_base::Thread::ProcessMessages(milliseconds); |
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_); |
+ worker_message_loop()->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod(this, &MediatorThreadImpl::DoSendNotification, |
+ data)); |
} |
-void MediatorThreadImpl::OnMessage(talk_base::Message* msg) { |
- scoped_ptr<LoginData> data; |
- switch (msg->message_id) { |
- case CMD_LOGIN: |
- DCHECK(msg->pdata); |
- data.reset(reinterpret_cast<LoginData*>(msg->pdata)); |
- DoLogin(data.get()); |
- break; |
- case CMD_DISCONNECT: |
- DoDisconnect(); |
- break; |
- case CMD_LISTEN_FOR_UPDATES: |
- DoListenForUpdates(); |
- break; |
- case CMD_SEND_NOTIFICATION: { |
- DCHECK(msg->pdata); |
- scoped_ptr<OutgoingNotificationMessageData> data( |
- reinterpret_cast<OutgoingNotificationMessageData*>(msg->pdata)); |
- DoSendNotification(*data); |
- break; |
- } |
- case CMD_SUBSCRIBE_FOR_UPDATES: { |
- DCHECK(msg->pdata); |
- scoped_ptr<SubscriptionData> subscription_data( |
- reinterpret_cast<SubscriptionData*>(msg->pdata)); |
- DoSubscribeForUpdates(*subscription_data); |
- break; |
- } |
- case CMD_PUMP_AUXILIARY_LOOPS: |
- PumpAuxiliaryLoops(); |
- break; |
- default: |
- LOG(ERROR) << "P2P: Someone passed a bad message to the thread."; |
- break; |
- } |
-} |
- |
-void MediatorThreadImpl::DoLogin(LoginData* login_data) { |
+void MediatorThreadImpl::DoLogin( |
+ const buzz::XmppClientSettings& settings) { |
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop()); |
LOG(INFO) << "P2P: Thread logging into talk network."; |
- buzz::XmppClientSettings& user_settings = login_data->user_settings; |
network_change_notifier_.reset( |
new chrome_common_net::NetworkChangeNotifierProxy( |
@@ -164,7 +182,7 @@ void MediatorThreadImpl::DoLogin(LoginData* login_data) { |
// Language is not used in the stanza so we default to |en|. |
std::string lang = "en"; |
login_.reset(new notifier::Login(pump_.get(), |
- user_settings, |
+ settings, |
options, |
lang, |
host_resolver_.get(), |
@@ -189,17 +207,8 @@ void MediatorThreadImpl::DoLogin(LoginData* login_data) { |
login_->StartConnection(); |
} |
-void MediatorThreadImpl::OnInputDebug(const char* msg, int length) { |
- string output(msg, length); |
- LOG(INFO) << "P2P: OnInputDebug:" << output << "."; |
-} |
- |
-void MediatorThreadImpl::OnOutputDebug(const char* msg, int length) { |
- string output(msg, length); |
- LOG(INFO) << "P2P: OnOutputDebug:" << output << "."; |
-} |
- |
void MediatorThreadImpl::DoDisconnect() { |
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop()); |
LOG(INFO) << "P2P: Thread logging out of talk network."; |
login_.reset(); |
// Delete the old pump while on the thread to ensure that everything is |
@@ -211,14 +220,10 @@ void MediatorThreadImpl::DoDisconnect() { |
} |
void MediatorThreadImpl::DoSubscribeForUpdates( |
- const SubscriptionData& subscription_data) { |
- buzz::XmppClient* client = xmpp_client(); |
- // If there isn't an active xmpp client, return. |
- if (!client) { |
- return; |
- } |
+ const std::vector<std::string>& subscribed_services_list) { |
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop()); |
SubscribeTask* subscription = |
- new SubscribeTask(client, subscription_data.subscribed_services_list); |
+ new SubscribeTask(xmpp_client(), subscribed_services_list); |
subscription->SignalStatusUpdate.connect( |
this, |
&MediatorThreadImpl::OnSubscriptionStateChange); |
@@ -226,40 +231,56 @@ void MediatorThreadImpl::DoSubscribeForUpdates( |
} |
void MediatorThreadImpl::DoListenForUpdates() { |
- buzz::XmppClient* client = xmpp_client(); |
- // If there isn't an active xmpp client, return. |
- if (!client) { |
- return; |
- } |
- ListenTask* listener = new ListenTask(client); |
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop()); |
+ ListenTask* listener = new ListenTask(xmpp_client()); |
listener->SignalUpdateAvailable.connect( |
this, |
- &MediatorThreadImpl::OnUpdateListenerMessage); |
+ &MediatorThreadImpl::OnIncomingNotification); |
listener->Start(); |
} |
void MediatorThreadImpl::DoSendNotification( |
- const OutgoingNotificationMessageData& data) { |
- buzz::XmppClient* client = xmpp_client(); |
- // If there isn't an active xmpp client, return. |
- if (!client) { |
- return; |
- } |
- SendUpdateTask* task = new SendUpdateTask(client, data.notification_data); |
+ const OutgoingNotificationData& data) { |
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop()); |
+ SendUpdateTask* task = new SendUpdateTask(xmpp_client(), data); |
task->SignalStatusUpdate.connect( |
this, |
- &MediatorThreadImpl::OnUpdateNotificationSent); |
+ &MediatorThreadImpl::OnOutgoingNotification); |
task->Start(); |
} |
-void MediatorThreadImpl::OnUpdateListenerMessage( |
+void MediatorThreadImpl::OnIncomingNotification( |
const IncomingNotificationData& notification_data) { |
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop()); |
+ parent_message_loop_->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod( |
+ this, |
+ &MediatorThreadImpl::OnIncomingNotificationOnParentThread, |
+ notification_data)); |
+} |
+ |
+void MediatorThreadImpl::OnIncomingNotificationOnParentThread( |
+ const IncomingNotificationData& notification_data) { |
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_); |
if (delegate_) { |
delegate_->OnIncomingNotification(notification_data); |
} |
} |
-void MediatorThreadImpl::OnUpdateNotificationSent(bool success) { |
+void MediatorThreadImpl::OnOutgoingNotification(bool success) { |
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop()); |
+ parent_message_loop_->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod( |
+ this, |
+ &MediatorThreadImpl::OnOutgoingNotificationOnParentThread, |
+ success)); |
+} |
+ |
+void MediatorThreadImpl::OnOutgoingNotificationOnParentThread( |
+ bool success) { |
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_); |
if (delegate_ && success) { |
delegate_->OnOutgoingNotification(); |
} |
@@ -267,6 +288,18 @@ void MediatorThreadImpl::OnUpdateNotificationSent(bool success) { |
void MediatorThreadImpl::OnLoginFailureMessage( |
const notifier::LoginFailure& failure) { |
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop()); |
+ parent_message_loop_->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod( |
+ this, |
+ &MediatorThreadImpl::OnLoginFailureMessageOnParentThread, |
+ failure)); |
+} |
+ |
+void MediatorThreadImpl::OnLoginFailureMessageOnParentThread( |
+ const notifier::LoginFailure& failure) { |
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_); |
if (delegate_) { |
delegate_->OnConnectionStateChange(false); |
} |
@@ -274,6 +307,18 @@ void MediatorThreadImpl::OnLoginFailureMessage( |
void MediatorThreadImpl::OnClientStateChangeMessage( |
LoginConnectionState state) { |
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop()); |
+ parent_message_loop_->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod( |
+ this, |
+ &MediatorThreadImpl::OnClientStateChangeMessageOnParentThread, |
+ state)); |
+} |
+ |
+void MediatorThreadImpl::OnClientStateChangeMessageOnParentThread( |
+ LoginConnectionState state) { |
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_); |
switch (state) { |
case STATE_CLOSED: |
if (delegate_) { |
@@ -301,16 +346,39 @@ void MediatorThreadImpl::OnClientStateChangeMessage( |
} |
void MediatorThreadImpl::OnSubscriptionStateChange(bool success) { |
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop()); |
+ parent_message_loop_->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod( |
+ this, |
+ &MediatorThreadImpl::OnSubscriptionStateChangeOnParentThread, |
+ success)); |
+} |
+ |
+void MediatorThreadImpl::OnSubscriptionStateChangeOnParentThread( |
+ bool success) { |
+ DCHECK_EQ(MessageLoop::current(), parent_message_loop_); |
if (delegate_) { |
delegate_->OnSubscriptionStateChange(success); |
} |
} |
+MessageLoop* MediatorThreadImpl::worker_message_loop() { |
+ MessageLoop* current_message_loop = MessageLoop::current(); |
+ DCHECK(current_message_loop); |
+ MessageLoop* worker_message_loop = worker_thread_.message_loop(); |
+ DCHECK(worker_message_loop); |
+ DCHECK(current_message_loop == parent_message_loop_ || |
+ current_message_loop == worker_message_loop); |
+ return worker_message_loop; |
+} |
+ |
buzz::XmppClient* MediatorThreadImpl::xmpp_client() { |
- if (!login_.get()) { |
- return NULL; |
- } |
- return login_->xmpp_client(); |
+ DCHECK_EQ(MessageLoop::current(), worker_message_loop()); |
+ DCHECK(login_.get()); |
+ buzz::XmppClient* xmpp_client = login_->xmpp_client(); |
+ DCHECK(xmpp_client); |
+ return xmpp_client; |
} |
} // namespace notifier |