Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(218)

Side by Side Diff: chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc

Issue 9190029: use push messaging in cache invalidation xmpp channel (Closed) Base URL: http://src.chromium.org/svn/trunk/src/
Patch Set: '' Created 8 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" 5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
6 6
7 #include <string> 7 #include <string>
8 8
9 #include "base/bind.h"
10 #include "base/base64.h" 9 #include "base/base64.h"
11 #include "base/callback.h" 10 #include "base/callback.h"
12 #include "base/compiler_specific.h" 11 #include "base/compiler_specific.h"
13 #include "base/logging.h" 12 #include "base/logging.h"
14 #include "base/rand_util.h" 13 #include "base/rand_util.h"
15 #include "base/string_number_conversions.h" 14 #include "base/string_number_conversions.h"
15 #include "google/cacheinvalidation/v2/client_gateway.pb.h"
16 #include "google/cacheinvalidation/v2/constants.h" 16 #include "google/cacheinvalidation/v2/constants.h"
17 #include "google/cacheinvalidation/v2/invalidation-client.h" 17 #include "google/cacheinvalidation/v2/invalidation-client.h"
18 #include "google/cacheinvalidation/v2/system-resources.h" 18 #include "google/cacheinvalidation/v2/system-resources.h"
19 #include "jingle/notifier/listener/notification_constants.h"
20 #include "jingle/notifier/listener/push_notifications_listen_task.h"
akalin 2012/01/25 02:45:13 don't need this include since you do so in header
ghc 2012/01/25 03:18:28 Done.
21 #include "jingle/notifier/listener/push_notifications_send_update_task.h"
22 #include "jingle/notifier/listener/push_notifications_subscribe_task.h"
akalin 2012/01/25 02:45:13 this too
ghc 2012/01/25 03:18:28 Done.
19 #include "jingle/notifier/listener/xml_element_util.h" 23 #include "jingle/notifier/listener/xml_element_util.h"
20 #include "talk/xmpp/constants.h" 24 #include "talk/xmpp/constants.h"
21 #include "talk/xmpp/jid.h" 25 #include "talk/xmpp/jid.h"
22 #include "talk/xmpp/xmppclient.h" 26 #include "talk/xmpp/xmppclient.h"
23 #include "talk/xmpp/xmpptask.h" 27 #include "talk/xmpp/xmpptask.h"
24 28
25 namespace sync_notifier { 29 namespace sync_notifier {
26 30
27 namespace { 31 namespace {
28 32
29 const char kBotJid[] = "tango@bot.talk.google.com"; 33 const char kBotJid[] = "tango@bot.talk.google.com";
30 const char kServiceUrl[] = "http://www.google.com/chrome/sync"; 34 const char kChannelName[] = "tango_raw";
31
32 buzz::QName GetQnData() { return buzz::QName("google:notifier", "data"); }
33 buzz::QName GetQnSeq() { return buzz::QName("", "seq"); }
34 buzz::QName GetQnSid() { return buzz::QName("", "sid"); }
35 buzz::QName GetQnServiceUrl() { return buzz::QName("", "serviceUrl"); }
36 buzz::QName GetQnProtocolVersion() {
37 return buzz::QName("", "protocolVersion");
38 }
39 buzz::QName GetQnChannelContext() {
40 return buzz::QName("", "channelContext");
41 }
42
43 // TODO(akalin): Move these task classes out so that they can be
44 // unit-tested. This'll probably be done easier once we consolidate
45 // all the packet sending/receiving classes.
46
47 // A task that listens for ClientInvalidation messages and calls the
48 // given callback on them.
49 class CacheInvalidationListenTask : public buzz::XmppTask {
50 public:
51 // Takes ownership of callback.
52 CacheInvalidationListenTask(
53 buzz::XmppTaskParentInterface* parent,
54 const base::Callback<void(const std::string&)>& callback,
55 const base::Callback<void(const std::string&)>& context_change_callback)
56 : XmppTask(parent, buzz::XmppEngine::HL_TYPE),
57 callback_(callback),
58 context_change_callback_(context_change_callback) {}
59 virtual ~CacheInvalidationListenTask() {}
60
61 virtual int ProcessStart() {
62 DVLOG(2) << "CacheInvalidationListenTask started";
63 return STATE_RESPONSE;
64 }
65
66 virtual int ProcessResponse() {
67 const buzz::XmlElement* stanza = NextStanza();
68 if (stanza == NULL) {
69 DVLOG(2) << "CacheInvalidationListenTask blocked";
70 return STATE_BLOCKED;
71 }
72 DVLOG(2) << "CacheInvalidationListenTask response received";
73 std::string data;
74 if (GetCacheInvalidationIqPacketData(stanza, &data)) {
75 callback_.Run(data);
76 } else {
77 LOG(ERROR) << "Could not get packet data";
78 }
79 // Acknowledge receipt of the iq to the buzz server.
80 // TODO(akalin): Send an error response for malformed packets.
81 scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
82 SendStanza(response_stanza.get());
83 return STATE_RESPONSE;
84 }
85
86 virtual bool HandleStanza(const buzz::XmlElement* stanza) {
87 DVLOG(1) << "Stanza received: "
88 << notifier::XmlElementToString(*stanza);
89 if (IsValidCacheInvalidationIqPacket(stanza)) {
90 DVLOG(2) << "Queueing stanza";
91 QueueStanza(stanza);
92 return true;
93 }
94 DVLOG(2) << "Stanza skipped";
95 return false;
96 }
97
98 private:
99 bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) {
100 // We deliberately minimize the verification we do here: see
101 // http://crbug.com/71285 .
102 return MatchRequestIq(stanza, buzz::STR_SET, GetQnData());
103 }
104
105 bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza,
106 std::string* data) {
107 DCHECK(IsValidCacheInvalidationIqPacket(stanza));
108 const buzz::XmlElement* cache_invalidation_iq_packet =
109 stanza->FirstNamed(GetQnData());
110 if (!cache_invalidation_iq_packet) {
111 LOG(ERROR) << "Could not find cache invalidation IQ packet element";
112 return false;
113 }
114 // Look for a channelContext attribute in the content of the stanza. If
115 // present, remember it so it can be echoed back.
116 if (cache_invalidation_iq_packet->HasAttr(GetQnChannelContext())) {
117 context_change_callback_.Run(
118 cache_invalidation_iq_packet->Attr(GetQnChannelContext()));
119 }
120 *data = cache_invalidation_iq_packet->BodyText();
121 return true;
122 }
123
124 base::Callback<void(const std::string&)> callback_;
125 base::Callback<void(const std::string&)> context_change_callback_;
126 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
127 };
128
129 std::string MakeProtocolVersion() {
130 return base::Uint64ToString(invalidation::Constants::kProtocolMajorVersion) +
131 "." +
132 base::Uint64ToString(invalidation::Constants::kProtocolMinorVersion);
133 }
134
135 // A task that sends a single outbound ClientInvalidation message.
136 class CacheInvalidationSendMessageTask : public buzz::XmppTask {
137 public:
138 CacheInvalidationSendMessageTask(buzz::XmppTaskParentInterface* parent,
139 const buzz::Jid& to_jid,
140 const std::string& msg,
141 int seq,
142 const std::string& sid,
143 const std::string& channel_context)
144 : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
145 to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid),
146 channel_context_(channel_context) {}
147 virtual ~CacheInvalidationSendMessageTask() {}
148
149 virtual int ProcessStart() {
150 scoped_ptr<buzz::XmlElement> stanza(
151 MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_,
152 seq_, sid_, channel_context_));
153 DVLOG(1) << "Sending message: "
154 << notifier::XmlElementToString(*stanza.get());
155 if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
156 DVLOG(2) << "Error when sending message";
157 return STATE_ERROR;
158 }
159 return STATE_RESPONSE;
160 }
161
162 virtual int ProcessResponse() {
163 const buzz::XmlElement* stanza = NextStanza();
164 if (stanza == NULL) {
165 DVLOG(2) << "CacheInvalidationSendMessageTask blocked...";
166 return STATE_BLOCKED;
167 }
168 DVLOG(2) << "CacheInvalidationSendMessageTask response received: "
169 << notifier::XmlElementToString(*stanza);
170 // TODO(akalin): Handle errors here.
171 return STATE_DONE;
172 }
173
174 virtual bool HandleStanza(const buzz::XmlElement* stanza) {
175 DVLOG(1) << "Stanza received: "
176 << notifier::XmlElementToString(*stanza);
177 if (!MatchResponseIq(stanza, to_jid_, task_id())) {
178 DVLOG(2) << "Stanza skipped";
179 return false;
180 }
181 DVLOG(2) << "Queueing stanza";
182 QueueStanza(stanza);
183 return true;
184 }
185
186 private:
187 static buzz::XmlElement* MakeCacheInvalidationIqPacket(
188 const buzz::Jid& to_jid,
189 const std::string& task_id,
190 const std::string& msg,
191 int seq, const std::string& sid, const std::string& channel_context) {
192 buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
193 buzz::XmlElement* cache_invalidation_iq_packet =
194 new buzz::XmlElement(GetQnData(), true);
195 iq->AddElement(cache_invalidation_iq_packet);
196 cache_invalidation_iq_packet->SetAttr(GetQnSeq(), base::IntToString(seq));
197 cache_invalidation_iq_packet->SetAttr(GetQnSid(), sid);
198 cache_invalidation_iq_packet->SetAttr(GetQnServiceUrl(), kServiceUrl);
199 cache_invalidation_iq_packet->SetAttr(
200 GetQnProtocolVersion(), MakeProtocolVersion());
201 if (!channel_context.empty()) {
202 cache_invalidation_iq_packet->SetAttr(GetQnChannelContext(),
203 channel_context);
204 }
205 cache_invalidation_iq_packet->SetBodyText(msg);
206 return iq;
207 }
208
209 const buzz::Jid to_jid_;
210 std::string msg_;
211 int seq_;
212 std::string sid_;
213 const std::string channel_context_;
214
215 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
216 };
217
218 std::string MakeSid() {
219 uint64 sid = base::RandUint64();
220 return std::string("chrome-sync-") + base::Uint64ToString(sid);
221 }
222 35
223 } // namespace 36 } // namespace
224 37
225 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( 38 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
226 base::WeakPtr<buzz::XmppTaskParentInterface> base_task) 39 base::WeakPtr<buzz::XmppTaskParentInterface> base_task)
227 : weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), 40 : weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
228 base_task_(base_task), 41 base_task_(base_task),
229 seq_(0), 42 seq_(0),
230 sid_(MakeSid()) { 43 scheduling_hash_(0) {
231 CHECK(base_task_.get()); 44 CHECK(base_task_.get());
232 // Owned by base_task. Takes ownership of the callback. 45 // Owned by base_task. Takes ownership of the callback.
233 CacheInvalidationListenTask* listen_task = 46 notifier::PushNotificationsListenTask* listen_task =
234 new CacheInvalidationListenTask( 47 new notifier::PushNotificationsListenTask(base_task_, this);
235 base_task_, base::Bind(
236 &CacheInvalidationPacketHandler::HandleInboundPacket,
237 weak_factory_.GetWeakPtr()),
238 base::Bind(
239 &CacheInvalidationPacketHandler::HandleChannelContextChange,
240 weak_factory_.GetWeakPtr()));
241 listen_task->Start(); 48 listen_task->Start();
242 } 49 }
243 50
244 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() { 51 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() {
245 DCHECK(non_thread_safe_.CalledOnValidThread()); 52 DCHECK(non_thread_safe_.CalledOnValidThread());
246 } 53 }
247 54
248 void CacheInvalidationPacketHandler::SendMessage( 55 void CacheInvalidationPacketHandler::SendMessage(
249 const std::string& message) { 56 const std::string& message) {
250 DCHECK(non_thread_safe_.CalledOnValidThread()); 57 DCHECK(non_thread_safe_.CalledOnValidThread());
251 if (!base_task_.get()) { 58 if (!base_task_.get()) {
252 return; 59 return;
253 } 60 }
254 std::string encoded_message; 61 ipc::invalidation::ClientGatewayMessage envelope;
255 if (!base::Base64Encode(message, &encoded_message)) { 62 envelope.set_is_client_to_server(true);
256 LOG(ERROR) << "Could not base64-encode message to send: " 63 if (!service_context_.empty()) {
257 << message; 64 envelope.set_service_context(service_context_);
258 return; 65 envelope.set_rpc_scheduling_hash(scheduling_hash_);
259 } 66 }
67 envelope.set_network_message(message);
68
69 notifier::Recipient recipient;
70 recipient.to = kBotJid;
71 notifier::Notification notification;
72 notification.channel = kChannelName;
73 notification.recipients.push_back(recipient);
74 envelope.SerializeToString(&notification.data);
75
260 // Owned by base_task_. 76 // Owned by base_task_.
261 CacheInvalidationSendMessageTask* send_message_task = 77 notifier::PushNotificationsSendUpdateTask* send_message_task =
262 new CacheInvalidationSendMessageTask(base_task_, 78 new notifier::PushNotificationsSendUpdateTask(base_task_, notification);
263 buzz::Jid(kBotJid),
264 encoded_message,
265 seq_, sid_, channel_context_);
266 send_message_task->Start(); 79 send_message_task->Start();
267 ++seq_;
268 } 80 }
269 81
270 void CacheInvalidationPacketHandler::SetMessageReceiver( 82 void CacheInvalidationPacketHandler::SetMessageReceiver(
271 invalidation::MessageCallback* incoming_receiver) { 83 invalidation::MessageCallback* incoming_receiver) {
272 incoming_receiver_.reset(incoming_receiver); 84 incoming_receiver_.reset(incoming_receiver);
273 } 85 }
274 86
275 void CacheInvalidationPacketHandler::HandleInboundPacket( 87 void CacheInvalidationPacketHandler::SendSubscriptionRequest() {
276 const std::string& packet) { 88 notifier::Subscription subscription;
89 subscription.channel = kChannelName;
90 subscription.from = "";
91 notifier::SubscriptionList subscription_list;
92 subscription_list.push_back(subscription);
93 // Owned by base_task_.
94 notifier::PushNotificationsSubscribeTask* push_subscription_task =
95 new notifier::PushNotificationsSubscribeTask(
96 base_task_, subscription_list, this);
97 push_subscription_task->Start();
98 }
99
100 void CacheInvalidationPacketHandler::OnSubscribed() {
101 // TODO(ghc): Consider whether we should do more here.
102 }
103
104 void CacheInvalidationPacketHandler::OnSubscriptionError() {
105 // TODO(ghc): Consider whether we should do more here.
106 }
107
108 void CacheInvalidationPacketHandler::OnNotificationReceived(
109 const notifier::Notification& notification) {
277 DCHECK(non_thread_safe_.CalledOnValidThread()); 110 DCHECK(non_thread_safe_.CalledOnValidThread());
278 std::string decoded_message; 111 const std::string& decoded_message = notification.data;
279 if (!base::Base64Decode(packet, &decoded_message)) { 112 ipc::invalidation::ClientGatewayMessage envelope;
280 LOG(ERROR) << "Could not base64-decode received message: " 113 envelope.ParseFromString(decoded_message);
281 << packet; 114 if (!envelope.IsInitialized()) {
115 LOG(ERROR) << "Could not parse ClientGatewayMessage: "
116 << decoded_message;
282 return; 117 return;
283 } 118 }
284 incoming_receiver_->Run(decoded_message); 119 if (envelope.has_service_context()) {
285 } 120 service_context_ = envelope.service_context();
286 121 }
287 void CacheInvalidationPacketHandler::HandleChannelContextChange( 122 if (envelope.has_rpc_scheduling_hash()) {
288 const std::string& context) { 123 scheduling_hash_ = envelope.rpc_scheduling_hash();
289 DCHECK(non_thread_safe_.CalledOnValidThread()); 124 }
290 channel_context_ = context; 125 incoming_receiver_->Run(envelope.network_message());
291 } 126 }
292 127
293 } // namespace sync_notifier 128 } // namespace sync_notifier
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698