| Index: chrome/common/net/notifier/listener/mediator_thread_impl.cc
|
| diff --git a/chrome/common/net/notifier/listener/mediator_thread_impl.cc b/chrome/common/net/notifier/listener/mediator_thread_impl.cc
|
| index f13dd258220fd4520292783e8466be1af7871363..ac88983c410c6965ba69f048fae892900cae949e 100644
|
| --- a/chrome/common/net/notifier/listener/mediator_thread_impl.cc
|
| +++ b/chrome/common/net/notifier/listener/mediator_thread_impl.cc
|
| @@ -6,7 +6,7 @@
|
|
|
| #include "base/logging.h"
|
| #include "base/message_loop.h"
|
| -#include "base/platform_thread.h"
|
| +#include "base/task.h"
|
| #include "chrome/common/net/network_change_notifier_proxy.h"
|
| #include "chrome/common/net/notifier/base/task_pump.h"
|
| #include "chrome/common/net/notifier/communicator/connection_options.h"
|
| @@ -17,11 +17,17 @@
|
| #include "chrome/common/net/notifier/listener/subscribe_task.h"
|
| #include "net/base/host_port_pair.h"
|
| #include "net/base/host_resolver.h"
|
| +#include "talk/base/physicalsocketserver.h"
|
| #include "talk/base/thread.h"
|
| #include "talk/xmpp/xmppclient.h"
|
| #include "talk/xmpp/xmppclientsettings.h"
|
|
|
| -using std::string;
|
| +// We manage the lifetime of notifier::MediatorThreadImpl ourselves.
|
| +template <>
|
| +struct RunnableMethodTraits<notifier::MediatorThreadImpl> {
|
| + void RetainCallee(notifier::MediatorThreadImpl*) {}
|
| + void ReleaseCallee(notifier::MediatorThreadImpl*) {}
|
| +};
|
|
|
| namespace notifier {
|
|
|
| @@ -29,111 +35,123 @@ MediatorThreadImpl::MediatorThreadImpl(
|
| chrome_common_net::NetworkChangeNotifierThread*
|
| network_change_notifier_thread)
|
| : delegate_(NULL),
|
| - network_change_notifier_thread_(network_change_notifier_thread) {
|
| + parent_message_loop_(MessageLoop::current()),
|
| + network_change_notifier_thread_(network_change_notifier_thread),
|
| + worker_thread_("MediatorThread worker thread") {
|
| + DCHECK(parent_message_loop_);
|
| DCHECK(network_change_notifier_thread_);
|
| }
|
|
|
| MediatorThreadImpl::~MediatorThreadImpl() {
|
| + DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
|
| }
|
|
|
| void MediatorThreadImpl::SetDelegate(Delegate* delegate) {
|
| + DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
|
| delegate_ = delegate;
|
| }
|
|
|
| void MediatorThreadImpl::Start() {
|
| - talk_base::Thread::Start();
|
| + DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
|
| + // We create the worker thread as an IO thread in preparation for
|
| + // making this use Chrome sockets.
|
| + const base::Thread::Options options(MessageLoop::TYPE_IO, 0);
|
| + // TODO(akalin): Make this function return a bool and remove this
|
| + // CHECK().
|
| + CHECK(worker_thread_.StartWithOptions(options));
|
| + worker_message_loop()->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(this, &MediatorThreadImpl::StartLibjingleThread));
|
| }
|
|
|
| -void MediatorThreadImpl::Run() {
|
| - PlatformThread::SetName("Notifier_MediatorThread");
|
| - MessageLoop message_loop;
|
| - Post(this, CMD_PUMP_AUXILIARY_LOOPS);
|
| - ProcessMessages(talk_base::kForever);
|
| +void MediatorThreadImpl::StartLibjingleThread() {
|
| + DCHECK_EQ(MessageLoop::current(), worker_message_loop());
|
| + socket_server_.reset(new talk_base::PhysicalSocketServer());
|
| + libjingle_thread_.reset(new talk_base::Thread());
|
| + talk_base::ThreadManager::SetCurrent(libjingle_thread_.get());
|
| + worker_message_loop()->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(this, &MediatorThreadImpl::PumpLibjingleLoop));
|
| }
|
|
|
| -void MediatorThreadImpl::PumpAuxiliaryLoops() {
|
| - MessageLoop::current()->RunAllPending();
|
| - // We want to pump auxiliary loops every 100ms until this thread is stopped,
|
| - // at which point this call will do nothing.
|
| - PostDelayed(100, this, CMD_PUMP_AUXILIARY_LOOPS);
|
| +void MediatorThreadImpl::StopLibjingleThread() {
|
| + DCHECK_EQ(MessageLoop::current(), worker_message_loop());
|
| + talk_base::ThreadManager::SetCurrent(NULL);
|
| + libjingle_thread_.reset();
|
| + socket_server_.reset();
|
| }
|
|
|
| -void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) {
|
| - Post(this, CMD_LOGIN, new LoginData(settings));
|
| +void MediatorThreadImpl::PumpLibjingleLoop() {
|
| + DCHECK_EQ(MessageLoop::current(), worker_message_loop());
|
| + // Pump the libjingle message loop 100ms at a time.
|
| + if (!libjingle_thread_.get()) {
|
| + // StopLibjingleThread() was called.
|
| + return;
|
| + }
|
| + libjingle_thread_->ProcessMessages(100);
|
| + worker_message_loop()->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(this, &MediatorThreadImpl::PumpLibjingleLoop));
|
| }
|
|
|
| -void MediatorThreadImpl::Stop() {
|
| - Thread::Stop();
|
| - CHECK(!login_.get() && !network_change_notifier_.get() && !pump_.get())
|
| - << "Logout should be called prior to message queue exit.";
|
| +void MediatorThreadImpl::Login(const buzz::XmppClientSettings& settings) {
|
| + DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
|
| + worker_message_loop()->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(this, &MediatorThreadImpl::DoLogin, settings));
|
| }
|
|
|
| void MediatorThreadImpl::Logout() {
|
| - CHECK(!IsQuitting())
|
| - << "Logout should be called prior to message queue exit.";
|
| - Post(this, CMD_DISCONNECT);
|
| - Stop();
|
| + DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
|
| + worker_message_loop()->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(this, &MediatorThreadImpl::DoDisconnect));
|
| + worker_message_loop()->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(this, &MediatorThreadImpl::StopLibjingleThread));
|
| + // TODO(akalin): Decomp this into a separate stop method.
|
| + worker_thread_.Stop();
|
| + // Process any messages the worker thread may be posted on our
|
| + // thread.
|
| + bool old_state = parent_message_loop_->NestableTasksAllowed();
|
| + parent_message_loop_->SetNestableTasksAllowed(true);
|
| + parent_message_loop_->RunAllPending();
|
| + parent_message_loop_->SetNestableTasksAllowed(old_state);
|
| + // worker_thread_ should have cleaned all this up.
|
| + CHECK(!login_.get());
|
| + CHECK(!network_change_notifier_.get());
|
| + CHECK(!pump_.get());
|
| }
|
|
|
| void MediatorThreadImpl::ListenForUpdates() {
|
| - Post(this, CMD_LISTEN_FOR_UPDATES);
|
| + DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
|
| + worker_message_loop()->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(this, &MediatorThreadImpl::DoListenForUpdates));
|
| }
|
|
|
| void MediatorThreadImpl::SubscribeForUpdates(
|
| const std::vector<std::string>& subscribed_services_list) {
|
| - Post(this, CMD_SUBSCRIBE_FOR_UPDATES,
|
| - new SubscriptionData(subscribed_services_list));
|
| + DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
|
| + worker_message_loop()->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(this, &MediatorThreadImpl::DoSubscribeForUpdates,
|
| + subscribed_services_list));
|
| }
|
|
|
| void MediatorThreadImpl::SendNotification(
|
| const OutgoingNotificationData& data) {
|
| - Post(this, CMD_SEND_NOTIFICATION, new OutgoingNotificationMessageData(data));
|
| -}
|
| -
|
| -void MediatorThreadImpl::ProcessMessages(int milliseconds) {
|
| - talk_base::Thread::ProcessMessages(milliseconds);
|
| + DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
|
| + worker_message_loop()->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(this, &MediatorThreadImpl::DoSendNotification,
|
| + data));
|
| }
|
|
|
| -void MediatorThreadImpl::OnMessage(talk_base::Message* msg) {
|
| - scoped_ptr<LoginData> data;
|
| - switch (msg->message_id) {
|
| - case CMD_LOGIN:
|
| - DCHECK(msg->pdata);
|
| - data.reset(reinterpret_cast<LoginData*>(msg->pdata));
|
| - DoLogin(data.get());
|
| - break;
|
| - case CMD_DISCONNECT:
|
| - DoDisconnect();
|
| - break;
|
| - case CMD_LISTEN_FOR_UPDATES:
|
| - DoListenForUpdates();
|
| - break;
|
| - case CMD_SEND_NOTIFICATION: {
|
| - DCHECK(msg->pdata);
|
| - scoped_ptr<OutgoingNotificationMessageData> data(
|
| - reinterpret_cast<OutgoingNotificationMessageData*>(msg->pdata));
|
| - DoSendNotification(*data);
|
| - break;
|
| - }
|
| - case CMD_SUBSCRIBE_FOR_UPDATES: {
|
| - DCHECK(msg->pdata);
|
| - scoped_ptr<SubscriptionData> subscription_data(
|
| - reinterpret_cast<SubscriptionData*>(msg->pdata));
|
| - DoSubscribeForUpdates(*subscription_data);
|
| - break;
|
| - }
|
| - case CMD_PUMP_AUXILIARY_LOOPS:
|
| - PumpAuxiliaryLoops();
|
| - break;
|
| - default:
|
| - LOG(ERROR) << "P2P: Someone passed a bad message to the thread.";
|
| - break;
|
| - }
|
| -}
|
| -
|
| -void MediatorThreadImpl::DoLogin(LoginData* login_data) {
|
| +void MediatorThreadImpl::DoLogin(
|
| + const buzz::XmppClientSettings& settings) {
|
| + DCHECK_EQ(MessageLoop::current(), worker_message_loop());
|
| LOG(INFO) << "P2P: Thread logging into talk network.";
|
| - buzz::XmppClientSettings& user_settings = login_data->user_settings;
|
|
|
| network_change_notifier_.reset(
|
| new chrome_common_net::NetworkChangeNotifierProxy(
|
| @@ -164,7 +182,7 @@ void MediatorThreadImpl::DoLogin(LoginData* login_data) {
|
| // Language is not used in the stanza so we default to |en|.
|
| std::string lang = "en";
|
| login_.reset(new notifier::Login(pump_.get(),
|
| - user_settings,
|
| + settings,
|
| options,
|
| lang,
|
| host_resolver_.get(),
|
| @@ -189,17 +207,8 @@ void MediatorThreadImpl::DoLogin(LoginData* login_data) {
|
| login_->StartConnection();
|
| }
|
|
|
| -void MediatorThreadImpl::OnInputDebug(const char* msg, int length) {
|
| - string output(msg, length);
|
| - LOG(INFO) << "P2P: OnInputDebug:" << output << ".";
|
| -}
|
| -
|
| -void MediatorThreadImpl::OnOutputDebug(const char* msg, int length) {
|
| - string output(msg, length);
|
| - LOG(INFO) << "P2P: OnOutputDebug:" << output << ".";
|
| -}
|
| -
|
| void MediatorThreadImpl::DoDisconnect() {
|
| + DCHECK_EQ(MessageLoop::current(), worker_message_loop());
|
| LOG(INFO) << "P2P: Thread logging out of talk network.";
|
| login_.reset();
|
| // Delete the old pump while on the thread to ensure that everything is
|
| @@ -211,14 +220,10 @@ void MediatorThreadImpl::DoDisconnect() {
|
| }
|
|
|
| void MediatorThreadImpl::DoSubscribeForUpdates(
|
| - const SubscriptionData& subscription_data) {
|
| - buzz::XmppClient* client = xmpp_client();
|
| - // If there isn't an active xmpp client, return.
|
| - if (!client) {
|
| - return;
|
| - }
|
| + const std::vector<std::string>& subscribed_services_list) {
|
| + DCHECK_EQ(MessageLoop::current(), worker_message_loop());
|
| SubscribeTask* subscription =
|
| - new SubscribeTask(client, subscription_data.subscribed_services_list);
|
| + new SubscribeTask(xmpp_client(), subscribed_services_list);
|
| subscription->SignalStatusUpdate.connect(
|
| this,
|
| &MediatorThreadImpl::OnSubscriptionStateChange);
|
| @@ -226,40 +231,56 @@ void MediatorThreadImpl::DoSubscribeForUpdates(
|
| }
|
|
|
| void MediatorThreadImpl::DoListenForUpdates() {
|
| - buzz::XmppClient* client = xmpp_client();
|
| - // If there isn't an active xmpp client, return.
|
| - if (!client) {
|
| - return;
|
| - }
|
| - ListenTask* listener = new ListenTask(client);
|
| + DCHECK_EQ(MessageLoop::current(), worker_message_loop());
|
| + ListenTask* listener = new ListenTask(xmpp_client());
|
| listener->SignalUpdateAvailable.connect(
|
| this,
|
| - &MediatorThreadImpl::OnUpdateListenerMessage);
|
| + &MediatorThreadImpl::OnIncomingNotification);
|
| listener->Start();
|
| }
|
|
|
| void MediatorThreadImpl::DoSendNotification(
|
| - const OutgoingNotificationMessageData& data) {
|
| - buzz::XmppClient* client = xmpp_client();
|
| - // If there isn't an active xmpp client, return.
|
| - if (!client) {
|
| - return;
|
| - }
|
| - SendUpdateTask* task = new SendUpdateTask(client, data.notification_data);
|
| + const OutgoingNotificationData& data) {
|
| + DCHECK_EQ(MessageLoop::current(), worker_message_loop());
|
| + SendUpdateTask* task = new SendUpdateTask(xmpp_client(), data);
|
| task->SignalStatusUpdate.connect(
|
| this,
|
| - &MediatorThreadImpl::OnUpdateNotificationSent);
|
| + &MediatorThreadImpl::OnOutgoingNotification);
|
| task->Start();
|
| }
|
|
|
| -void MediatorThreadImpl::OnUpdateListenerMessage(
|
| +void MediatorThreadImpl::OnIncomingNotification(
|
| const IncomingNotificationData& notification_data) {
|
| + DCHECK_EQ(MessageLoop::current(), worker_message_loop());
|
| + parent_message_loop_->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(
|
| + this,
|
| + &MediatorThreadImpl::OnIncomingNotificationOnParentThread,
|
| + notification_data));
|
| +}
|
| +
|
| +void MediatorThreadImpl::OnIncomingNotificationOnParentThread(
|
| + const IncomingNotificationData& notification_data) {
|
| + DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
|
| if (delegate_) {
|
| delegate_->OnIncomingNotification(notification_data);
|
| }
|
| }
|
|
|
| -void MediatorThreadImpl::OnUpdateNotificationSent(bool success) {
|
| +void MediatorThreadImpl::OnOutgoingNotification(bool success) {
|
| + DCHECK_EQ(MessageLoop::current(), worker_message_loop());
|
| + parent_message_loop_->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(
|
| + this,
|
| + &MediatorThreadImpl::OnOutgoingNotificationOnParentThread,
|
| + success));
|
| +}
|
| +
|
| +void MediatorThreadImpl::OnOutgoingNotificationOnParentThread(
|
| + bool success) {
|
| + DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
|
| if (delegate_ && success) {
|
| delegate_->OnOutgoingNotification();
|
| }
|
| @@ -267,6 +288,18 @@ void MediatorThreadImpl::OnUpdateNotificationSent(bool success) {
|
|
|
| void MediatorThreadImpl::OnLoginFailureMessage(
|
| const notifier::LoginFailure& failure) {
|
| + DCHECK_EQ(MessageLoop::current(), worker_message_loop());
|
| + parent_message_loop_->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(
|
| + this,
|
| + &MediatorThreadImpl::OnLoginFailureMessageOnParentThread,
|
| + failure));
|
| +}
|
| +
|
| +void MediatorThreadImpl::OnLoginFailureMessageOnParentThread(
|
| + const notifier::LoginFailure& failure) {
|
| + DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
|
| if (delegate_) {
|
| delegate_->OnConnectionStateChange(false);
|
| }
|
| @@ -274,6 +307,18 @@ void MediatorThreadImpl::OnLoginFailureMessage(
|
|
|
| void MediatorThreadImpl::OnClientStateChangeMessage(
|
| LoginConnectionState state) {
|
| + DCHECK_EQ(MessageLoop::current(), worker_message_loop());
|
| + parent_message_loop_->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(
|
| + this,
|
| + &MediatorThreadImpl::OnClientStateChangeMessageOnParentThread,
|
| + state));
|
| +}
|
| +
|
| +void MediatorThreadImpl::OnClientStateChangeMessageOnParentThread(
|
| + LoginConnectionState state) {
|
| + DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
|
| switch (state) {
|
| case STATE_CLOSED:
|
| if (delegate_) {
|
| @@ -301,16 +346,39 @@ void MediatorThreadImpl::OnClientStateChangeMessage(
|
| }
|
|
|
| void MediatorThreadImpl::OnSubscriptionStateChange(bool success) {
|
| + DCHECK_EQ(MessageLoop::current(), worker_message_loop());
|
| + parent_message_loop_->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(
|
| + this,
|
| + &MediatorThreadImpl::OnSubscriptionStateChangeOnParentThread,
|
| + success));
|
| +}
|
| +
|
| +void MediatorThreadImpl::OnSubscriptionStateChangeOnParentThread(
|
| + bool success) {
|
| + DCHECK_EQ(MessageLoop::current(), parent_message_loop_);
|
| if (delegate_) {
|
| delegate_->OnSubscriptionStateChange(success);
|
| }
|
| }
|
|
|
| +MessageLoop* MediatorThreadImpl::worker_message_loop() {
|
| + MessageLoop* current_message_loop = MessageLoop::current();
|
| + DCHECK(current_message_loop);
|
| + MessageLoop* worker_message_loop = worker_thread_.message_loop();
|
| + DCHECK(worker_message_loop);
|
| + DCHECK(current_message_loop == parent_message_loop_ ||
|
| + current_message_loop == worker_message_loop);
|
| + return worker_message_loop;
|
| +}
|
| +
|
| buzz::XmppClient* MediatorThreadImpl::xmpp_client() {
|
| - if (!login_.get()) {
|
| - return NULL;
|
| - }
|
| - return login_->xmpp_client();
|
| + DCHECK_EQ(MessageLoop::current(), worker_message_loop());
|
| + DCHECK(login_.get());
|
| + buzz::XmppClient* xmpp_client = login_->xmpp_client();
|
| + DCHECK(xmpp_client);
|
| + return xmpp_client;
|
| }
|
|
|
| } // namespace notifier
|
|
|