Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2570)

Unified Diff: chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc

Issue 9190029: use push messaging in cache invalidation xmpp channel (Closed) Base URL: http://src.chromium.org/svn/trunk/src/
Patch Set: Created 8 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc
===================================================================
--- chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc (revision 117278)
+++ chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc (working copy)
@@ -6,16 +6,20 @@
#include <string>
-#include "base/bind.h"
#include "base/base64.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/rand_util.h"
#include "base/string_number_conversions.h"
+#include "google/cacheinvalidation/v2/client_gateway.pb.h"
#include "google/cacheinvalidation/v2/constants.h"
#include "google/cacheinvalidation/v2/invalidation-client.h"
#include "google/cacheinvalidation/v2/system-resources.h"
+#include "jingle/notifier/listener/notification_constants.h"
+#include "jingle/notifier/listener/push_notifications_listen_task.h"
+#include "jingle/notifier/listener/push_notifications_send_update_task.h"
+#include "jingle/notifier/listener/push_notifications_subscribe_task.h"
#include "jingle/notifier/listener/xml_element_util.h"
#include "talk/xmpp/constants.h"
#include "talk/xmpp/jid.h"
@@ -27,200 +31,8 @@
namespace {
const char kBotJid[] = "tango@bot.talk.google.com";
-const char kServiceUrl[] = "http://www.google.com/chrome/sync";
+const char kChannelName[] = "tango_raw";
-buzz::QName GetQnData() { return buzz::QName("google:notifier", "data"); }
-buzz::QName GetQnSeq() { return buzz::QName("", "seq"); }
-buzz::QName GetQnSid() { return buzz::QName("", "sid"); }
-buzz::QName GetQnServiceUrl() { return buzz::QName("", "serviceUrl"); }
-buzz::QName GetQnProtocolVersion() {
- return buzz::QName("", "protocolVersion");
-}
-buzz::QName GetQnChannelContext() {
- return buzz::QName("", "channelContext");
-}
-
-// TODO(akalin): Move these task classes out so that they can be
-// unit-tested. This'll probably be done easier once we consolidate
-// all the packet sending/receiving classes.
-
-// A task that listens for ClientInvalidation messages and calls the
-// given callback on them.
-class CacheInvalidationListenTask : public buzz::XmppTask {
- public:
- // Takes ownership of callback.
- CacheInvalidationListenTask(
- buzz::XmppTaskParentInterface* parent,
- const base::Callback<void(const std::string&)>& callback,
- const base::Callback<void(const std::string&)>& context_change_callback)
- : XmppTask(parent, buzz::XmppEngine::HL_TYPE),
- callback_(callback),
- context_change_callback_(context_change_callback) {}
- virtual ~CacheInvalidationListenTask() {}
-
- virtual int ProcessStart() {
- DVLOG(2) << "CacheInvalidationListenTask started";
- return STATE_RESPONSE;
- }
-
- virtual int ProcessResponse() {
- const buzz::XmlElement* stanza = NextStanza();
- if (stanza == NULL) {
- DVLOG(2) << "CacheInvalidationListenTask blocked";
- return STATE_BLOCKED;
- }
- DVLOG(2) << "CacheInvalidationListenTask response received";
- std::string data;
- if (GetCacheInvalidationIqPacketData(stanza, &data)) {
- callback_.Run(data);
- } else {
- LOG(ERROR) << "Could not get packet data";
- }
- // Acknowledge receipt of the iq to the buzz server.
- // TODO(akalin): Send an error response for malformed packets.
- scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
- SendStanza(response_stanza.get());
- return STATE_RESPONSE;
- }
-
- virtual bool HandleStanza(const buzz::XmlElement* stanza) {
- DVLOG(1) << "Stanza received: "
- << notifier::XmlElementToString(*stanza);
- if (IsValidCacheInvalidationIqPacket(stanza)) {
- DVLOG(2) << "Queueing stanza";
- QueueStanza(stanza);
- return true;
- }
- DVLOG(2) << "Stanza skipped";
- return false;
- }
-
- private:
- bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) {
- // We deliberately minimize the verification we do here: see
- // http://crbug.com/71285 .
- return MatchRequestIq(stanza, buzz::STR_SET, GetQnData());
- }
-
- bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza,
- std::string* data) {
- DCHECK(IsValidCacheInvalidationIqPacket(stanza));
- const buzz::XmlElement* cache_invalidation_iq_packet =
- stanza->FirstNamed(GetQnData());
- if (!cache_invalidation_iq_packet) {
- LOG(ERROR) << "Could not find cache invalidation IQ packet element";
- return false;
- }
- // Look for a channelContext attribute in the content of the stanza. If
- // present, remember it so it can be echoed back.
- if (cache_invalidation_iq_packet->HasAttr(GetQnChannelContext())) {
- context_change_callback_.Run(
- cache_invalidation_iq_packet->Attr(GetQnChannelContext()));
- }
- *data = cache_invalidation_iq_packet->BodyText();
- return true;
- }
-
- std::string* channel_context_;
- base::Callback<void(const std::string&)> callback_;
- base::Callback<void(const std::string&)> context_change_callback_;
- DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
-};
-
-std::string MakeProtocolVersion() {
- return base::Uint64ToString(invalidation::Constants::kProtocolMajorVersion) +
- "." +
- base::Uint64ToString(invalidation::Constants::kProtocolMinorVersion);
-}
-
-// A task that sends a single outbound ClientInvalidation message.
-class CacheInvalidationSendMessageTask : public buzz::XmppTask {
- public:
- CacheInvalidationSendMessageTask(buzz::XmppTaskParentInterface* parent,
- const buzz::Jid& to_jid,
- const std::string& msg,
- int seq,
- const std::string& sid,
- const std::string& channel_context)
- : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
- to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid),
- channel_context_(channel_context) {}
- virtual ~CacheInvalidationSendMessageTask() {}
-
- virtual int ProcessStart() {
- scoped_ptr<buzz::XmlElement> stanza(
- MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_,
- seq_, sid_, channel_context_));
- DVLOG(1) << "Sending message: "
- << notifier::XmlElementToString(*stanza.get());
- if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
- DVLOG(2) << "Error when sending message";
- return STATE_ERROR;
- }
- return STATE_RESPONSE;
- }
-
- virtual int ProcessResponse() {
- const buzz::XmlElement* stanza = NextStanza();
- if (stanza == NULL) {
- DVLOG(2) << "CacheInvalidationSendMessageTask blocked...";
- return STATE_BLOCKED;
- }
- DVLOG(2) << "CacheInvalidationSendMessageTask response received: "
- << notifier::XmlElementToString(*stanza);
- // TODO(akalin): Handle errors here.
- return STATE_DONE;
- }
-
- virtual bool HandleStanza(const buzz::XmlElement* stanza) {
- DVLOG(1) << "Stanza received: "
- << notifier::XmlElementToString(*stanza);
- if (!MatchResponseIq(stanza, to_jid_, task_id())) {
- DVLOG(2) << "Stanza skipped";
- return false;
- }
- DVLOG(2) << "Queueing stanza";
- QueueStanza(stanza);
- return true;
- }
-
- private:
- static buzz::XmlElement* MakeCacheInvalidationIqPacket(
- const buzz::Jid& to_jid,
- const std::string& task_id,
- const std::string& msg,
- int seq, const std::string& sid, const std::string& channel_context) {
- buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
- buzz::XmlElement* cache_invalidation_iq_packet =
- new buzz::XmlElement(GetQnData(), true);
- iq->AddElement(cache_invalidation_iq_packet);
- cache_invalidation_iq_packet->SetAttr(GetQnSeq(), base::IntToString(seq));
- cache_invalidation_iq_packet->SetAttr(GetQnSid(), sid);
- cache_invalidation_iq_packet->SetAttr(GetQnServiceUrl(), kServiceUrl);
- cache_invalidation_iq_packet->SetAttr(
- GetQnProtocolVersion(), MakeProtocolVersion());
- if (!channel_context.empty()) {
- cache_invalidation_iq_packet->SetAttr(GetQnChannelContext(),
- channel_context);
- }
- cache_invalidation_iq_packet->SetBodyText(msg);
- return iq;
- }
-
- const buzz::Jid to_jid_;
- std::string msg_;
- int seq_;
- std::string sid_;
- const std::string channel_context_;
-
- DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
-};
-
-std::string MakeSid() {
- uint64 sid = base::RandUint64();
- return std::string("chrome-sync-") + base::Uint64ToString(sid);
-}
-
} // namespace
CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
@@ -228,17 +40,11 @@
: weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
base_task_(base_task),
seq_(0),
- sid_(MakeSid()) {
+ scheduling_hash_(0) {
CHECK(base_task_.get());
// Owned by base_task. Takes ownership of the callback.
- CacheInvalidationListenTask* listen_task =
- new CacheInvalidationListenTask(
- base_task_, base::Bind(
- &CacheInvalidationPacketHandler::HandleInboundPacket,
- weak_factory_.GetWeakPtr()),
- base::Bind(
- &CacheInvalidationPacketHandler::HandleChannelContextChange,
- weak_factory_.GetWeakPtr()));
+ notifier::PushNotificationsListenTask* listen_task =
+ new notifier::PushNotificationsListenTask(base_task_, this);
listen_task->Start();
}
@@ -252,20 +58,25 @@
if (!base_task_.get()) {
return;
}
- std::string encoded_message;
- if (!base::Base64Encode(message, &encoded_message)) {
- LOG(ERROR) << "Could not base64-encode message to send: "
- << message;
- return;
+ ipc::invalidation::ClientGatewayMessage envelope;
+ envelope.set_is_client_to_server(true);
+ if (!service_context_.empty()) {
+ envelope.set_service_context(service_context_);
+ envelope.set_rpc_scheduling_hash(scheduling_hash_);
}
+ envelope.set_network_message(message);
+
+ notifier::Recipient recipient;
+ recipient.to = kBotJid;
+ notifier::Notification notification;
+ notification.channel = kChannelName;
+ notification.recipients.push_back(recipient);
+ envelope.SerializeToString(&notification.data);
+
// Owned by base_task_.
- CacheInvalidationSendMessageTask* send_message_task =
- new CacheInvalidationSendMessageTask(base_task_,
- buzz::Jid(kBotJid),
- encoded_message,
- seq_, sid_, channel_context_);
+ notifier::PushNotificationsSendUpdateTask* send_message_task =
+ new notifier::PushNotificationsSendUpdateTask(base_task_, notification);
send_message_task->Start();
- ++seq_;
}
void CacheInvalidationPacketHandler::SetMessageReceiver(
@@ -273,22 +84,44 @@
incoming_receiver_.reset(incoming_receiver);
}
-void CacheInvalidationPacketHandler::HandleInboundPacket(
- const std::string& packet) {
- DCHECK(non_thread_safe_.CalledOnValidThread());
- std::string decoded_message;
- if (!base::Base64Decode(packet, &decoded_message)) {
- LOG(ERROR) << "Could not base64-decode received message: "
- << packet;
- return;
- }
- incoming_receiver_->Run(decoded_message);
+void CacheInvalidationPacketHandler::SendSubscriptionRequest() {
+ // Owned by base_task_.
akalin 2012/01/12 23:06:44 this comment should be right above push_subscripti
ghc 2012/01/13 01:23:07 Done.
+ notifier::Subscription subscription;
+ subscription.channel = kChannelName;
+ subscription.from = "";
+ notifier::SubscriptionList subscription_list;
+ subscription_list.push_back(subscription);
+ notifier::PushNotificationsSubscribeTask* push_subscription_task =
+ new notifier::PushNotificationsSubscribeTask(
+ base_task_, subscription_list, this);
+ push_subscription_task->Start();
}
-void CacheInvalidationPacketHandler::HandleChannelContextChange(
- const std::string& context) {
+void CacheInvalidationPacketHandler::OnSubscribed() {
+ // TODO(ghc): Consider whether we should do more here.
+}
+
+void CacheInvalidationPacketHandler::OnSubscriptionError() {
+ // TODO(ghc): Consider whether we should do more here.
+}
+
+void CacheInvalidationPacketHandler::OnNotificationReceived(
+ const notifier::Notification& notification) {
DCHECK(non_thread_safe_.CalledOnValidThread());
- channel_context_ = context;
+ const std::string& decoded_message = notification.data;
+ ipc::invalidation::ClientGatewayMessage envelope;
+ envelope.ParseFromString(decoded_message);
+ if (!envelope.IsInitialized()) {
+ LOG(ERROR) << "Could not parse ClientGatewayMessage: "
+ << decoded_message;
akalin 2012/01/12 23:06:44 just to make sure, decoded_message is base64-encod
ghc 2012/01/13 01:23:07 No -- the data on the wire is base64-encoded, but
+ }
+ if (envelope.has_service_context()) {
+ service_context_ = envelope.service_context();
+ }
+ if (envelope.has_rpc_scheduling_hash()) {
+ scheduling_hash_ = envelope.rpc_scheduling_hash();
+ }
+ incoming_receiver_->Run(envelope.network_message());
}
} // namespace sync_notifier

Powered by Google App Engine
This is Rietveld 408576698