Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(185)

Side by Side Diff: google_apis/gcm/engine/rmq_store.cc

Issue 56353002: [GCM] Add RMQ storage and MCS message passing support (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Self review Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "google_apis/gcm/engine/rmq_store.h"
6
7 #include "base/basictypes.h"
8 #include "base/bind.h"
9 #include "base/callback.h"
10 #include "base/files/file_path.h"
11 #include "base/logging.h"
12 #include "base/message_loop/message_loop_proxy.h"
13 #include "base/sequenced_task_runner.h"
14 #include "base/stl_util.h"
15 #include "base/strings/string_number_conversions.h"
16 #include "base/strings/string_piece.h"
17 #include "base/tracked_objects.h"
18 #include "components/webdata/encryptor/encryptor.h"
19 #include "google_apis/gcm/base/mcs_message.h"
20 #include "google_apis/gcm/base/mcs_util.h"
21 #include "google_apis/gcm/protocol/gcm.pb.h"
22 #include "third_party/leveldatabase/src/include/leveldb/db.h"
23
24 namespace gcm {
25
26 namespace {
27
28 // ---- LevelDB keys. ----
29 // Key for this device's android id.
30 const char kDeviceAIDKey[] = "device_aid_key";
31 // Key for this device's android security token.
32 const char kDeviceTokenKey[] = "device_token_key";
33 // Lowest lexicographically ordered incoming message key.
34 // Used for prefixing messages.
35 const char kIncomingMsgKeyStart[] = "incoming1-";
36 // Key guaranteed to be higher than all incoming message keys.
37 // Used for limiting iteration.
38 const char kIncomingMsgKeyEnd[] = "incoming2-";
39 // Lowest lexicographically ordered outgoing message key.
40 // Used for prefixing outgoing messages.
41 const char kOutgoingMsgKeyStart[] = "outgoing1-";
42 // Key guaranteed to be higher than all outgoing message keys.
43 // Used for limiting iteration.
44 const char kOutgoingMsgKeyEnd[] = "outgoing2-";
45
46 std::string MakeIncomingKey(const std::string& persistent_id) {
47 return kIncomingMsgKeyStart + persistent_id;
48 }
49
50 std::string MakeOutgoingKey(const std::string& persistent_id) {
51 return kOutgoingMsgKeyStart + persistent_id;
52 }
53
54 std::string ParseOutgoingKey(const std::string& key) {
55 return key.substr(arraysize(kOutgoingMsgKeyStart) - 1);
56 }
57
58 leveldb::Slice MakeSlice(const base::StringPiece& s) {
59 return leveldb::Slice(s.begin(), s.size());
60 }
61
62 } // namespace
63
64 class RMQStore::Backend : public base::RefCountedThreadSafe<RMQStore::Backend> {
65 public:
66 Backend(const base::FilePath& path,
67 scoped_refptr<base::SequencedTaskRunner> foreground_runner);
68
69 // Blocking implementations of RMQStore methods.
70 void Load(const LoadCallback& callback);
71 void Destroy(const UpdateCallback& callback);
72 void SetDeviceCredentials(uint64 device_android_id,
73 uint64 device_security_token,
74 const UpdateCallback& callback);
75 void AddIncomingMessage(const std::string& persistent_id,
76 const UpdateCallback& callback);
77 void RemoveIncomingMessages(const PersistentIdList& persistent_ids,
78 const UpdateCallback& callback);
79 void AddOutgoingMessage(const std::string& persistent_id,
80 const MCSMessage& message,
81 const UpdateCallback& callback);
82 void RemoveOutgoingMessages(const PersistentIdList& persistent_ids,
83 const UpdateCallback& callback);
84
85 private:
86 friend class base::RefCountedThreadSafe<Backend>;
87 ~Backend();
88
89 bool LoadDeviceCredentials(uint64* android_id, uint64* security_token);
90 bool LoadIncomingMessages(std::vector<std::string>* incoming_messages);
91 bool LoadOutgoingMessages(
92 std::map<std::string, google::protobuf::MessageLite*>* outgoing_messages);
93
94 const base::FilePath path_;
95 scoped_refptr<base::SequencedTaskRunner> foreground_task_runner_;
96
97 scoped_ptr<leveldb::DB> db_;
98 };
99
100 RMQStore::Backend::Backend(
101 const base::FilePath& path,
102 scoped_refptr<base::SequencedTaskRunner> foreground_task_runner)
103 : path_(path),
104 foreground_task_runner_(foreground_task_runner) {
105 }
106
107 RMQStore::Backend::~Backend() {
108 }
109
110 void RMQStore::Backend::Load(const LoadCallback& callback) {
111 LoadResult result;
112 std::vector<std::string> incoming_messages;
113 // Callee takes ownership of protobufs.
114 std::map<std::string, google::protobuf::MessageLite*> outgoing_messages;
115
116 leveldb::Options options;
117 options.create_if_missing = true;
118 leveldb::DB* db;
119 leveldb::Status status = leveldb::DB::Open(options, path_.value(), &db);
120 if (!status.ok()) {
121 LOG(ERROR) << "Failed to open database " << path_.value()
122 << ": " << status.ToString();
123 foreground_task_runner_->PostTask(FROM_HERE,
124 base::Bind(callback, result));
125 return;
126 }
127 db_.reset(db);
128
129 if (!LoadDeviceCredentials(&result.device_android_id,
130 &result.device_security_token) ||
131 !LoadIncomingMessages(&result.incoming_messages) ||
132 !LoadOutgoingMessages(&result.outgoing_messages)) {
133 incoming_messages.clear();
134 STLDeleteContainerPairSecondPointers(outgoing_messages.begin(),
135 outgoing_messages.end());
136 outgoing_messages.clear();
137 foreground_task_runner_->PostTask(FROM_HERE,
138 base::Bind(callback, result));
fgorski 2013/11/01 20:26:10 does it make sense to also clear android_id and se
Nicolas Zea 2013/11/01 21:18:49 Done. (also fixed bug w.r.t. deleting/clearing wro
139 return;
140 }
141
142 DVLOG(1) << "Succeeded in loading " << incoming_messages.size()
143 << " unacknowledged incoming messages and "
144 << outgoing_messages.size() << " unacknowledged outgoing messages.";
145 result.success = true;
146 foreground_task_runner_->PostTask(FROM_HERE,
147 base::Bind(callback, result));
148 return;
149 }
150
151 void RMQStore::Backend::Destroy(const UpdateCallback& callback) {
152 DVLOG(1) << "Destroying RMQ store.";
153 const leveldb::Status s =
154 leveldb::DestroyDB(path_.value(), leveldb::Options());
155 if (s.ok()) {
156 foreground_task_runner_->PostTask(FROM_HERE,
157 base::Bind(callback, true));
158 return;
159 }
160 LOG(ERROR) << "Destroy failed.";
161 foreground_task_runner_->PostTask(FROM_HERE,
162 base::Bind(callback, false));
163 }
164
165 void RMQStore::Backend::SetDeviceCredentials(uint64 device_android_id,
166 uint64 device_security_token,
167 const UpdateCallback& callback) {
168 DVLOG(1) << "Saving device credentials with AID " << device_android_id;
169 leveldb::WriteOptions write_options;
170 write_options.sync = true;
171
172 std::string encrypted_token;
173 Encryptor::EncryptString(base::Uint64ToString(device_security_token),
174 &encrypted_token);
175 leveldb::Status s =
176 db_->Put(write_options,
177 MakeSlice(kDeviceAIDKey),
178 MakeSlice(base::Uint64ToString(device_android_id)));
179 if (s.ok()) {
180 s = db_->Put(write_options,
181 MakeSlice(kDeviceTokenKey),
182 MakeSlice(encrypted_token));
183 }
184 if (s.ok()) {
185 foreground_task_runner_->PostTask(FROM_HERE,
186 base::Bind(callback, true));
187 return;
188 }
189 LOG(ERROR) << "LevelDB put failed: " << s.ToString();
190 foreground_task_runner_->PostTask(FROM_HERE,
191 base::Bind(callback, false));
192 }
193
194 void RMQStore::Backend::AddIncomingMessage(const std::string& persistent_id,
195 const UpdateCallback& callback) {
196 DVLOG(1) << "Saving incoming message with id " << persistent_id;
197 leveldb::WriteOptions write_options;
198 write_options.sync = true;
199
200 const leveldb::Status s =
201 db_->Put(write_options,
202 MakeSlice(MakeIncomingKey(persistent_id)),
203 MakeSlice(persistent_id));
204 if (s.ok()) {
205 foreground_task_runner_->PostTask(FROM_HERE,
206 base::Bind(callback, true));
207 return;
208 }
209 LOG(ERROR) << "LevelDB put failed: " << s.ToString();
210 foreground_task_runner_->PostTask(FROM_HERE,
211 base::Bind(callback, false));
212 }
213
214 void RMQStore::Backend::RemoveIncomingMessages(
215 const PersistentIdList& persistent_ids,
216 const UpdateCallback& callback) {
217 leveldb::WriteOptions write_options;
218 write_options.sync = true;
219
220 leveldb::Status s;
221 for (PersistentIdList::const_iterator iter = persistent_ids.begin();
222 iter != persistent_ids.end(); ++iter){
223 DVLOG(1) << "Removing incoming message with id " << *iter;
224 s = db_->Delete(write_options,
225 MakeSlice(MakeIncomingKey(*iter)));
226 if (!s.ok())
227 break;
228 }
229 if (s.ok()) {
fgorski 2013/11/01 20:26:10 What is the default value of s.ok()?
Nicolas Zea 2013/11/01 21:18:49 default is true.
230 foreground_task_runner_->PostTask(FROM_HERE,
231 base::Bind(callback, true));
232 return;
233 }
234 LOG(ERROR) << "LevelDB remove failed: " << s.ToString();
235 foreground_task_runner_->PostTask(FROM_HERE,
236 base::Bind(callback, false));
237 }
238
239 void RMQStore::Backend::AddOutgoingMessage(
240 const std::string& persistent_id,
241 const MCSMessage& message,
242 const UpdateCallback& callback) {
243 DVLOG(1) << "Saving outgoing message with id " << persistent_id;
244 leveldb::WriteOptions write_options;
245 write_options.sync = true;
246
247 std::string data = static_cast<char>(message.tag()) +
248 message.SerializeAsString();
249 const leveldb::Status s =
250 db_->Put(write_options,
251 MakeSlice(MakeOutgoingKey(persistent_id)),
252 MakeSlice(data));
253 if (s.ok()) {
254 foreground_task_runner_->PostTask(FROM_HERE,
255 base::Bind(callback, true));
256 return;
257 }
258 LOG(ERROR) << "LevelDB put failed: " << s.ToString();
259 foreground_task_runner_->PostTask(FROM_HERE,
260 base::Bind(callback, false));
261
262 }
263
264 void RMQStore::Backend::RemoveOutgoingMessages(
265 const PersistentIdList& persistent_ids,
266 const UpdateCallback& callback) {
267 leveldb::WriteOptions write_options;
268 write_options.sync = true;
269
270 leveldb::Status s;
271 for (PersistentIdList::const_iterator iter = persistent_ids.begin();
272 iter != persistent_ids.end(); ++iter){
273 DVLOG(1) << "Removing outgoing message with id " << *iter;
274 s = db_->Delete(write_options,
275 MakeSlice(MakeOutgoingKey(*iter)));
276 if (!s.ok())
277 break;
278 }
279 if (s.ok()) {
280 foreground_task_runner_->PostTask(FROM_HERE,
281 base::Bind(callback, true));
282 return;
283 }
284 LOG(ERROR) << "LevelDB remove failed: " << s.ToString();
285 foreground_task_runner_->PostTask(FROM_HERE,
286 base::Bind(callback, false));
287 }
288
289 bool RMQStore::Backend::LoadDeviceCredentials(uint64* android_id,
290 uint64* security_token) {
291 leveldb::ReadOptions read_options;
292 read_options.verify_checksums = true;
293
294 std::string result;
295 leveldb::Status s = db_->Get(read_options,
296 MakeSlice(kDeviceAIDKey),
297 &result);
298 if (s.ok()) {
299 if (!base::StringToUint64(result, android_id)) {
300 LOG(ERROR) << "Failed to restore device id.";
301 return false;
302 }
303 result.clear();
304 s = db_->Get(read_options,
305 MakeSlice(kDeviceTokenKey),
306 &result);
307 }
308 if (s.ok()) {
309 std::string decrypted_token;
310 Encryptor::DecryptString(result, &decrypted_token);
311 if (!base::StringToUint64(decrypted_token, security_token)) {
312 LOG(ERROR) << "Failed to restore security token.";
313 return false;
314 }
315 return true;
316 }
317
318 if (s.IsNotFound()) {
319 DVLOG(1) << "No credentials found.";
320 return true;
321 }
322
323 LOG(ERROR) << "Error reading credentials from store.";
324 return false;
325 }
326
327 bool RMQStore::Backend::LoadIncomingMessages(
328 std::vector<std::string>* incoming_messages) {
329 leveldb::ReadOptions read_options;
330 read_options.verify_checksums = true;
331
332 scoped_ptr<leveldb::Iterator> iter(db_->NewIterator(read_options));
333 for (iter->Seek(MakeSlice(kIncomingMsgKeyStart));
334 iter->Valid() && iter->key().ToString() < kIncomingMsgKeyEnd;
335 iter->Next()) {
336 leveldb::Slice s = iter->value();
337 if (s.empty()) {
338 LOG(ERROR) << "Error reading incoming message with key "
339 << iter->key().ToString();
340 return false;
341 }
342 DVLOG(1) << "Found incoming message with id " << s.ToString();
343 incoming_messages->push_back(s.ToString());
344 }
345
346 return true;
347 }
348
349 bool RMQStore::Backend::LoadOutgoingMessages(
350 std::map<std::string, google::protobuf::MessageLite*>*
351 outgoing_messages) {
352 leveldb::ReadOptions read_options;
353 read_options.verify_checksums = true;
354
355 scoped_ptr<leveldb::Iterator> iter(db_->NewIterator(read_options));
356 for (iter->Seek(MakeSlice(kOutgoingMsgKeyStart));
357 iter->Valid() && iter->key().ToString() < kOutgoingMsgKeyEnd;
358 iter->Next()) {
359 leveldb::Slice s = iter->value();
360 if (s.empty() || iter->value().size() <= 1) {
fgorski 2013/11/01 20:26:10 replace iter->value() with s check if it is possib
Nicolas Zea 2013/11/01 21:18:49 Done.
361 LOG(ERROR) << "Error reading incoming message with key " << s.ToString();
362 return false;
363 }
364 uint8 tag = iter->value().data()[0];
365 std::string id = ParseOutgoingKey(iter->key().ToString());
366 scoped_ptr<google::protobuf::MessageLite> message(
367 BuildProtobufFromTag(tag));
368 if (!message.get() ||
369 !message->ParseFromString(iter->value().ToString().substr(1))) {
370 LOG(ERROR) << "Failed to parse outgoing message with id "
371 << id << " and tag " << tag;
372 return false;
373 }
374 DVLOG(1) << "Found outgoing message with id " << id << " of type "
375 << base::IntToString(tag);
376 (*outgoing_messages)[id] = message.release();
377 }
378
379 return true;
380 }
381
382 RMQStore::LoadResult::LoadResult()
383 : success(false),
384 device_android_id(0),
385 device_security_token(0) {
386 }
387 RMQStore::LoadResult::~LoadResult() {}
388
389 RMQStore::RMQStore(
390 const base::FilePath& path,
391 scoped_refptr<base::SequencedTaskRunner> blocking_task_runner)
392 : backend_(new Backend(path, base::MessageLoopProxy::current())),
393 blocking_task_runner_(blocking_task_runner) {
394 }
395
396 RMQStore::~RMQStore() {
397 }
398
399 void RMQStore::Load(const LoadCallback& callback) {
400 blocking_task_runner_->PostTask(FROM_HERE,
401 base::Bind(&RMQStore::Backend::Load,
402 backend_,
403 callback));
404 }
405
406 void RMQStore::Destroy(const UpdateCallback& callback) {
407 blocking_task_runner_->PostTask(
408 FROM_HERE,
409 base::Bind(&RMQStore::Backend::Destroy,
410 backend_,
411 callback));
412 }
413
414 void RMQStore::SetDeviceCredentials(uint64 device_android_id,
415 uint64 device_security_token,
416 const UpdateCallback& callback) {
417 blocking_task_runner_->PostTask(
418 FROM_HERE,
419 base::Bind(&RMQStore::Backend::SetDeviceCredentials,
420 backend_,
421 device_android_id,
422 device_security_token,
423 callback));
424 }
425
426 void RMQStore::AddIncomingMessage(const std::string& persistent_id,
427 const UpdateCallback& callback) {
428 blocking_task_runner_->PostTask(
429 FROM_HERE,
430 base::Bind(&RMQStore::Backend::AddIncomingMessage,
431 backend_,
432 persistent_id,
433 callback));
434 }
435
436 void RMQStore::RemoveIncomingMessage(const std::string& persistent_id,
437 const UpdateCallback& callback) {
438 blocking_task_runner_->PostTask(
439 FROM_HERE,
440 base::Bind(&RMQStore::Backend::RemoveIncomingMessages,
441 backend_,
442 PersistentIdList(1, persistent_id),
443 callback));
444 }
445
446 void RMQStore::RemoveIncomingMessages(const PersistentIdList& persistent_ids,
447 const UpdateCallback& callback) {
448 blocking_task_runner_->PostTask(
449 FROM_HERE,
450 base::Bind(&RMQStore::Backend::RemoveIncomingMessages,
451 backend_,
452 persistent_ids,
453 callback));
454 }
455
456 void RMQStore::AddOutgoingMessage(const std::string& persistent_id,
457 const MCSMessage& message,
458 const UpdateCallback& callback) {
459 blocking_task_runner_->PostTask(
460 FROM_HERE,
461 base::Bind(&RMQStore::Backend::AddOutgoingMessage,
462 backend_,
463 persistent_id,
464 message,
465 callback));
466 }
467
468 void RMQStore::RemoveOutgoingMessage(const std::string& persistent_id,
469 const UpdateCallback& callback) {
470 blocking_task_runner_->PostTask(
471 FROM_HERE,
472 base::Bind(&RMQStore::Backend::RemoveOutgoingMessages,
473 backend_,
474 PersistentIdList(1, persistent_id),
475 callback));
476 }
477
478 void RMQStore::RemoveOutgoingMessages(const PersistentIdList& persistent_ids,
479 const UpdateCallback& callback) {
480 blocking_task_runner_->PostTask(
481 FROM_HERE,
482 base::Bind(&RMQStore::Backend::RemoveOutgoingMessages,
483 backend_,
484 persistent_ids,
485 callback));
486 }
487
488 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698