Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(237)

Side by Side Diff: chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc

Issue 9190029: use push messaging in cache invalidation xmpp channel (Closed) Base URL: http://src.chromium.org/svn/trunk/src/
Patch Set: Created 8 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" 5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
6 6
7 #include <string> 7 #include <string>
8 8
9 #include "base/bind.h"
10 #include "base/base64.h" 9 #include "base/base64.h"
11 #include "base/callback.h" 10 #include "base/callback.h"
12 #include "base/compiler_specific.h" 11 #include "base/compiler_specific.h"
13 #include "base/logging.h" 12 #include "base/logging.h"
14 #include "base/rand_util.h" 13 #include "base/rand_util.h"
15 #include "base/string_number_conversions.h" 14 #include "base/string_number_conversions.h"
15 #include "google/cacheinvalidation/v2/client_gateway.pb.h"
16 #include "google/cacheinvalidation/v2/constants.h" 16 #include "google/cacheinvalidation/v2/constants.h"
17 #include "google/cacheinvalidation/v2/invalidation-client.h" 17 #include "google/cacheinvalidation/v2/invalidation-client.h"
18 #include "google/cacheinvalidation/v2/system-resources.h" 18 #include "google/cacheinvalidation/v2/system-resources.h"
19 #include "jingle/notifier/listener/notification_constants.h"
20 #include "jingle/notifier/listener/push_notifications_listen_task.h"
21 #include "jingle/notifier/listener/push_notifications_send_update_task.h"
22 #include "jingle/notifier/listener/push_notifications_subscribe_task.h"
19 #include "jingle/notifier/listener/xml_element_util.h" 23 #include "jingle/notifier/listener/xml_element_util.h"
20 #include "talk/xmpp/constants.h" 24 #include "talk/xmpp/constants.h"
21 #include "talk/xmpp/jid.h" 25 #include "talk/xmpp/jid.h"
22 #include "talk/xmpp/xmppclient.h" 26 #include "talk/xmpp/xmppclient.h"
23 #include "talk/xmpp/xmpptask.h" 27 #include "talk/xmpp/xmpptask.h"
24 28
25 namespace sync_notifier { 29 namespace sync_notifier {
26 30
27 namespace { 31 namespace {
28 32
29 const char kBotJid[] = "tango@bot.talk.google.com"; 33 const char kBotJid[] = "tango@bot.talk.google.com";
30 const char kServiceUrl[] = "http://www.google.com/chrome/sync"; 34 const char kChannelName[] = "tango_raw";
31
32 buzz::QName GetQnData() { return buzz::QName("google:notifier", "data"); }
33 buzz::QName GetQnSeq() { return buzz::QName("", "seq"); }
34 buzz::QName GetQnSid() { return buzz::QName("", "sid"); }
35 buzz::QName GetQnServiceUrl() { return buzz::QName("", "serviceUrl"); }
36 buzz::QName GetQnProtocolVersion() {
37 return buzz::QName("", "protocolVersion");
38 }
39 buzz::QName GetQnChannelContext() {
40 return buzz::QName("", "channelContext");
41 }
42
43 // TODO(akalin): Move these task classes out so that they can be
44 // unit-tested. This'll probably be done easier once we consolidate
45 // all the packet sending/receiving classes.
46
47 // A task that listens for ClientInvalidation messages and calls the
48 // given callback on them.
49 class CacheInvalidationListenTask : public buzz::XmppTask {
50 public:
51 // Takes ownership of callback.
52 CacheInvalidationListenTask(
53 buzz::XmppTaskParentInterface* parent,
54 const base::Callback<void(const std::string&)>& callback,
55 const base::Callback<void(const std::string&)>& context_change_callback)
56 : XmppTask(parent, buzz::XmppEngine::HL_TYPE),
57 callback_(callback),
58 context_change_callback_(context_change_callback) {}
59 virtual ~CacheInvalidationListenTask() {}
60
61 virtual int ProcessStart() {
62 DVLOG(2) << "CacheInvalidationListenTask started";
63 return STATE_RESPONSE;
64 }
65
66 virtual int ProcessResponse() {
67 const buzz::XmlElement* stanza = NextStanza();
68 if (stanza == NULL) {
69 DVLOG(2) << "CacheInvalidationListenTask blocked";
70 return STATE_BLOCKED;
71 }
72 DVLOG(2) << "CacheInvalidationListenTask response received";
73 std::string data;
74 if (GetCacheInvalidationIqPacketData(stanza, &data)) {
75 callback_.Run(data);
76 } else {
77 LOG(ERROR) << "Could not get packet data";
78 }
79 // Acknowledge receipt of the iq to the buzz server.
80 // TODO(akalin): Send an error response for malformed packets.
81 scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
82 SendStanza(response_stanza.get());
83 return STATE_RESPONSE;
84 }
85
86 virtual bool HandleStanza(const buzz::XmlElement* stanza) {
87 DVLOG(1) << "Stanza received: "
88 << notifier::XmlElementToString(*stanza);
89 if (IsValidCacheInvalidationIqPacket(stanza)) {
90 DVLOG(2) << "Queueing stanza";
91 QueueStanza(stanza);
92 return true;
93 }
94 DVLOG(2) << "Stanza skipped";
95 return false;
96 }
97
98 private:
99 bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) {
100 // We deliberately minimize the verification we do here: see
101 // http://crbug.com/71285 .
102 return MatchRequestIq(stanza, buzz::STR_SET, GetQnData());
103 }
104
105 bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza,
106 std::string* data) {
107 DCHECK(IsValidCacheInvalidationIqPacket(stanza));
108 const buzz::XmlElement* cache_invalidation_iq_packet =
109 stanza->FirstNamed(GetQnData());
110 if (!cache_invalidation_iq_packet) {
111 LOG(ERROR) << "Could not find cache invalidation IQ packet element";
112 return false;
113 }
114 // Look for a channelContext attribute in the content of the stanza. If
115 // present, remember it so it can be echoed back.
116 if (cache_invalidation_iq_packet->HasAttr(GetQnChannelContext())) {
117 context_change_callback_.Run(
118 cache_invalidation_iq_packet->Attr(GetQnChannelContext()));
119 }
120 *data = cache_invalidation_iq_packet->BodyText();
121 return true;
122 }
123
124 std::string* channel_context_;
125 base::Callback<void(const std::string&)> callback_;
126 base::Callback<void(const std::string&)> context_change_callback_;
127 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
128 };
129
130 std::string MakeProtocolVersion() {
131 return base::Uint64ToString(invalidation::Constants::kProtocolMajorVersion) +
132 "." +
133 base::Uint64ToString(invalidation::Constants::kProtocolMinorVersion);
134 }
135
136 // A task that sends a single outbound ClientInvalidation message.
137 class CacheInvalidationSendMessageTask : public buzz::XmppTask {
138 public:
139 CacheInvalidationSendMessageTask(buzz::XmppTaskParentInterface* parent,
140 const buzz::Jid& to_jid,
141 const std::string& msg,
142 int seq,
143 const std::string& sid,
144 const std::string& channel_context)
145 : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
146 to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid),
147 channel_context_(channel_context) {}
148 virtual ~CacheInvalidationSendMessageTask() {}
149
150 virtual int ProcessStart() {
151 scoped_ptr<buzz::XmlElement> stanza(
152 MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_,
153 seq_, sid_, channel_context_));
154 DVLOG(1) << "Sending message: "
155 << notifier::XmlElementToString(*stanza.get());
156 if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
157 DVLOG(2) << "Error when sending message";
158 return STATE_ERROR;
159 }
160 return STATE_RESPONSE;
161 }
162
163 virtual int ProcessResponse() {
164 const buzz::XmlElement* stanza = NextStanza();
165 if (stanza == NULL) {
166 DVLOG(2) << "CacheInvalidationSendMessageTask blocked...";
167 return STATE_BLOCKED;
168 }
169 DVLOG(2) << "CacheInvalidationSendMessageTask response received: "
170 << notifier::XmlElementToString(*stanza);
171 // TODO(akalin): Handle errors here.
172 return STATE_DONE;
173 }
174
175 virtual bool HandleStanza(const buzz::XmlElement* stanza) {
176 DVLOG(1) << "Stanza received: "
177 << notifier::XmlElementToString(*stanza);
178 if (!MatchResponseIq(stanza, to_jid_, task_id())) {
179 DVLOG(2) << "Stanza skipped";
180 return false;
181 }
182 DVLOG(2) << "Queueing stanza";
183 QueueStanza(stanza);
184 return true;
185 }
186
187 private:
188 static buzz::XmlElement* MakeCacheInvalidationIqPacket(
189 const buzz::Jid& to_jid,
190 const std::string& task_id,
191 const std::string& msg,
192 int seq, const std::string& sid, const std::string& channel_context) {
193 buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
194 buzz::XmlElement* cache_invalidation_iq_packet =
195 new buzz::XmlElement(GetQnData(), true);
196 iq->AddElement(cache_invalidation_iq_packet);
197 cache_invalidation_iq_packet->SetAttr(GetQnSeq(), base::IntToString(seq));
198 cache_invalidation_iq_packet->SetAttr(GetQnSid(), sid);
199 cache_invalidation_iq_packet->SetAttr(GetQnServiceUrl(), kServiceUrl);
200 cache_invalidation_iq_packet->SetAttr(
201 GetQnProtocolVersion(), MakeProtocolVersion());
202 if (!channel_context.empty()) {
203 cache_invalidation_iq_packet->SetAttr(GetQnChannelContext(),
204 channel_context);
205 }
206 cache_invalidation_iq_packet->SetBodyText(msg);
207 return iq;
208 }
209
210 const buzz::Jid to_jid_;
211 std::string msg_;
212 int seq_;
213 std::string sid_;
214 const std::string channel_context_;
215
216 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
217 };
218
219 std::string MakeSid() {
220 uint64 sid = base::RandUint64();
221 return std::string("chrome-sync-") + base::Uint64ToString(sid);
222 }
223 35
224 } // namespace 36 } // namespace
225 37
226 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( 38 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
227 base::WeakPtr<buzz::XmppTaskParentInterface> base_task) 39 base::WeakPtr<buzz::XmppTaskParentInterface> base_task)
228 : weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), 40 : weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
229 base_task_(base_task), 41 base_task_(base_task),
230 seq_(0), 42 seq_(0),
231 sid_(MakeSid()) { 43 scheduling_hash_(0) {
232 CHECK(base_task_.get()); 44 CHECK(base_task_.get());
233 // Owned by base_task. Takes ownership of the callback. 45 // Owned by base_task. Takes ownership of the callback.
234 CacheInvalidationListenTask* listen_task = 46 notifier::PushNotificationsListenTask* listen_task =
235 new CacheInvalidationListenTask( 47 new notifier::PushNotificationsListenTask(base_task_, this);
236 base_task_, base::Bind(
237 &CacheInvalidationPacketHandler::HandleInboundPacket,
238 weak_factory_.GetWeakPtr()),
239 base::Bind(
240 &CacheInvalidationPacketHandler::HandleChannelContextChange,
241 weak_factory_.GetWeakPtr()));
242 listen_task->Start(); 48 listen_task->Start();
243 } 49 }
244 50
245 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() { 51 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() {
246 DCHECK(non_thread_safe_.CalledOnValidThread()); 52 DCHECK(non_thread_safe_.CalledOnValidThread());
247 } 53 }
248 54
249 void CacheInvalidationPacketHandler::SendMessage( 55 void CacheInvalidationPacketHandler::SendMessage(
250 const std::string& message) { 56 const std::string& message) {
251 DCHECK(non_thread_safe_.CalledOnValidThread()); 57 DCHECK(non_thread_safe_.CalledOnValidThread());
252 if (!base_task_.get()) { 58 if (!base_task_.get()) {
253 return; 59 return;
254 } 60 }
255 std::string encoded_message; 61 ipc::invalidation::ClientGatewayMessage envelope;
256 if (!base::Base64Encode(message, &encoded_message)) { 62 envelope.set_is_client_to_server(true);
257 LOG(ERROR) << "Could not base64-encode message to send: " 63 if (!service_context_.empty()) {
258 << message; 64 envelope.set_service_context(service_context_);
259 return; 65 envelope.set_rpc_scheduling_hash(scheduling_hash_);
260 } 66 }
67 envelope.set_network_message(message);
68
69 notifier::Recipient recipient;
70 recipient.to = kBotJid;
71 notifier::Notification notification;
72 notification.channel = kChannelName;
73 notification.recipients.push_back(recipient);
74 envelope.SerializeToString(&notification.data);
75
261 // Owned by base_task_. 76 // Owned by base_task_.
262 CacheInvalidationSendMessageTask* send_message_task = 77 notifier::PushNotificationsSendUpdateTask* send_message_task =
263 new CacheInvalidationSendMessageTask(base_task_, 78 new notifier::PushNotificationsSendUpdateTask(base_task_, notification);
264 buzz::Jid(kBotJid),
265 encoded_message,
266 seq_, sid_, channel_context_);
267 send_message_task->Start(); 79 send_message_task->Start();
268 ++seq_;
269 } 80 }
270 81
271 void CacheInvalidationPacketHandler::SetMessageReceiver( 82 void CacheInvalidationPacketHandler::SetMessageReceiver(
272 invalidation::MessageCallback* incoming_receiver) { 83 invalidation::MessageCallback* incoming_receiver) {
273 incoming_receiver_.reset(incoming_receiver); 84 incoming_receiver_.reset(incoming_receiver);
274 } 85 }
275 86
276 void CacheInvalidationPacketHandler::HandleInboundPacket( 87 void CacheInvalidationPacketHandler::SendSubscriptionRequest() {
277 const std::string& packet) { 88 // Owned by base_task_.
akalin 2012/01/12 23:06:44 this comment should be right above push_subscripti
ghc 2012/01/13 01:23:07 Done.
278 DCHECK(non_thread_safe_.CalledOnValidThread()); 89 notifier::Subscription subscription;
279 std::string decoded_message; 90 subscription.channel = kChannelName;
280 if (!base::Base64Decode(packet, &decoded_message)) { 91 subscription.from = "";
281 LOG(ERROR) << "Could not base64-decode received message: " 92 notifier::SubscriptionList subscription_list;
282 << packet; 93 subscription_list.push_back(subscription);
283 return; 94 notifier::PushNotificationsSubscribeTask* push_subscription_task =
284 } 95 new notifier::PushNotificationsSubscribeTask(
285 incoming_receiver_->Run(decoded_message); 96 base_task_, subscription_list, this);
97 push_subscription_task->Start();
286 } 98 }
287 99
288 void CacheInvalidationPacketHandler::HandleChannelContextChange( 100 void CacheInvalidationPacketHandler::OnSubscribed() {
289 const std::string& context) { 101 // TODO(ghc): Consider whether we should do more here.
102 }
103
104 void CacheInvalidationPacketHandler::OnSubscriptionError() {
105 // TODO(ghc): Consider whether we should do more here.
106 }
107
108 void CacheInvalidationPacketHandler::OnNotificationReceived(
109 const notifier::Notification& notification) {
290 DCHECK(non_thread_safe_.CalledOnValidThread()); 110 DCHECK(non_thread_safe_.CalledOnValidThread());
291 channel_context_ = context; 111 const std::string& decoded_message = notification.data;
112 ipc::invalidation::ClientGatewayMessage envelope;
113 envelope.ParseFromString(decoded_message);
114 if (!envelope.IsInitialized()) {
115 LOG(ERROR) << "Could not parse ClientGatewayMessage: "
116 << decoded_message;
akalin 2012/01/12 23:06:44 just to make sure, decoded_message is base64-encod
ghc 2012/01/13 01:23:07 No -- the data on the wire is base64-encoded, but
117 }
118 if (envelope.has_service_context()) {
119 service_context_ = envelope.service_context();
120 }
121 if (envelope.has_rpc_scheduling_hash()) {
122 scheduling_hash_ = envelope.rpc_scheduling_hash();
123 }
124 incoming_receiver_->Run(envelope.network_message());
292 } 125 }
293 126
294 } // namespace sync_notifier 127 } // namespace sync_notifier
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698