| Index: chrome/browser/sync/tools/sync_listen_notifications.cc
|
| diff --git a/chrome/browser/sync/tools/sync_listen_notifications.cc b/chrome/browser/sync/tools/sync_listen_notifications.cc
|
| index 899798ef66d0922d346b5a1fa05a185e69e95857..2e762ac31d567cc2247f38dc7bc6ae0c226f8b29 100644
|
| --- a/chrome/browser/sync/tools/sync_listen_notifications.cc
|
| +++ b/chrome/browser/sync/tools/sync_listen_notifications.cc
|
| @@ -2,13 +2,10 @@
|
| // Use of this source code is governed by a BSD-style license that can be
|
| // found in the LICENSE file.
|
|
|
| -#include <cstdlib>
|
| #include <string>
|
| -#include <sstream>
|
| #include <vector>
|
|
|
| #include "base/at_exit.h"
|
| -#include "base/base64.h"
|
| #include "base/command_line.h"
|
| #include "base/logging.h"
|
| #include "base/message_loop.h"
|
| @@ -16,15 +13,17 @@
|
| #include "base/string_util.h"
|
| #include "base/task.h"
|
| #include "chrome/browser/sync/notification_method.h"
|
| +#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
|
| +#include "chrome/browser/sync/notifier/chrome_invalidation_client.h"
|
| +#include "chrome/browser/sync/notifier/chrome_system_resources.h"
|
| +#include "chrome/browser/sync/notifier/invalidation_util.h"
|
| #include "chrome/browser/sync/sync_constants.h"
|
| #include "chrome/common/net/notifier/base/task_pump.h"
|
| #include "chrome/common/net/notifier/communicator/xmpp_socket_adapter.h"
|
| #include "chrome/common/net/notifier/listener/listen_task.h"
|
| #include "chrome/common/net/notifier/listener/notification_constants.h"
|
| #include "chrome/common/net/notifier/listener/subscribe_task.h"
|
| -#include "chrome/common/net/notifier/listener/xml_element_util.h"
|
| #include "google/cacheinvalidation/invalidation-client.h"
|
| -#include "google/cacheinvalidation/invalidation-client-impl.h"
|
| #include "talk/base/cryptstring.h"
|
| #include "talk/base/logging.h"
|
| #include "talk/base/sigslot.h"
|
| @@ -194,148 +193,6 @@ class LegacyNotifierDelegate : public XmppNotificationClient::Delegate {
|
| }
|
| };
|
|
|
| -// TODO(akalin): Move all the cache-invalidation-related stuff below
|
| -// out of this file once we have a real Chrome-integrated
|
| -// implementation.
|
| -
|
| -void RunAndDeleteClosure(invalidation::Closure* task) {
|
| - task->Run();
|
| - delete task;
|
| -}
|
| -
|
| -// Simple system resources class that uses the current message loop
|
| -// for scheduling. Assumes the current message loop is already
|
| -// running.
|
| -class ChromeSystemResources : public invalidation::SystemResources {
|
| - public:
|
| - ChromeSystemResources() : scheduler_active_(false) {}
|
| -
|
| - ~ChromeSystemResources() {
|
| - CHECK(!scheduler_active_);
|
| - }
|
| -
|
| - virtual invalidation::Time current_time() {
|
| - return base::Time::Now();
|
| - }
|
| -
|
| - virtual void StartScheduler() {
|
| - CHECK(!scheduler_active_);
|
| - scheduler_active_ = true;
|
| - }
|
| -
|
| - // We assume that the current message loop is stopped shortly after
|
| - // this is called, i.e. that any in-flight delayed tasks won't get
|
| - // run.
|
| - //
|
| - // TODO(akalin): Make sure that the above actually holds.
|
| - virtual void StopScheduler() {
|
| - CHECK(scheduler_active_);
|
| - scheduler_active_ = false;
|
| - }
|
| -
|
| - virtual void ScheduleWithDelay(invalidation::TimeDelta delay,
|
| - invalidation::Closure* task) {
|
| - if (!scheduler_active_) {
|
| - delete task;
|
| - return;
|
| - }
|
| - CHECK(invalidation::IsCallbackRepeatable(task));
|
| - MessageLoop::current()->PostDelayedTask(
|
| - FROM_HERE,
|
| - NewRunnableFunction(&RunAndDeleteClosure, task),
|
| - delay.InMillisecondsRoundedUp());
|
| - }
|
| -
|
| - virtual void ScheduleImmediately(invalidation::Closure* task) {
|
| - if (!scheduler_active_) {
|
| - delete task;
|
| - return;
|
| - }
|
| - CHECK(invalidation::IsCallbackRepeatable(task));
|
| - MessageLoop::current()->PostTask(
|
| - FROM_HERE, NewRunnableFunction(&RunAndDeleteClosure, task));
|
| - }
|
| -
|
| - virtual void Log(LogLevel level, const char* file, int line,
|
| - const char* format, ...) {
|
| - va_list ap;
|
| - va_start(ap, format);
|
| - std::string result;
|
| - StringAppendV(&result, format, ap);
|
| - logging::LogMessage(file, line).stream() << result;
|
| - va_end(ap);
|
| - }
|
| -
|
| - private:
|
| - bool scheduler_active_;
|
| -};
|
| -
|
| -// We need to write our own protobuf to string functions because we
|
| -// use LITE_RUNTIME, which doesn't support DebugString().
|
| -
|
| -std::string ObjectIdToString(
|
| - const invalidation::ObjectId& object_id) {
|
| - std::stringstream ss;
|
| - ss << "{ ";
|
| - ss << "name: " << object_id.name().string_value() << ", ";
|
| - ss << "source: " << object_id.source();
|
| - ss << " }";
|
| - return ss.str();
|
| -}
|
| -
|
| -std::string StatusToString(
|
| - const invalidation::Status& status) {
|
| - std::stringstream ss;
|
| - ss << "{ ";
|
| - ss << "code: " << status.code() << ", ";
|
| - ss << "description: " << status.description();
|
| - ss << " }";
|
| - return ss.str();
|
| -}
|
| -
|
| -std::string InvalidationToString(
|
| - const invalidation::Invalidation& invalidation) {
|
| - std::stringstream ss;
|
| - ss << "{ ";
|
| - ss << "object_id: " << ObjectIdToString(invalidation.object_id()) << ", ";
|
| - ss << "version: " << invalidation.version() << ", ";
|
| - ss << "components: { ";
|
| - const invalidation::ComponentStampLog& component_stamp_log =
|
| - invalidation.component_stamp_log();
|
| - for (int i = 0; i < component_stamp_log.stamps_size(); ++i) {
|
| - const invalidation::ComponentStamp& component_stamp =
|
| - component_stamp_log.stamps(i);
|
| - ss << "component: " << component_stamp.component() << ", ";
|
| - ss << "time: " << component_stamp.time() << ", ";
|
| - }
|
| - ss << " }";
|
| - ss << " }";
|
| - return ss.str();
|
| -}
|
| -
|
| -std::string RegistrationUpdateToString(
|
| - const invalidation::RegistrationUpdate& update) {
|
| - std::stringstream ss;
|
| - ss << "{ ";
|
| - ss << "type: " << update.type() << ", ";
|
| - ss << "object_id: " << ObjectIdToString(update.object_id()) << ", ";
|
| - ss << "version: " << update.version() << ", ";
|
| - ss << "sequence_number: " << update.sequence_number();
|
| - ss << " }";
|
| - return ss.str();
|
| -}
|
| -
|
| -std::string RegistrationUpdateResultToString(
|
| - const invalidation::RegistrationUpdateResult& update_result) {
|
| - std::stringstream ss;
|
| - ss << "{ ";
|
| - ss << "operation: "
|
| - << RegistrationUpdateToString(update_result.operation()) << ", ";
|
| - ss << "status: " << StatusToString(update_result.status());
|
| - ss << " }";
|
| - return ss.str();
|
| -}
|
| -
|
| // The actual listener for sync notifications from the cache
|
| // invalidation service.
|
| class ChromeInvalidationListener
|
| @@ -346,8 +203,9 @@ class ChromeInvalidationListener
|
| virtual void Invalidate(const invalidation::Invalidation& invalidation,
|
| invalidation::Closure* callback) {
|
| CHECK(invalidation::IsCallbackRepeatable(callback));
|
| - LOG(INFO) << "Invalidate: " << InvalidationToString(invalidation);
|
| - RunAndDeleteClosure(callback);
|
| + LOG(INFO) << "Invalidate: "
|
| + << sync_notifier::InvalidationToString(invalidation);
|
| + sync_notifier::RunAndDeleteClosure(callback);
|
| // A real implementation would respond to the invalidation for the
|
| // given object (e.g., refetch the invalidated object).
|
| }
|
| @@ -355,7 +213,7 @@ class ChromeInvalidationListener
|
| virtual void InvalidateAll(invalidation::Closure* callback) {
|
| CHECK(invalidation::IsCallbackRepeatable(callback));
|
| LOG(INFO) << "InvalidateAll";
|
| - RunAndDeleteClosure(callback);
|
| + sync_notifier::RunAndDeleteClosure(callback);
|
| // A real implementation would loop over the current registered
|
| // data types and send notifications for those.
|
| }
|
| @@ -363,7 +221,7 @@ class ChromeInvalidationListener
|
| virtual void AllRegistrationsLost(invalidation::Closure* callback) {
|
| CHECK(invalidation::IsCallbackRepeatable(callback));
|
| LOG(INFO) << "AllRegistrationsLost";
|
| - RunAndDeleteClosure(callback);
|
| + sync_notifier::RunAndDeleteClosure(callback);
|
| // A real implementation would try to re-register for all
|
| // registered data types.
|
| }
|
| @@ -372,8 +230,8 @@ class ChromeInvalidationListener
|
| invalidation::Closure* callback) {
|
| CHECK(invalidation::IsCallbackRepeatable(callback));
|
| LOG(INFO) << "RegistrationLost: "
|
| - << ObjectIdToString(object_id);
|
| - RunAndDeleteClosure(callback);
|
| + << sync_notifier::ObjectIdToString(object_id);
|
| + sync_notifier::RunAndDeleteClosure(callback);
|
| // A real implementation would try to re-register for this
|
| // particular data type.
|
| }
|
| @@ -382,199 +240,6 @@ class ChromeInvalidationListener
|
| DISALLOW_COPY_AND_ASSIGN(ChromeInvalidationListener);
|
| };
|
|
|
| -static const buzz::QName kQnTangoIqPacket("google:tango", "packet");
|
| -static const buzz::QName kQnTangoIqPacketContent(
|
| - "google:tango", "content");
|
| -
|
| -// A task that listens for ClientInvalidation messages and calls the
|
| -// given callback on them.
|
| -class CacheInvalidationListenTask : public buzz::XmppTask {
|
| - public:
|
| - // Takes ownership of callback.
|
| - CacheInvalidationListenTask(Task* parent,
|
| - Callback1<const std::string&>::Type* callback)
|
| - : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {}
|
| - virtual ~CacheInvalidationListenTask() {}
|
| -
|
| - virtual int ProcessStart() {
|
| - LOG(INFO) << "CacheInvalidationListenTask started";
|
| - return STATE_RESPONSE;
|
| - }
|
| -
|
| - virtual int ProcessResponse() {
|
| - const buzz::XmlElement* stanza = NextStanza();
|
| - if (stanza == NULL) {
|
| - LOG(INFO) << "CacheInvalidationListenTask blocked";
|
| - return STATE_BLOCKED;
|
| - }
|
| - LOG(INFO) << "CacheInvalidationListenTask response received";
|
| - std::string data;
|
| - if (GetTangoIqPacketData(stanza, &data)) {
|
| - callback_->Run(data);
|
| - } else {
|
| - LOG(ERROR) << "Could not get packet data";
|
| - }
|
| - // Acknowledge receipt of the iq to the buzz server.
|
| - // TODO(akalin): Send an error response for malformed packets.
|
| - scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
|
| - SendStanza(response_stanza.get());
|
| - return STATE_RESPONSE;
|
| - }
|
| -
|
| - virtual bool HandleStanza(const buzz::XmlElement* stanza) {
|
| - LOG(INFO) << "Stanza received: "
|
| - << notifier::XmlElementToString(*stanza);
|
| - if (IsValidTangoIqPacket(stanza)) {
|
| - LOG(INFO) << "Queueing stanza";
|
| - QueueStanza(stanza);
|
| - return true;
|
| - }
|
| - LOG(INFO) << "Stanza skipped";
|
| - return false;
|
| - }
|
| -
|
| - private:
|
| - bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) {
|
| - return
|
| - (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) &&
|
| - (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str()));
|
| - }
|
| -
|
| - bool GetTangoIqPacketData(const buzz::XmlElement* stanza,
|
| - std::string* data) {
|
| - DCHECK(IsValidTangoIqPacket(stanza));
|
| - const buzz::XmlElement* tango_iq_packet =
|
| - stanza->FirstNamed(kQnTangoIqPacket);
|
| - if (!tango_iq_packet) {
|
| - LOG(ERROR) << "Could not find tango IQ packet element";
|
| - return false;
|
| - }
|
| - const buzz::XmlElement* tango_iq_packet_content =
|
| - tango_iq_packet->FirstNamed(kQnTangoIqPacketContent);
|
| - if (!tango_iq_packet_content) {
|
| - LOG(ERROR) << "Could not find tango IQ packet content element";
|
| - return false;
|
| - }
|
| - *data = tango_iq_packet_content->BodyText();
|
| - return true;
|
| - }
|
| -
|
| - scoped_ptr<Callback1<const std::string&>::Type> callback_;
|
| - DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
|
| -};
|
| -
|
| -// A task that sends a single outbound ClientInvalidation message.
|
| -class CacheInvalidationSendMessageTask : public buzz::XmppTask {
|
| - public:
|
| - CacheInvalidationSendMessageTask(Task* parent,
|
| - const buzz::Jid& to_jid,
|
| - const std::string& msg)
|
| - : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
|
| - to_jid_(to_jid), msg_(msg) {}
|
| - virtual ~CacheInvalidationSendMessageTask() {}
|
| -
|
| - virtual int ProcessStart() {
|
| - scoped_ptr<buzz::XmlElement> stanza(
|
| - MakeTangoIqPacket(to_jid_, task_id(), msg_));
|
| - LOG(INFO) << "Sending message: "
|
| - << notifier::XmlElementToString(*stanza.get());
|
| - if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
|
| - LOG(INFO) << "Error when sending message";
|
| - return STATE_ERROR;
|
| - }
|
| - return STATE_RESPONSE;
|
| - }
|
| -
|
| - virtual int ProcessResponse() {
|
| - const buzz::XmlElement* stanza = NextStanza();
|
| - if (stanza == NULL) {
|
| - LOG(INFO) << "CacheInvalidationSendMessageTask blocked...";
|
| - return STATE_BLOCKED;
|
| - }
|
| - LOG(INFO) << "CacheInvalidationSendMessageTask response received: "
|
| - << notifier::XmlElementToString(*stanza);
|
| - return STATE_DONE;
|
| - }
|
| -
|
| - virtual bool HandleStanza(const buzz::XmlElement* stanza) {
|
| - LOG(INFO) << "Stanza received: "
|
| - << notifier::XmlElementToString(*stanza);
|
| - if (!MatchResponseIq(stanza, to_jid_, task_id())) {
|
| - LOG(INFO) << "Stanza skipped";
|
| - return false;
|
| - }
|
| - LOG(INFO) << "Queueing stanza";
|
| - QueueStanza(stanza);
|
| - return true;
|
| - }
|
| -
|
| - private:
|
| - static buzz::XmlElement* MakeTangoIqPacket(
|
| - const buzz::Jid& to_jid,
|
| - const std::string& task_id,
|
| - const std::string& msg) {
|
| - buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
|
| - buzz::XmlElement* tango_iq_packet =
|
| - new buzz::XmlElement(kQnTangoIqPacket, true);
|
| - iq->AddElement(tango_iq_packet);
|
| - buzz::XmlElement* tango_iq_packet_content =
|
| - new buzz::XmlElement(kQnTangoIqPacketContent, true);
|
| - tango_iq_packet->AddElement(tango_iq_packet_content);
|
| - tango_iq_packet_content->SetBodyText(msg);
|
| - return iq;
|
| - }
|
| -
|
| - buzz::Jid to_jid_;
|
| - std::string msg_;
|
| -
|
| - DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
|
| -};
|
| -
|
| -// Class that handles the details of sending and receiving client
|
| -// invalidation packets.
|
| -class CacheInvalidationPacketHandler {
|
| - public:
|
| - CacheInvalidationPacketHandler(
|
| - buzz::XmppClient* xmpp_client,
|
| - invalidation::NetworkEndpoint* network_endpoint,
|
| - const buzz::Jid& to_jid)
|
| - : xmpp_client_(xmpp_client), network_endpoint_(network_endpoint),
|
| - to_jid_(to_jid) {
|
| - // Owned by xmpp_client.
|
| - CacheInvalidationListenTask* listen_task =
|
| - new CacheInvalidationListenTask(
|
| - xmpp_client, NewCallback(
|
| - this, &CacheInvalidationPacketHandler::HandleInboundPacket));
|
| - listen_task->Start();
|
| - }
|
| -
|
| - void HandleOutboundPacket(invalidation::NetworkEndpoint* const &
|
| - network_endpoint) {
|
| - CHECK_EQ(network_endpoint, network_endpoint_);
|
| - invalidation::string message;
|
| - network_endpoint->TakeOutboundMessage(&message);
|
| - std::string encoded_message;
|
| - CHECK(base::Base64Encode(message, &encoded_message));
|
| - // Owned by xmpp_client.
|
| - CacheInvalidationSendMessageTask* send_message_task =
|
| - new CacheInvalidationSendMessageTask(xmpp_client_,
|
| - to_jid_,
|
| - encoded_message);
|
| - send_message_task->Start();
|
| - }
|
| -
|
| - private:
|
| - void HandleInboundPacket(const std::string& packet) {
|
| - std::string decoded_message;
|
| - CHECK(base::Base64Decode(packet, &decoded_message));
|
| - network_endpoint_->HandleInboundMessage(decoded_message);
|
| - }
|
| -
|
| - buzz::XmppClient* xmpp_client_;
|
| - invalidation::NetworkEndpoint* network_endpoint_;
|
| - buzz::Jid to_jid_;
|
| -};
|
| -
|
| // Delegate for server-side notifications.
|
| class CacheInvalidationNotifierDelegate
|
| : public XmppNotificationClient::Delegate {
|
| @@ -601,36 +266,15 @@ class CacheInvalidationNotifierDelegate
|
| buzz::XmppClient* xmpp_client) {
|
| LOG(INFO) << "Logged in";
|
|
|
| - chrome_system_resources_.StartScheduler();
|
| -
|
| - invalidation::ClientType client_type;
|
| - client_type.set_type(invalidation::ClientType::CHROME_SYNC);
|
| // TODO(akalin): app_name should be per-client unique.
|
| - std::string app_name = "cc_sync_listen_notifications";
|
| - invalidation::ClientConfig ticl_config;
|
| - invalidation_client_.reset(
|
| - new invalidation::InvalidationClientImpl(
|
| - &chrome_system_resources_, client_type,
|
| - app_name, &chrome_invalidation_listener_, ticl_config));
|
| -
|
| - invalidation::NetworkEndpoint* network_endpoint =
|
| - invalidation_client_->network_endpoint();
|
| - // TODO(akalin): Make tango jid configurable (so we can use
|
| - // staging).
|
| - buzz::Jid to_jid("tango@bot.talk.google.com/PROD");
|
| - packet_handler_.reset(
|
| - new CacheInvalidationPacketHandler(xmpp_client,
|
| - network_endpoint,
|
| - to_jid));
|
| -
|
| - network_endpoint->RegisterOutboundListener(
|
| - invalidation::NewPermanentCallback(
|
| - packet_handler_.get(),
|
| - &CacheInvalidationPacketHandler::HandleOutboundPacket));
|
| + const std::string kAppName = "cc_sync_listen_notifications";
|
| + chrome_invalidation_client_.Start(kAppName,
|
| + &chrome_invalidation_listener_,
|
| + xmpp_client);
|
|
|
| for (std::vector<invalidation::ObjectId>::const_iterator it =
|
| object_ids_.begin(); it != object_ids_.end(); ++it) {
|
| - invalidation_client_->Register(
|
| + chrome_invalidation_client_.Register(
|
| *it,
|
| invalidation::NewPermanentCallback(
|
| this, &CacheInvalidationNotifierDelegate::RegisterCallback));
|
| @@ -643,43 +287,39 @@ class CacheInvalidationNotifierDelegate
|
| // TODO(akalin): Figure out the correct place to put this.
|
| for (std::vector<invalidation::ObjectId>::const_iterator it =
|
| object_ids_.begin(); it != object_ids_.end(); ++it) {
|
| - invalidation_client_->Unregister(
|
| + chrome_invalidation_client_.Unregister(
|
| *it,
|
| invalidation::NewPermanentCallback(
|
| this, &CacheInvalidationNotifierDelegate::RegisterCallback));
|
| }
|
|
|
| - packet_handler_.reset();
|
| - invalidation_client_.reset();
|
| -
|
| - chrome_system_resources_.StopScheduler();
|
| + chrome_invalidation_client_.Stop();
|
| }
|
|
|
| virtual void OnError(buzz::XmppEngine::Error error, int subcode) {
|
| LOG(INFO) << "Error: " << error << ", subcode: " << subcode;
|
| - packet_handler_.reset();
|
| - invalidation_client_.reset();
|
|
|
| - // TODO(akalin): Figure out whether we should stop the scheduler
|
| - // here.
|
| + // TODO(akalin): Figure out whether we should unregister here,
|
| + // too.
|
| + chrome_invalidation_client_.Stop();
|
| }
|
|
|
| private:
|
| void RegisterCallback(
|
| const invalidation::RegistrationUpdateResult& result) {
|
| - LOG(INFO) << "Registered: " << RegistrationUpdateResultToString(result);
|
| + LOG(INFO) << "Registered: "
|
| + << sync_notifier::RegistrationUpdateResultToString(result);
|
| }
|
|
|
| void UnregisterCallback(
|
| const invalidation::RegistrationUpdateResult& result) {
|
| - LOG(INFO) << "Unregistered: " << RegistrationUpdateResultToString(result);
|
| + LOG(INFO) << "Unregistered: "
|
| + << sync_notifier::RegistrationUpdateResultToString(result);
|
| }
|
|
|
| std::vector<invalidation::ObjectId> object_ids_;
|
| - ChromeSystemResources chrome_system_resources_;
|
| ChromeInvalidationListener chrome_invalidation_listener_;
|
| - scoped_ptr<invalidation::InvalidationClient> invalidation_client_;
|
| - scoped_ptr<CacheInvalidationPacketHandler> packet_handler_;
|
| + sync_notifier::ChromeInvalidationClient chrome_invalidation_client_;
|
| };
|
|
|
| } // namespace
|
|
|