Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(157)

Side by Side Diff: chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc

Issue 9190029: use push messaging in cache invalidation xmpp channel (Closed) Base URL: http://src.chromium.org/svn/trunk/src/
Patch Set: Move MockWeakXmppClient from global namespace to anonymous namespace Created 8 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" 5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
6 6
7 #include <string> 7 #include <string>
8 8
9 #include "base/bind.h"
10 #include "base/base64.h" 9 #include "base/base64.h"
11 #include "base/callback.h" 10 #include "base/callback.h"
12 #include "base/compiler_specific.h" 11 #include "base/compiler_specific.h"
13 #include "base/logging.h" 12 #include "base/logging.h"
14 #include "base/rand_util.h" 13 #include "base/rand_util.h"
15 #include "base/string_number_conversions.h" 14 #include "base/string_number_conversions.h"
15 #include "google/cacheinvalidation/v2/client_gateway.pb.h"
16 #include "google/cacheinvalidation/v2/constants.h" 16 #include "google/cacheinvalidation/v2/constants.h"
17 #include "google/cacheinvalidation/v2/invalidation-client.h" 17 #include "google/cacheinvalidation/v2/invalidation-client.h"
18 #include "google/cacheinvalidation/v2/system-resources.h" 18 #include "google/cacheinvalidation/v2/system-resources.h"
19 #include "jingle/notifier/listener/notification_constants.h"
20 #include "jingle/notifier/listener/push_notifications_send_update_task.h"
19 #include "jingle/notifier/listener/xml_element_util.h" 21 #include "jingle/notifier/listener/xml_element_util.h"
20 #include "talk/xmpp/constants.h" 22 #include "talk/xmpp/constants.h"
21 #include "talk/xmpp/jid.h" 23 #include "talk/xmpp/jid.h"
22 #include "talk/xmpp/xmppclient.h" 24 #include "talk/xmpp/xmppclient.h"
23 #include "talk/xmpp/xmpptask.h" 25 #include "talk/xmpp/xmpptask.h"
24 26
25 namespace sync_notifier { 27 namespace sync_notifier {
26 28
27 namespace { 29 namespace {
28 30
29 const char kBotJid[] = "tango@bot.talk.google.com"; 31 const char kBotJid[] = "tango@bot.talk.google.com";
30 const char kServiceUrl[] = "http://www.google.com/chrome/sync"; 32 const char kChannelName[] = "tango_raw";
31
32 buzz::QName GetQnData() { return buzz::QName("google:notifier", "data"); }
33 buzz::QName GetQnSeq() { return buzz::QName("", "seq"); }
34 buzz::QName GetQnSid() { return buzz::QName("", "sid"); }
35 buzz::QName GetQnServiceUrl() { return buzz::QName("", "serviceUrl"); }
36 buzz::QName GetQnProtocolVersion() {
37 return buzz::QName("", "protocolVersion");
38 }
39 buzz::QName GetQnChannelContext() {
40 return buzz::QName("", "channelContext");
41 }
42
43 // TODO(akalin): Move these task classes out so that they can be
44 // unit-tested. This'll probably be done easier once we consolidate
45 // all the packet sending/receiving classes.
46
47 // A task that listens for ClientInvalidation messages and calls the
48 // given callback on them.
49 class CacheInvalidationListenTask : public buzz::XmppTask {
50 public:
51 // Takes ownership of callback.
52 CacheInvalidationListenTask(
53 buzz::XmppTaskParentInterface* parent,
54 const base::Callback<void(const std::string&)>& callback,
55 const base::Callback<void(const std::string&)>& context_change_callback)
56 : XmppTask(parent, buzz::XmppEngine::HL_TYPE),
57 callback_(callback),
58 context_change_callback_(context_change_callback) {}
59 virtual ~CacheInvalidationListenTask() {}
60
61 virtual int ProcessStart() {
62 DVLOG(2) << "CacheInvalidationListenTask started";
63 return STATE_RESPONSE;
64 }
65
66 virtual int ProcessResponse() {
67 const buzz::XmlElement* stanza = NextStanza();
68 if (stanza == NULL) {
69 DVLOG(2) << "CacheInvalidationListenTask blocked";
70 return STATE_BLOCKED;
71 }
72 DVLOG(2) << "CacheInvalidationListenTask response received";
73 std::string data;
74 if (GetCacheInvalidationIqPacketData(stanza, &data)) {
75 callback_.Run(data);
76 } else {
77 LOG(ERROR) << "Could not get packet data";
78 }
79 // Acknowledge receipt of the iq to the buzz server.
80 // TODO(akalin): Send an error response for malformed packets.
81 scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
82 SendStanza(response_stanza.get());
83 return STATE_RESPONSE;
84 }
85
86 virtual bool HandleStanza(const buzz::XmlElement* stanza) {
87 DVLOG(1) << "Stanza received: "
88 << notifier::XmlElementToString(*stanza);
89 if (IsValidCacheInvalidationIqPacket(stanza)) {
90 DVLOG(2) << "Queueing stanza";
91 QueueStanza(stanza);
92 return true;
93 }
94 DVLOG(2) << "Stanza skipped";
95 return false;
96 }
97
98 private:
99 bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) {
100 // We deliberately minimize the verification we do here: see
101 // http://crbug.com/71285 .
102 return MatchRequestIq(stanza, buzz::STR_SET, GetQnData());
103 }
104
105 bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza,
106 std::string* data) {
107 DCHECK(IsValidCacheInvalidationIqPacket(stanza));
108 const buzz::XmlElement* cache_invalidation_iq_packet =
109 stanza->FirstNamed(GetQnData());
110 if (!cache_invalidation_iq_packet) {
111 LOG(ERROR) << "Could not find cache invalidation IQ packet element";
112 return false;
113 }
114 // Look for a channelContext attribute in the content of the stanza. If
115 // present, remember it so it can be echoed back.
116 if (cache_invalidation_iq_packet->HasAttr(GetQnChannelContext())) {
117 context_change_callback_.Run(
118 cache_invalidation_iq_packet->Attr(GetQnChannelContext()));
119 }
120 *data = cache_invalidation_iq_packet->BodyText();
121 return true;
122 }
123
124 base::Callback<void(const std::string&)> callback_;
125 base::Callback<void(const std::string&)> context_change_callback_;
126 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
127 };
128
129 std::string MakeProtocolVersion() {
130 return base::Uint64ToString(invalidation::Constants::kProtocolMajorVersion) +
131 "." +
132 base::Uint64ToString(invalidation::Constants::kProtocolMinorVersion);
133 }
134
135 // A task that sends a single outbound ClientInvalidation message.
136 class CacheInvalidationSendMessageTask : public buzz::XmppTask {
137 public:
138 CacheInvalidationSendMessageTask(buzz::XmppTaskParentInterface* parent,
139 const buzz::Jid& to_jid,
140 const std::string& msg,
141 int seq,
142 const std::string& sid,
143 const std::string& channel_context)
144 : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
145 to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid),
146 channel_context_(channel_context) {}
147 virtual ~CacheInvalidationSendMessageTask() {}
148
149 virtual int ProcessStart() {
150 scoped_ptr<buzz::XmlElement> stanza(
151 MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_,
152 seq_, sid_, channel_context_));
153 DVLOG(1) << "Sending message: "
154 << notifier::XmlElementToString(*stanza.get());
155 if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
156 DVLOG(2) << "Error when sending message";
157 return STATE_ERROR;
158 }
159 return STATE_RESPONSE;
160 }
161
162 virtual int ProcessResponse() {
163 const buzz::XmlElement* stanza = NextStanza();
164 if (stanza == NULL) {
165 DVLOG(2) << "CacheInvalidationSendMessageTask blocked...";
166 return STATE_BLOCKED;
167 }
168 DVLOG(2) << "CacheInvalidationSendMessageTask response received: "
169 << notifier::XmlElementToString(*stanza);
170 // TODO(akalin): Handle errors here.
171 return STATE_DONE;
172 }
173
174 virtual bool HandleStanza(const buzz::XmlElement* stanza) {
175 DVLOG(1) << "Stanza received: "
176 << notifier::XmlElementToString(*stanza);
177 if (!MatchResponseIq(stanza, to_jid_, task_id())) {
178 DVLOG(2) << "Stanza skipped";
179 return false;
180 }
181 DVLOG(2) << "Queueing stanza";
182 QueueStanza(stanza);
183 return true;
184 }
185
186 private:
187 static buzz::XmlElement* MakeCacheInvalidationIqPacket(
188 const buzz::Jid& to_jid,
189 const std::string& task_id,
190 const std::string& msg,
191 int seq, const std::string& sid, const std::string& channel_context) {
192 buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
193 buzz::XmlElement* cache_invalidation_iq_packet =
194 new buzz::XmlElement(GetQnData(), true);
195 iq->AddElement(cache_invalidation_iq_packet);
196 cache_invalidation_iq_packet->SetAttr(GetQnSeq(), base::IntToString(seq));
197 cache_invalidation_iq_packet->SetAttr(GetQnSid(), sid);
198 cache_invalidation_iq_packet->SetAttr(GetQnServiceUrl(), kServiceUrl);
199 cache_invalidation_iq_packet->SetAttr(
200 GetQnProtocolVersion(), MakeProtocolVersion());
201 if (!channel_context.empty()) {
202 cache_invalidation_iq_packet->SetAttr(GetQnChannelContext(),
203 channel_context);
204 }
205 cache_invalidation_iq_packet->SetBodyText(msg);
206 return iq;
207 }
208
209 const buzz::Jid to_jid_;
210 std::string msg_;
211 int seq_;
212 std::string sid_;
213 const std::string channel_context_;
214
215 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
216 };
217
218 std::string MakeSid() {
219 uint64 sid = base::RandUint64();
220 return std::string("chrome-sync-") + base::Uint64ToString(sid);
221 }
222 33
223 } // namespace 34 } // namespace
224 35
225 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( 36 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
226 base::WeakPtr<buzz::XmppTaskParentInterface> base_task) 37 base::WeakPtr<buzz::XmppTaskParentInterface> base_task)
227 : weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), 38 : weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
228 base_task_(base_task), 39 base_task_(base_task),
229 seq_(0), 40 seq_(0),
230 sid_(MakeSid()) { 41 scheduling_hash_(0) {
231 CHECK(base_task_.get()); 42 CHECK(base_task_.get());
232 // Owned by base_task. Takes ownership of the callback. 43 // Owned by base_task. Takes ownership of the callback.
233 CacheInvalidationListenTask* listen_task = 44 notifier::PushNotificationsListenTask* listen_task =
234 new CacheInvalidationListenTask( 45 new notifier::PushNotificationsListenTask(base_task_, this);
235 base_task_, base::Bind(
236 &CacheInvalidationPacketHandler::HandleInboundPacket,
237 weak_factory_.GetWeakPtr()),
238 base::Bind(
239 &CacheInvalidationPacketHandler::HandleChannelContextChange,
240 weak_factory_.GetWeakPtr()));
241 listen_task->Start(); 46 listen_task->Start();
242 } 47 }
243 48
244 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() { 49 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() {
245 DCHECK(non_thread_safe_.CalledOnValidThread()); 50 DCHECK(non_thread_safe_.CalledOnValidThread());
246 } 51 }
247 52
248 void CacheInvalidationPacketHandler::SendMessage( 53 void CacheInvalidationPacketHandler::SendMessage(
249 const std::string& message) { 54 const std::string& message) {
250 DCHECK(non_thread_safe_.CalledOnValidThread()); 55 DCHECK(non_thread_safe_.CalledOnValidThread());
251 if (!base_task_.get()) { 56 if (!base_task_.get()) {
252 return; 57 return;
253 } 58 }
254 std::string encoded_message; 59 ipc::invalidation::ClientGatewayMessage envelope;
255 if (!base::Base64Encode(message, &encoded_message)) { 60 envelope.set_is_client_to_server(true);
256 LOG(ERROR) << "Could not base64-encode message to send: " 61 if (!service_context_.empty()) {
257 << message; 62 envelope.set_service_context(service_context_);
258 return; 63 envelope.set_rpc_scheduling_hash(scheduling_hash_);
259 } 64 }
65 envelope.set_network_message(message);
66
67 notifier::Recipient recipient;
68 recipient.to = kBotJid;
69 notifier::Notification notification;
70 notification.channel = kChannelName;
71 notification.recipients.push_back(recipient);
72 envelope.SerializeToString(&notification.data);
73
260 // Owned by base_task_. 74 // Owned by base_task_.
261 CacheInvalidationSendMessageTask* send_message_task = 75 notifier::PushNotificationsSendUpdateTask* send_message_task =
262 new CacheInvalidationSendMessageTask(base_task_, 76 new notifier::PushNotificationsSendUpdateTask(base_task_, notification);
263 buzz::Jid(kBotJid),
264 encoded_message,
265 seq_, sid_, channel_context_);
266 send_message_task->Start(); 77 send_message_task->Start();
267 ++seq_;
268 } 78 }
269 79
270 void CacheInvalidationPacketHandler::SetMessageReceiver( 80 void CacheInvalidationPacketHandler::SetMessageReceiver(
271 invalidation::MessageCallback* incoming_receiver) { 81 invalidation::MessageCallback* incoming_receiver) {
272 incoming_receiver_.reset(incoming_receiver); 82 incoming_receiver_.reset(incoming_receiver);
273 } 83 }
274 84
275 void CacheInvalidationPacketHandler::HandleInboundPacket( 85 void CacheInvalidationPacketHandler::SendSubscriptionRequest() {
276 const std::string& packet) { 86 notifier::Subscription subscription;
87 subscription.channel = kChannelName;
88 subscription.from = "";
89 notifier::SubscriptionList subscription_list;
90 subscription_list.push_back(subscription);
91 // Owned by base_task_.
92 notifier::PushNotificationsSubscribeTask* push_subscription_task =
93 new notifier::PushNotificationsSubscribeTask(
94 base_task_, subscription_list, this);
95 push_subscription_task->Start();
96 }
97
98 void CacheInvalidationPacketHandler::OnSubscribed() {
99 // TODO(ghc): Consider whether we should do more here.
100 }
101
102 void CacheInvalidationPacketHandler::OnSubscriptionError() {
103 // TODO(ghc): Consider whether we should do more here.
104 }
105
106 void CacheInvalidationPacketHandler::OnNotificationReceived(
107 const notifier::Notification& notification) {
277 DCHECK(non_thread_safe_.CalledOnValidThread()); 108 DCHECK(non_thread_safe_.CalledOnValidThread());
278 std::string decoded_message; 109 const std::string& decoded_message = notification.data;
279 if (!base::Base64Decode(packet, &decoded_message)) { 110 ipc::invalidation::ClientGatewayMessage envelope;
280 LOG(ERROR) << "Could not base64-decode received message: " 111 envelope.ParseFromString(decoded_message);
281 << packet; 112 if (!envelope.IsInitialized()) {
113 LOG(ERROR) << "Could not parse ClientGatewayMessage: "
114 << decoded_message;
282 return; 115 return;
283 } 116 }
284 incoming_receiver_->Run(decoded_message); 117 if (envelope.has_service_context()) {
285 } 118 service_context_ = envelope.service_context();
286 119 }
287 void CacheInvalidationPacketHandler::HandleChannelContextChange( 120 if (envelope.has_rpc_scheduling_hash()) {
288 const std::string& context) { 121 scheduling_hash_ = envelope.rpc_scheduling_hash();
289 DCHECK(non_thread_safe_.CalledOnValidThread()); 122 }
290 channel_context_ = context; 123 incoming_receiver_->Run(envelope.network_message());
291 } 124 }
292 125
293 } // namespace sync_notifier 126 } // namespace sync_notifier
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698