Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(299)

Side by Side Diff: components/leveldb/leveldb_mojo_proxy.cc

Issue 1839823002: mojo leveldb: Remove the created file thread. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: General patch cleanup. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "components/leveldb/leveldb_mojo_proxy.h"
6
7 #include <set>
8
9 #include "base/bind.h"
10 #include "mojo/message_pump/message_pump_mojo.h"
11 #include "mojo/platform_handle/platform_handle_functions.h"
12 #include "mojo/public/cpp/bindings/interface_request.h"
13
14 namespace leveldb {
15
16 struct LevelDBMojoProxy::OpaqueLock {
17 filesystem::FilePtr lock_file;
18 };
19
20 struct LevelDBMojoProxy::OpaqueDir {
21 explicit OpaqueDir(
22 mojo::InterfacePtrInfo<filesystem::Directory> directory_info) {
23 directory.Bind(std::move(directory_info));
24 }
25
26 filesystem::DirectoryPtr directory;
27 };
28
29 LevelDBMojoProxy::LevelDBMojoProxy(
30 scoped_refptr<base::SingleThreadTaskRunner> task_runner)
31 : task_runner_(std::move(task_runner)), outstanding_opaque_dirs_(0) {}
32
33 LevelDBMojoProxy::OpaqueDir* LevelDBMojoProxy::RegisterDirectory(
34 filesystem::DirectoryPtr directory) {
35 OpaqueDir* out_dir = nullptr;
36
37 if (task_runner_->BelongsToCurrentThread()) {
38 // If our task runner is the current thread, we switch to synchronous mode
39 // instead of posting tasks with WaitableEvents.
40 RegisterDirectoryImpl(directory.PassInterface(), nullptr, &out_dir);
41 } else {
42 // This proxies to the other thread, which proxies to mojo. Only on the
43 // reply from mojo do we return from this.
jam 2016/03/29 05:58:00 nit: no point in repeating this comment everywhere
44 base::WaitableEvent done_event(false, false);
45 task_runner_->PostTask(
46 FROM_HERE, base::Bind(&LevelDBMojoProxy::RegisterDirectoryImpl, this,
47 base::Passed(directory.PassInterface()),
48 &done_event, &out_dir));
49 done_event.Wait();
50 }
51
52 return out_dir;
53 }
54
55 void LevelDBMojoProxy::UnregisterDirectory(OpaqueDir* dir) {
56 if (task_runner_->BelongsToCurrentThread()) {
57 UnregisterDirectoryImpl(dir, nullptr);
58 } else {
59 // This proxies to the other thread, which proxies to mojo. Only on the
60 // reply from mojo do we return from this.
61 base::WaitableEvent done_event(false, false);
62 task_runner_->PostTask(
63 FROM_HERE, base::Bind(&LevelDBMojoProxy::UnregisterDirectoryImpl, this,
64 dir, &done_event));
65 done_event.Wait();
jam 2016/03/29 05:58:00 since this code block is duplicated in all methods
Elliot Glaysher 2016/03/29 17:33:29 So how does the WaitableEvent block the current me
66 }
67 }
68
69 base::File LevelDBMojoProxy::OpenFileHandle(OpaqueDir* dir,
70 const std::string& name,
71 uint32_t open_flags) {
72 base::File file;
73
74 if (task_runner_->BelongsToCurrentThread()) {
75 OpenFileHandleImpl(dir, name, open_flags, nullptr, &file);
76 } else {
77 // This proxies to the other thread, which proxies to mojo. Only on the
78 // reply from mojo do we return from this.
79 base::WaitableEvent done_event(false, false);
80 task_runner_->PostTask(
81 FROM_HERE, base::Bind(&LevelDBMojoProxy::OpenFileHandleImpl, this, dir,
82 name, open_flags, &done_event, &file));
83 done_event.Wait();
84 }
85
86 return file;
87 }
88
89 filesystem::FileError LevelDBMojoProxy::SyncDirectory(OpaqueDir* dir,
90 const std::string& name) {
91 filesystem::FileError error = filesystem::FileError::FAILED;
92
93 if (task_runner_->BelongsToCurrentThread()) {
94 SyncDirectoryImpl(dir, name, nullptr, &error);
95 } else {
96 // This proxies to the other thread, which proxies to mojo. Only on the
97 // reply from mojo do we return from this.
98 base::WaitableEvent done_event(false, false);
99 task_runner_->PostTask(
100 FROM_HERE, base::Bind(&LevelDBMojoProxy::SyncDirectoryImpl, this, dir,
101 name, &done_event, &error));
102 done_event.Wait();
103 }
104
105 return error;
106 }
107
108 bool LevelDBMojoProxy::FileExists(OpaqueDir* dir, const std::string& name) {
109 bool exists = false;
110
111 if (task_runner_->BelongsToCurrentThread()) {
112 FileExistsImpl(dir, name, nullptr, &exists);
113 } else {
114 // This proxies to the other thread, which proxies to mojo. Only on the
115 // reply from mojo do we return from this.
116 base::WaitableEvent done_event(false, false);
117 task_runner_->PostTask(
118 FROM_HERE, base::Bind(&LevelDBMojoProxy::FileExistsImpl, this, dir,
119 name, &done_event, &exists));
120 done_event.Wait();
121 }
122
123 return exists;
124 }
125
126 filesystem::FileError LevelDBMojoProxy::GetChildren(
127 OpaqueDir* dir,
128 const std::string& path,
129 std::vector<std::string>* result) {
130 filesystem::FileError error = filesystem::FileError::FAILED;
131
132 if (task_runner_->BelongsToCurrentThread()) {
133 GetChildrenImpl(dir, path, result, nullptr, &error);
134 } else {
135 // This proxies to the other thread, which proxies to mojo. Only on the
136 // reply from mojo do we return from this.
137 base::WaitableEvent done_event(false, false);
138 task_runner_->PostTask(
139 FROM_HERE, base::Bind(&LevelDBMojoProxy::GetChildrenImpl, this, dir,
140 path, result, &done_event, &error));
141 done_event.Wait();
142 }
143
144 return error;
145 }
146
147 filesystem::FileError LevelDBMojoProxy::Delete(OpaqueDir* dir,
148 const std::string& path,
149 uint32_t delete_flags) {
150 filesystem::FileError error = filesystem::FileError::FAILED;
151
152 if (task_runner_->BelongsToCurrentThread()) {
153 DeleteImpl(dir, path, delete_flags, nullptr, &error);
154 } else {
155 // This proxies to the other thread, which proxies to mojo. Only on the
156 // reply from mojo do we return from this.
157 base::WaitableEvent done_event(false, false);
158 task_runner_->PostTask(
159 FROM_HERE, base::Bind(&LevelDBMojoProxy::DeleteImpl, this, dir, path,
160 delete_flags, &done_event, &error));
161 done_event.Wait();
162 }
163
164 return error;
165 }
166
167 filesystem::FileError LevelDBMojoProxy::CreateDir(OpaqueDir* dir,
168 const std::string& path) {
169 filesystem::FileError error = filesystem::FileError::FAILED;
170
171 if (task_runner_->BelongsToCurrentThread()) {
172 CreateDirImpl(dir, path, nullptr, &error);
173 } else {
174 // This proxies to the other thread, which proxies to mojo. Only on the
175 // reply from mojo do we return from this.
176 base::WaitableEvent done_event(false, false);
177 task_runner_->PostTask(
178 FROM_HERE, base::Bind(&LevelDBMojoProxy::CreateDirImpl, this, dir, path,
179 &done_event, &error));
180 done_event.Wait();
181 }
182
183 return error;
184 }
185
186 filesystem::FileError LevelDBMojoProxy::GetFileSize(OpaqueDir* dir,
187 const std::string& path,
188 uint64_t* file_size) {
189 filesystem::FileError error = filesystem::FileError::FAILED;
190
191 if (task_runner_->BelongsToCurrentThread()) {
192 GetFileSizeImpl(dir, path, file_size, nullptr, &error);
193 } else {
194 // This proxies to the other thread, which proxies to mojo. Only on the
195 // reply from mojo do we return from this.
196 base::WaitableEvent done_event(false, false);
197 task_runner_->PostTask(
198 FROM_HERE, base::Bind(&LevelDBMojoProxy::GetFileSizeImpl, this, dir,
199 path, file_size, &done_event, &error));
200 done_event.Wait();
201 }
202
203 return error;
204 }
205
206 filesystem::FileError LevelDBMojoProxy::RenameFile(
207 OpaqueDir* dir,
208 const std::string& old_path,
209 const std::string& new_path) {
210 filesystem::FileError error = filesystem::FileError::FAILED;
211
212 if (task_runner_->BelongsToCurrentThread()) {
213 RenameFileImpl(dir, old_path, new_path, nullptr, &error);
214 } else {
215 // This proxies to the other thread, which proxies to mojo. Only on the
216 // reply from mojo do we return from this.
217 base::WaitableEvent done_event(false, false);
218 task_runner_->PostTask(
219 FROM_HERE, base::Bind(&LevelDBMojoProxy::RenameFileImpl, this, dir,
220 old_path, new_path, &done_event, &error));
221 done_event.Wait();
222 }
223
224 return error;
225 }
226
227 std::pair<filesystem::FileError, LevelDBMojoProxy::OpaqueLock*>
228 LevelDBMojoProxy::LockFile(OpaqueDir* dir, const std::string& path) {
229 filesystem::FileError error = filesystem::FileError::FAILED;
230 OpaqueLock* out_lock = nullptr;
231
232 if (task_runner_->BelongsToCurrentThread()) {
233 LockFileImpl(dir, path, nullptr, &error, &out_lock);
234 } else {
235 // This proxies to the other thread, which proxies to mojo. Only on the
236 // reply from mojo do we return from this.
237 base::WaitableEvent done_event(false, false);
238 task_runner_->PostTask(
239 FROM_HERE, base::Bind(&LevelDBMojoProxy::LockFileImpl, this, dir, path,
240 &done_event, &error, &out_lock));
241 done_event.Wait();
242 }
243
244 return std::make_pair(error, out_lock);
245 }
246
247 filesystem::FileError LevelDBMojoProxy::UnlockFile(OpaqueLock* lock) {
248 // Take ownership of the incoming lock so it gets destroyed whatever happens.
249 scoped_ptr<OpaqueLock> scoped_lock(lock);
250 filesystem::FileError error = filesystem::FileError::FAILED;
251
252 if (task_runner_->BelongsToCurrentThread()) {
253 UnlockFileImpl(std::move(scoped_lock), nullptr, &error);
254 } else {
255 // This proxies to the other thread, which proxies to mojo. Only on the
256 // reply from mojo do we return from this.
257 base::WaitableEvent done_event(false, false);
258 task_runner_->PostTask(
259 FROM_HERE, base::Bind(&LevelDBMojoProxy::UnlockFileImpl, this,
260 base::Passed(&scoped_lock), &done_event, &error));
261 done_event.Wait();
262 }
263
264 return error;
265 }
266
267 LevelDBMojoProxy::~LevelDBMojoProxy() {
268 DCHECK_EQ(0, outstanding_opaque_dirs_);
269 }
270
271 void LevelDBMojoProxy::SignalIfNeeded(base::WaitableEvent* done_event) {
272 if (done_event)
273 done_event->Signal();
274 }
275
276 void LevelDBMojoProxy::RegisterDirectoryImpl(
277 mojo::InterfacePtrInfo<filesystem::Directory> directory_info,
278 base::WaitableEvent* done_event,
279 OpaqueDir** out_dir) {
280 // Take the Directory pipe and bind it on this thread.
281 *out_dir = new OpaqueDir(std::move(directory_info));
282 outstanding_opaque_dirs_++;
283 SignalIfNeeded(done_event);
284 }
285
286 void LevelDBMojoProxy::UnregisterDirectoryImpl(
287 OpaqueDir* dir,
288 base::WaitableEvent* done_event) {
289 // Only delete the directories on the thread that owns them.
290 delete dir;
291 outstanding_opaque_dirs_--;
292 SignalIfNeeded(done_event);
293 }
294
295 void LevelDBMojoProxy::OpenFileHandleImpl(OpaqueDir* dir,
296 std::string name,
297 uint32_t open_flags,
298 base::WaitableEvent* done_event,
299 base::File* output_file) {
300 mojo::ScopedHandle handle;
301 filesystem::FileError error = filesystem::FileError::FAILED;
302 bool completed = dir->directory->OpenFileHandle(mojo::String::From(name),
303 open_flags, &error, &handle);
304 DCHECK(completed);
305
306 if (error != filesystem::FileError::OK) {
307 *output_file = base::File(static_cast<base::File::Error>(error));
308 } else {
309 MojoPlatformHandle platform_handle;
310 MojoResult extract_result =
311 MojoExtractPlatformHandle(handle.release().value(), &platform_handle);
312
313 if (extract_result == MOJO_RESULT_OK) {
314 *output_file = base::File(platform_handle);
315 } else {
316 NOTREACHED();
317 *output_file = base::File(base::File::Error::FILE_ERROR_FAILED);
318 }
319 }
320
321 SignalIfNeeded(done_event);
322 }
323
324 void LevelDBMojoProxy::SyncDirectoryImpl(OpaqueDir* dir,
325 std::string name,
326 base::WaitableEvent* done_event,
327 filesystem::FileError* out_error) {
328 filesystem::DirectoryPtr target;
329 bool completed = dir->directory->OpenDirectory(
330 name, GetProxy(&target), filesystem::kFlagRead | filesystem::kFlagWrite,
331 out_error);
332 DCHECK(completed);
333
334 if (*out_error != filesystem::FileError::OK) {
335 SignalIfNeeded(done_event);
336 return;
337 }
338
339 completed = target->Flush(out_error);
340 DCHECK(completed);
341 SignalIfNeeded(done_event);
342 }
343
344 void LevelDBMojoProxy::FileExistsImpl(OpaqueDir* dir,
345 std::string name,
346 base::WaitableEvent* done_event,
347 bool* exists) {
348 filesystem::FileError error = filesystem::FileError::FAILED;
349 bool completed =
350 dir->directory->Exists(mojo::String::From(name), &error, exists);
351 DCHECK(completed);
352 SignalIfNeeded(done_event);
353 }
354
355 void LevelDBMojoProxy::GetChildrenImpl(OpaqueDir* dir,
356 std::string name,
357 std::vector<std::string>* out_contents,
358 base::WaitableEvent* done_event,
359 filesystem::FileError* out_error) {
360 // Step one: open the directory |name| from the toplevel directory.
361 filesystem::DirectoryPtr target;
362 filesystem::DirectoryRequest proxy = GetProxy(&target);
363 bool completed = dir->directory->OpenDirectory(
364 name, std::move(proxy), filesystem::kFlagRead | filesystem::kFlagWrite,
365 out_error);
366 DCHECK(completed);
367
368 if (*out_error != filesystem::FileError::OK) {
369 SignalIfNeeded(done_event);
370 return;
371 }
372
373 mojo::Array<filesystem::DirectoryEntryPtr> directory_contents;
374 completed = target->Read(out_error, &directory_contents);
375 DCHECK(completed);
376
377 if (!directory_contents.is_null()) {
378 for (size_t i = 0; i < directory_contents.size(); ++i)
379 out_contents->push_back(directory_contents[i]->name.To<std::string>());
380 }
381
382 SignalIfNeeded(done_event);
383 }
384
385 void LevelDBMojoProxy::DeleteImpl(OpaqueDir* dir,
386 std::string name,
387 uint32_t delete_flags,
388 base::WaitableEvent* done_event,
389 filesystem::FileError* out_error) {
390 bool completed =
391 dir->directory->Delete(mojo::String::From(name), delete_flags, out_error);
392 DCHECK(completed);
393 SignalIfNeeded(done_event);
394 }
395
396 void LevelDBMojoProxy::CreateDirImpl(OpaqueDir* dir,
397 std::string name,
398 base::WaitableEvent* done_event,
399 filesystem::FileError* out_error) {
400 bool completed = dir->directory->OpenDirectory(
401 name, nullptr,
402 filesystem::kFlagRead | filesystem::kFlagWrite | filesystem::kFlagCreate,
403 out_error);
404 DCHECK(completed);
405 SignalIfNeeded(done_event);
406 }
407
408 void LevelDBMojoProxy::GetFileSizeImpl(OpaqueDir* dir,
409 const std::string& path,
410 uint64_t* file_size,
411 base::WaitableEvent* done_event,
412 filesystem::FileError* out_error) {
413 filesystem::FileInformationPtr info;
414 bool completed = dir->directory->StatFile(path, out_error, &info);
415 DCHECK(completed);
416 if (info)
417 *file_size = info->size;
418 SignalIfNeeded(done_event);
419 }
420
421 void LevelDBMojoProxy::RenameFileImpl(OpaqueDir* dir,
422 const std::string& old_path,
423 const std::string& new_path,
424 base::WaitableEvent* done_event,
425 filesystem::FileError* out_error) {
426 bool completed = dir->directory->Rename(
427 mojo::String::From(old_path), mojo::String::From(new_path), out_error);
428 DCHECK(completed);
429 SignalIfNeeded(done_event);
430 }
431
432 void LevelDBMojoProxy::LockFileImpl(OpaqueDir* dir,
433 const std::string& path,
434 base::WaitableEvent* done_event,
435 filesystem::FileError* out_error,
436 OpaqueLock** out_lock) {
437 // Since a lock is associated with a file descriptor, we need to open and
438 // have a persistent file on the other side of the connection.
439 filesystem::FilePtr target;
440 filesystem::FileRequest proxy = GetProxy(&target);
441 bool completed = dir->directory->OpenFile(
442 mojo::String::From(path), std::move(proxy),
443 filesystem::kFlagOpenAlways | filesystem::kFlagRead |
444 filesystem::kFlagWrite,
445 out_error);
446 DCHECK(completed);
447
448 if (*out_error != filesystem::FileError::OK) {
449 SignalIfNeeded(done_event);
450 return;
451 }
452
453 completed = target->Lock(out_error);
454 DCHECK(completed);
455
456 if (*out_error == filesystem::FileError::OK) {
457 OpaqueLock* l = new OpaqueLock;
458 l->lock_file = std::move(target);
459 *out_lock = l;
460 }
461
462 SignalIfNeeded(done_event);
463 }
464
465 void LevelDBMojoProxy::UnlockFileImpl(scoped_ptr<OpaqueLock> lock,
466 base::WaitableEvent* done_event,
467 filesystem::FileError* out_error) {
468 lock->lock_file->Unlock(out_error);
469 SignalIfNeeded(done_event);
470 }
471
472 } // namespace leveldb
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698