Chromium Code Reviews| Index: chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc |
| =================================================================== |
| --- chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc (revision 117278) |
| +++ chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc (working copy) |
| @@ -6,16 +6,20 @@ |
| #include <string> |
| -#include "base/bind.h" |
| #include "base/base64.h" |
| #include "base/callback.h" |
| #include "base/compiler_specific.h" |
| #include "base/logging.h" |
| #include "base/rand_util.h" |
| #include "base/string_number_conversions.h" |
| +#include "google/cacheinvalidation/v2/client_gateway.pb.h" |
| #include "google/cacheinvalidation/v2/constants.h" |
| #include "google/cacheinvalidation/v2/invalidation-client.h" |
| #include "google/cacheinvalidation/v2/system-resources.h" |
| +#include "jingle/notifier/listener/notification_constants.h" |
| +#include "jingle/notifier/listener/push_notifications_listen_task.h" |
| +#include "jingle/notifier/listener/push_notifications_send_update_task.h" |
| +#include "jingle/notifier/listener/push_notifications_subscribe_task.h" |
| #include "jingle/notifier/listener/xml_element_util.h" |
| #include "talk/xmpp/constants.h" |
| #include "talk/xmpp/jid.h" |
| @@ -27,200 +31,8 @@ |
| namespace { |
| const char kBotJid[] = "tango@bot.talk.google.com"; |
| -const char kServiceUrl[] = "http://www.google.com/chrome/sync"; |
| +const char kChannelName[] = "tango_raw"; |
| -buzz::QName GetQnData() { return buzz::QName("google:notifier", "data"); } |
| -buzz::QName GetQnSeq() { return buzz::QName("", "seq"); } |
| -buzz::QName GetQnSid() { return buzz::QName("", "sid"); } |
| -buzz::QName GetQnServiceUrl() { return buzz::QName("", "serviceUrl"); } |
| -buzz::QName GetQnProtocolVersion() { |
| - return buzz::QName("", "protocolVersion"); |
| -} |
| -buzz::QName GetQnChannelContext() { |
| - return buzz::QName("", "channelContext"); |
| -} |
| - |
| -// TODO(akalin): Move these task classes out so that they can be |
| -// unit-tested. This'll probably be done easier once we consolidate |
| -// all the packet sending/receiving classes. |
| - |
| -// A task that listens for ClientInvalidation messages and calls the |
| -// given callback on them. |
| -class CacheInvalidationListenTask : public buzz::XmppTask { |
| - public: |
| - // Takes ownership of callback. |
| - CacheInvalidationListenTask( |
| - buzz::XmppTaskParentInterface* parent, |
| - const base::Callback<void(const std::string&)>& callback, |
| - const base::Callback<void(const std::string&)>& context_change_callback) |
| - : XmppTask(parent, buzz::XmppEngine::HL_TYPE), |
| - callback_(callback), |
| - context_change_callback_(context_change_callback) {} |
| - virtual ~CacheInvalidationListenTask() {} |
| - |
| - virtual int ProcessStart() { |
| - DVLOG(2) << "CacheInvalidationListenTask started"; |
| - return STATE_RESPONSE; |
| - } |
| - |
| - virtual int ProcessResponse() { |
| - const buzz::XmlElement* stanza = NextStanza(); |
| - if (stanza == NULL) { |
| - DVLOG(2) << "CacheInvalidationListenTask blocked"; |
| - return STATE_BLOCKED; |
| - } |
| - DVLOG(2) << "CacheInvalidationListenTask response received"; |
| - std::string data; |
| - if (GetCacheInvalidationIqPacketData(stanza, &data)) { |
| - callback_.Run(data); |
| - } else { |
| - LOG(ERROR) << "Could not get packet data"; |
| - } |
| - // Acknowledge receipt of the iq to the buzz server. |
| - // TODO(akalin): Send an error response for malformed packets. |
| - scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza)); |
| - SendStanza(response_stanza.get()); |
| - return STATE_RESPONSE; |
| - } |
| - |
| - virtual bool HandleStanza(const buzz::XmlElement* stanza) { |
| - DVLOG(1) << "Stanza received: " |
| - << notifier::XmlElementToString(*stanza); |
| - if (IsValidCacheInvalidationIqPacket(stanza)) { |
| - DVLOG(2) << "Queueing stanza"; |
| - QueueStanza(stanza); |
| - return true; |
| - } |
| - DVLOG(2) << "Stanza skipped"; |
| - return false; |
| - } |
| - |
| - private: |
| - bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) { |
| - // We deliberately minimize the verification we do here: see |
| - // http://crbug.com/71285 . |
| - return MatchRequestIq(stanza, buzz::STR_SET, GetQnData()); |
| - } |
| - |
| - bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza, |
| - std::string* data) { |
| - DCHECK(IsValidCacheInvalidationIqPacket(stanza)); |
| - const buzz::XmlElement* cache_invalidation_iq_packet = |
| - stanza->FirstNamed(GetQnData()); |
| - if (!cache_invalidation_iq_packet) { |
| - LOG(ERROR) << "Could not find cache invalidation IQ packet element"; |
| - return false; |
| - } |
| - // Look for a channelContext attribute in the content of the stanza. If |
| - // present, remember it so it can be echoed back. |
| - if (cache_invalidation_iq_packet->HasAttr(GetQnChannelContext())) { |
| - context_change_callback_.Run( |
| - cache_invalidation_iq_packet->Attr(GetQnChannelContext())); |
| - } |
| - *data = cache_invalidation_iq_packet->BodyText(); |
| - return true; |
| - } |
| - |
| - std::string* channel_context_; |
| - base::Callback<void(const std::string&)> callback_; |
| - base::Callback<void(const std::string&)> context_change_callback_; |
| - DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask); |
| -}; |
| - |
| -std::string MakeProtocolVersion() { |
| - return base::Uint64ToString(invalidation::Constants::kProtocolMajorVersion) + |
| - "." + |
| - base::Uint64ToString(invalidation::Constants::kProtocolMinorVersion); |
| -} |
| - |
| -// A task that sends a single outbound ClientInvalidation message. |
| -class CacheInvalidationSendMessageTask : public buzz::XmppTask { |
| - public: |
| - CacheInvalidationSendMessageTask(buzz::XmppTaskParentInterface* parent, |
| - const buzz::Jid& to_jid, |
| - const std::string& msg, |
| - int seq, |
| - const std::string& sid, |
| - const std::string& channel_context) |
| - : XmppTask(parent, buzz::XmppEngine::HL_SINGLE), |
| - to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid), |
| - channel_context_(channel_context) {} |
| - virtual ~CacheInvalidationSendMessageTask() {} |
| - |
| - virtual int ProcessStart() { |
| - scoped_ptr<buzz::XmlElement> stanza( |
| - MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_, |
| - seq_, sid_, channel_context_)); |
| - DVLOG(1) << "Sending message: " |
| - << notifier::XmlElementToString(*stanza.get()); |
| - if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) { |
| - DVLOG(2) << "Error when sending message"; |
| - return STATE_ERROR; |
| - } |
| - return STATE_RESPONSE; |
| - } |
| - |
| - virtual int ProcessResponse() { |
| - const buzz::XmlElement* stanza = NextStanza(); |
| - if (stanza == NULL) { |
| - DVLOG(2) << "CacheInvalidationSendMessageTask blocked..."; |
| - return STATE_BLOCKED; |
| - } |
| - DVLOG(2) << "CacheInvalidationSendMessageTask response received: " |
| - << notifier::XmlElementToString(*stanza); |
| - // TODO(akalin): Handle errors here. |
| - return STATE_DONE; |
| - } |
| - |
| - virtual bool HandleStanza(const buzz::XmlElement* stanza) { |
| - DVLOG(1) << "Stanza received: " |
| - << notifier::XmlElementToString(*stanza); |
| - if (!MatchResponseIq(stanza, to_jid_, task_id())) { |
| - DVLOG(2) << "Stanza skipped"; |
| - return false; |
| - } |
| - DVLOG(2) << "Queueing stanza"; |
| - QueueStanza(stanza); |
| - return true; |
| - } |
| - |
| - private: |
| - static buzz::XmlElement* MakeCacheInvalidationIqPacket( |
| - const buzz::Jid& to_jid, |
| - const std::string& task_id, |
| - const std::string& msg, |
| - int seq, const std::string& sid, const std::string& channel_context) { |
| - buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id); |
| - buzz::XmlElement* cache_invalidation_iq_packet = |
| - new buzz::XmlElement(GetQnData(), true); |
| - iq->AddElement(cache_invalidation_iq_packet); |
| - cache_invalidation_iq_packet->SetAttr(GetQnSeq(), base::IntToString(seq)); |
| - cache_invalidation_iq_packet->SetAttr(GetQnSid(), sid); |
| - cache_invalidation_iq_packet->SetAttr(GetQnServiceUrl(), kServiceUrl); |
| - cache_invalidation_iq_packet->SetAttr( |
| - GetQnProtocolVersion(), MakeProtocolVersion()); |
| - if (!channel_context.empty()) { |
| - cache_invalidation_iq_packet->SetAttr(GetQnChannelContext(), |
| - channel_context); |
| - } |
| - cache_invalidation_iq_packet->SetBodyText(msg); |
| - return iq; |
| - } |
| - |
| - const buzz::Jid to_jid_; |
| - std::string msg_; |
| - int seq_; |
| - std::string sid_; |
| - const std::string channel_context_; |
| - |
| - DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask); |
| -}; |
| - |
| -std::string MakeSid() { |
| - uint64 sid = base::RandUint64(); |
| - return std::string("chrome-sync-") + base::Uint64ToString(sid); |
| -} |
| - |
| } // namespace |
| CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( |
| @@ -228,17 +40,11 @@ |
| : weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| base_task_(base_task), |
| seq_(0), |
| - sid_(MakeSid()) { |
| + scheduling_hash_(0) { |
| CHECK(base_task_.get()); |
| // Owned by base_task. Takes ownership of the callback. |
| - CacheInvalidationListenTask* listen_task = |
| - new CacheInvalidationListenTask( |
| - base_task_, base::Bind( |
| - &CacheInvalidationPacketHandler::HandleInboundPacket, |
| - weak_factory_.GetWeakPtr()), |
| - base::Bind( |
| - &CacheInvalidationPacketHandler::HandleChannelContextChange, |
| - weak_factory_.GetWeakPtr())); |
| + notifier::PushNotificationsListenTask* listen_task = |
| + new notifier::PushNotificationsListenTask(base_task_, this); |
| listen_task->Start(); |
| } |
| @@ -252,20 +58,25 @@ |
| if (!base_task_.get()) { |
| return; |
| } |
| - std::string encoded_message; |
| - if (!base::Base64Encode(message, &encoded_message)) { |
| - LOG(ERROR) << "Could not base64-encode message to send: " |
| - << message; |
| - return; |
| + ipc::invalidation::ClientGatewayMessage envelope; |
| + envelope.set_is_client_to_server(true); |
| + if (!service_context_.empty()) { |
| + envelope.set_service_context(service_context_); |
| + envelope.set_rpc_scheduling_hash(scheduling_hash_); |
| } |
| + envelope.set_network_message(message); |
| + |
| + notifier::Recipient recipient; |
| + recipient.to = kBotJid; |
| + notifier::Notification notification; |
| + notification.channel = kChannelName; |
| + notification.recipients.push_back(recipient); |
| + envelope.SerializeToString(¬ification.data); |
| + |
| // Owned by base_task_. |
| - CacheInvalidationSendMessageTask* send_message_task = |
| - new CacheInvalidationSendMessageTask(base_task_, |
| - buzz::Jid(kBotJid), |
| - encoded_message, |
| - seq_, sid_, channel_context_); |
| + notifier::PushNotificationsSendUpdateTask* send_message_task = |
| + new notifier::PushNotificationsSendUpdateTask(base_task_, notification); |
| send_message_task->Start(); |
| - ++seq_; |
| } |
| void CacheInvalidationPacketHandler::SetMessageReceiver( |
| @@ -273,22 +84,44 @@ |
| incoming_receiver_.reset(incoming_receiver); |
| } |
| -void CacheInvalidationPacketHandler::HandleInboundPacket( |
| - const std::string& packet) { |
| - DCHECK(non_thread_safe_.CalledOnValidThread()); |
| - std::string decoded_message; |
| - if (!base::Base64Decode(packet, &decoded_message)) { |
| - LOG(ERROR) << "Could not base64-decode received message: " |
| - << packet; |
| - return; |
| - } |
| - incoming_receiver_->Run(decoded_message); |
| +void CacheInvalidationPacketHandler::SendSubscriptionRequest() { |
| + // Owned by base_task_. |
|
akalin
2012/01/12 23:06:44
this comment should be right above push_subscripti
ghc
2012/01/13 01:23:07
Done.
|
| + notifier::Subscription subscription; |
| + subscription.channel = kChannelName; |
| + subscription.from = ""; |
| + notifier::SubscriptionList subscription_list; |
| + subscription_list.push_back(subscription); |
| + notifier::PushNotificationsSubscribeTask* push_subscription_task = |
| + new notifier::PushNotificationsSubscribeTask( |
| + base_task_, subscription_list, this); |
| + push_subscription_task->Start(); |
| } |
| -void CacheInvalidationPacketHandler::HandleChannelContextChange( |
| - const std::string& context) { |
| +void CacheInvalidationPacketHandler::OnSubscribed() { |
| + // TODO(ghc): Consider whether we should do more here. |
| +} |
| + |
| +void CacheInvalidationPacketHandler::OnSubscriptionError() { |
| + // TODO(ghc): Consider whether we should do more here. |
| +} |
| + |
| +void CacheInvalidationPacketHandler::OnNotificationReceived( |
| + const notifier::Notification& notification) { |
| DCHECK(non_thread_safe_.CalledOnValidThread()); |
| - channel_context_ = context; |
| + const std::string& decoded_message = notification.data; |
| + ipc::invalidation::ClientGatewayMessage envelope; |
| + envelope.ParseFromString(decoded_message); |
| + if (!envelope.IsInitialized()) { |
| + LOG(ERROR) << "Could not parse ClientGatewayMessage: " |
| + << decoded_message; |
|
akalin
2012/01/12 23:06:44
just to make sure, decoded_message is base64-encod
ghc
2012/01/13 01:23:07
No -- the data on the wire is base64-encoded, but
|
| + } |
| + if (envelope.has_service_context()) { |
| + service_context_ = envelope.service_context(); |
| + } |
| + if (envelope.has_rpc_scheduling_hash()) { |
| + scheduling_hash_ = envelope.rpc_scheduling_hash(); |
| + } |
| + incoming_receiver_->Run(envelope.network_message()); |
| } |
| } // namespace sync_notifier |