Index: chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc |
=================================================================== |
--- chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc (revision 117278) |
+++ chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc (working copy) |
@@ -6,16 +6,20 @@ |
#include <string> |
-#include "base/bind.h" |
#include "base/base64.h" |
#include "base/callback.h" |
#include "base/compiler_specific.h" |
#include "base/logging.h" |
#include "base/rand_util.h" |
#include "base/string_number_conversions.h" |
+#include "google/cacheinvalidation/v2/client_gateway.pb.h" |
#include "google/cacheinvalidation/v2/constants.h" |
#include "google/cacheinvalidation/v2/invalidation-client.h" |
#include "google/cacheinvalidation/v2/system-resources.h" |
+#include "jingle/notifier/listener/notification_constants.h" |
+#include "jingle/notifier/listener/push_notifications_listen_task.h" |
+#include "jingle/notifier/listener/push_notifications_send_update_task.h" |
+#include "jingle/notifier/listener/push_notifications_subscribe_task.h" |
#include "jingle/notifier/listener/xml_element_util.h" |
#include "talk/xmpp/constants.h" |
#include "talk/xmpp/jid.h" |
@@ -27,200 +31,8 @@ |
namespace { |
const char kBotJid[] = "tango@bot.talk.google.com"; |
-const char kServiceUrl[] = "http://www.google.com/chrome/sync"; |
+const char kChannelName[] = "tango_raw"; |
-buzz::QName GetQnData() { return buzz::QName("google:notifier", "data"); } |
-buzz::QName GetQnSeq() { return buzz::QName("", "seq"); } |
-buzz::QName GetQnSid() { return buzz::QName("", "sid"); } |
-buzz::QName GetQnServiceUrl() { return buzz::QName("", "serviceUrl"); } |
-buzz::QName GetQnProtocolVersion() { |
- return buzz::QName("", "protocolVersion"); |
-} |
-buzz::QName GetQnChannelContext() { |
- return buzz::QName("", "channelContext"); |
-} |
- |
-// TODO(akalin): Move these task classes out so that they can be |
-// unit-tested. This'll probably be done easier once we consolidate |
-// all the packet sending/receiving classes. |
- |
-// A task that listens for ClientInvalidation messages and calls the |
-// given callback on them. |
-class CacheInvalidationListenTask : public buzz::XmppTask { |
- public: |
- // Takes ownership of callback. |
- CacheInvalidationListenTask( |
- buzz::XmppTaskParentInterface* parent, |
- const base::Callback<void(const std::string&)>& callback, |
- const base::Callback<void(const std::string&)>& context_change_callback) |
- : XmppTask(parent, buzz::XmppEngine::HL_TYPE), |
- callback_(callback), |
- context_change_callback_(context_change_callback) {} |
- virtual ~CacheInvalidationListenTask() {} |
- |
- virtual int ProcessStart() { |
- DVLOG(2) << "CacheInvalidationListenTask started"; |
- return STATE_RESPONSE; |
- } |
- |
- virtual int ProcessResponse() { |
- const buzz::XmlElement* stanza = NextStanza(); |
- if (stanza == NULL) { |
- DVLOG(2) << "CacheInvalidationListenTask blocked"; |
- return STATE_BLOCKED; |
- } |
- DVLOG(2) << "CacheInvalidationListenTask response received"; |
- std::string data; |
- if (GetCacheInvalidationIqPacketData(stanza, &data)) { |
- callback_.Run(data); |
- } else { |
- LOG(ERROR) << "Could not get packet data"; |
- } |
- // Acknowledge receipt of the iq to the buzz server. |
- // TODO(akalin): Send an error response for malformed packets. |
- scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza)); |
- SendStanza(response_stanza.get()); |
- return STATE_RESPONSE; |
- } |
- |
- virtual bool HandleStanza(const buzz::XmlElement* stanza) { |
- DVLOG(1) << "Stanza received: " |
- << notifier::XmlElementToString(*stanza); |
- if (IsValidCacheInvalidationIqPacket(stanza)) { |
- DVLOG(2) << "Queueing stanza"; |
- QueueStanza(stanza); |
- return true; |
- } |
- DVLOG(2) << "Stanza skipped"; |
- return false; |
- } |
- |
- private: |
- bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) { |
- // We deliberately minimize the verification we do here: see |
- // http://crbug.com/71285 . |
- return MatchRequestIq(stanza, buzz::STR_SET, GetQnData()); |
- } |
- |
- bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza, |
- std::string* data) { |
- DCHECK(IsValidCacheInvalidationIqPacket(stanza)); |
- const buzz::XmlElement* cache_invalidation_iq_packet = |
- stanza->FirstNamed(GetQnData()); |
- if (!cache_invalidation_iq_packet) { |
- LOG(ERROR) << "Could not find cache invalidation IQ packet element"; |
- return false; |
- } |
- // Look for a channelContext attribute in the content of the stanza. If |
- // present, remember it so it can be echoed back. |
- if (cache_invalidation_iq_packet->HasAttr(GetQnChannelContext())) { |
- context_change_callback_.Run( |
- cache_invalidation_iq_packet->Attr(GetQnChannelContext())); |
- } |
- *data = cache_invalidation_iq_packet->BodyText(); |
- return true; |
- } |
- |
- std::string* channel_context_; |
- base::Callback<void(const std::string&)> callback_; |
- base::Callback<void(const std::string&)> context_change_callback_; |
- DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask); |
-}; |
- |
-std::string MakeProtocolVersion() { |
- return base::Uint64ToString(invalidation::Constants::kProtocolMajorVersion) + |
- "." + |
- base::Uint64ToString(invalidation::Constants::kProtocolMinorVersion); |
-} |
- |
-// A task that sends a single outbound ClientInvalidation message. |
-class CacheInvalidationSendMessageTask : public buzz::XmppTask { |
- public: |
- CacheInvalidationSendMessageTask(buzz::XmppTaskParentInterface* parent, |
- const buzz::Jid& to_jid, |
- const std::string& msg, |
- int seq, |
- const std::string& sid, |
- const std::string& channel_context) |
- : XmppTask(parent, buzz::XmppEngine::HL_SINGLE), |
- to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid), |
- channel_context_(channel_context) {} |
- virtual ~CacheInvalidationSendMessageTask() {} |
- |
- virtual int ProcessStart() { |
- scoped_ptr<buzz::XmlElement> stanza( |
- MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_, |
- seq_, sid_, channel_context_)); |
- DVLOG(1) << "Sending message: " |
- << notifier::XmlElementToString(*stanza.get()); |
- if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) { |
- DVLOG(2) << "Error when sending message"; |
- return STATE_ERROR; |
- } |
- return STATE_RESPONSE; |
- } |
- |
- virtual int ProcessResponse() { |
- const buzz::XmlElement* stanza = NextStanza(); |
- if (stanza == NULL) { |
- DVLOG(2) << "CacheInvalidationSendMessageTask blocked..."; |
- return STATE_BLOCKED; |
- } |
- DVLOG(2) << "CacheInvalidationSendMessageTask response received: " |
- << notifier::XmlElementToString(*stanza); |
- // TODO(akalin): Handle errors here. |
- return STATE_DONE; |
- } |
- |
- virtual bool HandleStanza(const buzz::XmlElement* stanza) { |
- DVLOG(1) << "Stanza received: " |
- << notifier::XmlElementToString(*stanza); |
- if (!MatchResponseIq(stanza, to_jid_, task_id())) { |
- DVLOG(2) << "Stanza skipped"; |
- return false; |
- } |
- DVLOG(2) << "Queueing stanza"; |
- QueueStanza(stanza); |
- return true; |
- } |
- |
- private: |
- static buzz::XmlElement* MakeCacheInvalidationIqPacket( |
- const buzz::Jid& to_jid, |
- const std::string& task_id, |
- const std::string& msg, |
- int seq, const std::string& sid, const std::string& channel_context) { |
- buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id); |
- buzz::XmlElement* cache_invalidation_iq_packet = |
- new buzz::XmlElement(GetQnData(), true); |
- iq->AddElement(cache_invalidation_iq_packet); |
- cache_invalidation_iq_packet->SetAttr(GetQnSeq(), base::IntToString(seq)); |
- cache_invalidation_iq_packet->SetAttr(GetQnSid(), sid); |
- cache_invalidation_iq_packet->SetAttr(GetQnServiceUrl(), kServiceUrl); |
- cache_invalidation_iq_packet->SetAttr( |
- GetQnProtocolVersion(), MakeProtocolVersion()); |
- if (!channel_context.empty()) { |
- cache_invalidation_iq_packet->SetAttr(GetQnChannelContext(), |
- channel_context); |
- } |
- cache_invalidation_iq_packet->SetBodyText(msg); |
- return iq; |
- } |
- |
- const buzz::Jid to_jid_; |
- std::string msg_; |
- int seq_; |
- std::string sid_; |
- const std::string channel_context_; |
- |
- DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask); |
-}; |
- |
-std::string MakeSid() { |
- uint64 sid = base::RandUint64(); |
- return std::string("chrome-sync-") + base::Uint64ToString(sid); |
-} |
- |
} // namespace |
CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( |
@@ -228,17 +40,11 @@ |
: weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
base_task_(base_task), |
seq_(0), |
- sid_(MakeSid()) { |
+ scheduling_hash_(0) { |
CHECK(base_task_.get()); |
// Owned by base_task. Takes ownership of the callback. |
- CacheInvalidationListenTask* listen_task = |
- new CacheInvalidationListenTask( |
- base_task_, base::Bind( |
- &CacheInvalidationPacketHandler::HandleInboundPacket, |
- weak_factory_.GetWeakPtr()), |
- base::Bind( |
- &CacheInvalidationPacketHandler::HandleChannelContextChange, |
- weak_factory_.GetWeakPtr())); |
+ notifier::PushNotificationsListenTask* listen_task = |
+ new notifier::PushNotificationsListenTask(base_task_, this); |
listen_task->Start(); |
} |
@@ -252,20 +58,25 @@ |
if (!base_task_.get()) { |
return; |
} |
- std::string encoded_message; |
- if (!base::Base64Encode(message, &encoded_message)) { |
- LOG(ERROR) << "Could not base64-encode message to send: " |
- << message; |
- return; |
+ ipc::invalidation::ClientGatewayMessage envelope; |
+ envelope.set_is_client_to_server(true); |
+ if (!service_context_.empty()) { |
+ envelope.set_service_context(service_context_); |
+ envelope.set_rpc_scheduling_hash(scheduling_hash_); |
} |
+ envelope.set_network_message(message); |
+ |
+ notifier::Recipient recipient; |
+ recipient.to = kBotJid; |
+ notifier::Notification notification; |
+ notification.channel = kChannelName; |
+ notification.recipients.push_back(recipient); |
+ envelope.SerializeToString(¬ification.data); |
+ |
// Owned by base_task_. |
- CacheInvalidationSendMessageTask* send_message_task = |
- new CacheInvalidationSendMessageTask(base_task_, |
- buzz::Jid(kBotJid), |
- encoded_message, |
- seq_, sid_, channel_context_); |
+ notifier::PushNotificationsSendUpdateTask* send_message_task = |
+ new notifier::PushNotificationsSendUpdateTask(base_task_, notification); |
send_message_task->Start(); |
- ++seq_; |
} |
void CacheInvalidationPacketHandler::SetMessageReceiver( |
@@ -273,22 +84,44 @@ |
incoming_receiver_.reset(incoming_receiver); |
} |
-void CacheInvalidationPacketHandler::HandleInboundPacket( |
- const std::string& packet) { |
- DCHECK(non_thread_safe_.CalledOnValidThread()); |
- std::string decoded_message; |
- if (!base::Base64Decode(packet, &decoded_message)) { |
- LOG(ERROR) << "Could not base64-decode received message: " |
- << packet; |
- return; |
- } |
- incoming_receiver_->Run(decoded_message); |
+void CacheInvalidationPacketHandler::SendSubscriptionRequest() { |
+ // Owned by base_task_. |
akalin
2012/01/12 23:06:44
this comment should be right above push_subscripti
ghc
2012/01/13 01:23:07
Done.
|
+ notifier::Subscription subscription; |
+ subscription.channel = kChannelName; |
+ subscription.from = ""; |
+ notifier::SubscriptionList subscription_list; |
+ subscription_list.push_back(subscription); |
+ notifier::PushNotificationsSubscribeTask* push_subscription_task = |
+ new notifier::PushNotificationsSubscribeTask( |
+ base_task_, subscription_list, this); |
+ push_subscription_task->Start(); |
} |
-void CacheInvalidationPacketHandler::HandleChannelContextChange( |
- const std::string& context) { |
+void CacheInvalidationPacketHandler::OnSubscribed() { |
+ // TODO(ghc): Consider whether we should do more here. |
+} |
+ |
+void CacheInvalidationPacketHandler::OnSubscriptionError() { |
+ // TODO(ghc): Consider whether we should do more here. |
+} |
+ |
+void CacheInvalidationPacketHandler::OnNotificationReceived( |
+ const notifier::Notification& notification) { |
DCHECK(non_thread_safe_.CalledOnValidThread()); |
- channel_context_ = context; |
+ const std::string& decoded_message = notification.data; |
+ ipc::invalidation::ClientGatewayMessage envelope; |
+ envelope.ParseFromString(decoded_message); |
+ if (!envelope.IsInitialized()) { |
+ LOG(ERROR) << "Could not parse ClientGatewayMessage: " |
+ << decoded_message; |
akalin
2012/01/12 23:06:44
just to make sure, decoded_message is base64-encod
ghc
2012/01/13 01:23:07
No -- the data on the wire is base64-encoded, but
|
+ } |
+ if (envelope.has_service_context()) { |
+ service_context_ = envelope.service_context(); |
+ } |
+ if (envelope.has_rpc_scheduling_hash()) { |
+ scheduling_hash_ = envelope.rpc_scheduling_hash(); |
+ } |
+ incoming_receiver_->Run(envelope.network_message()); |
} |
} // namespace sync_notifier |