Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(575)

Unified Diff: mojo/common/handle_watcher.cc

Issue 1280463003: Revert of Straightens outs DEPS in mojo/common (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/common/handle_watcher.h ('k') | mojo/common/handle_watcher_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/common/handle_watcher.cc
diff --git a/mojo/common/handle_watcher.cc b/mojo/common/handle_watcher.cc
new file mode 100644
index 0000000000000000000000000000000000000000..6bff0cd1a8981bfdcb190981d6d4b7aa93b88915
--- /dev/null
+++ b/mojo/common/handle_watcher.cc
@@ -0,0 +1,475 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/common/handle_watcher.h"
+
+#include <map>
+
+#include "base/atomic_sequence_num.h"
+#include "base/bind.h"
+#include "base/lazy_instance.h"
+#include "base/logging.h"
+#include "base/macros.h"
+#include "base/memory/singleton.h"
+#include "base/memory/weak_ptr.h"
+#include "base/message_loop/message_loop.h"
+#include "base/single_thread_task_runner.h"
+#include "base/synchronization/lock.h"
+#include "base/synchronization/waitable_event.h"
+#include "base/thread_task_runner_handle.h"
+#include "base/threading/thread.h"
+#include "base/threading/thread_restrictions.h"
+#include "base/time/time.h"
+#include "mojo/common/message_pump_mojo.h"
+#include "mojo/common/message_pump_mojo_handler.h"
+#include "mojo/common/time_helper.h"
+
+namespace mojo {
+namespace common {
+
+typedef int WatcherID;
+
+namespace {
+
+const char kWatcherThreadName[] = "handle-watcher-thread";
+
+base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
+ return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
+ internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
+}
+
+// Tracks the data for a single call to Start().
+struct WatchData {
+ WatchData()
+ : id(0), handle_signals(MOJO_HANDLE_SIGNAL_NONE), task_runner(NULL) {}
+
+ WatcherID id;
+ Handle handle;
+ MojoHandleSignals handle_signals;
+ base::TimeTicks deadline;
+ base::Callback<void(MojoResult)> callback;
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner;
+};
+
+// WatcherBackend --------------------------------------------------------------
+
+// WatcherBackend is responsible for managing the requests and interacting with
+// MessagePumpMojo. All access (outside of creation/destruction) is done on the
+// thread WatcherThreadManager creates.
+class WatcherBackend : public MessagePumpMojoHandler {
+ public:
+ WatcherBackend();
+ ~WatcherBackend() override;
+
+ void StartWatching(const WatchData& data);
+
+ // Cancels a previously scheduled request to start a watch.
+ void StopWatching(WatcherID watcher_id);
+
+ private:
+ typedef std::map<Handle, WatchData> HandleToWatchDataMap;
+
+ // Invoked when a handle needs to be removed and notified.
+ void RemoveAndNotify(const Handle& handle, MojoResult result);
+
+ // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
+ // and sets |handle| to the Handle. Returns false if not a known id.
+ bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
+
+ // MessagePumpMojoHandler overrides:
+ void OnHandleReady(const Handle& handle) override;
+ void OnHandleError(const Handle& handle, MojoResult result) override;
+
+ // Maps from assigned id to WatchData.
+ HandleToWatchDataMap handle_to_data_;
+
+ DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
+};
+
+WatcherBackend::WatcherBackend() {
+}
+
+WatcherBackend::~WatcherBackend() {
+}
+
+void WatcherBackend::StartWatching(const WatchData& data) {
+ RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
+
+ DCHECK_EQ(0u, handle_to_data_.count(data.handle));
+
+ handle_to_data_[data.handle] = data;
+ MessagePumpMojo::current()->AddHandler(this, data.handle,
+ data.handle_signals,
+ data.deadline);
+}
+
+void WatcherBackend::StopWatching(WatcherID watcher_id) {
+ // Because of the thread hop it is entirely possible to get here and not
+ // have a valid handle registered for |watcher_id|.
+ Handle handle;
+ if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
+ handle_to_data_.erase(handle);
+ MessagePumpMojo::current()->RemoveHandler(handle);
+ }
+}
+
+void WatcherBackend::RemoveAndNotify(const Handle& handle,
+ MojoResult result) {
+ if (handle_to_data_.count(handle) == 0)
+ return;
+
+ const WatchData data(handle_to_data_[handle]);
+ handle_to_data_.erase(handle);
+ MessagePumpMojo::current()->RemoveHandler(handle);
+
+ data.task_runner->PostTask(FROM_HERE, base::Bind(data.callback, result));
+}
+
+bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
+ Handle* handle) const {
+ for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
+ i != handle_to_data_.end(); ++i) {
+ if (i->second.id == watcher_id) {
+ *handle = i->second.handle;
+ return true;
+ }
+ }
+ return false;
+}
+
+void WatcherBackend::OnHandleReady(const Handle& handle) {
+ RemoveAndNotify(handle, MOJO_RESULT_OK);
+}
+
+void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
+ RemoveAndNotify(handle, result);
+}
+
+// WatcherThreadManager --------------------------------------------------------
+
+// WatcherThreadManager manages the background thread that listens for handles
+// to be ready. All requests are handled by WatcherBackend.
+} // namespace
+
+class WatcherThreadManager {
+ public:
+ ~WatcherThreadManager();
+
+ // Returns the shared instance.
+ static WatcherThreadManager* GetInstance();
+
+ // Starts watching the requested handle. Returns a unique ID that is used to
+ // stop watching the handle. When the handle is ready |callback| is notified
+ // on the thread StartWatching() was invoked on.
+ // This may be invoked on any thread.
+ WatcherID StartWatching(const Handle& handle,
+ MojoHandleSignals handle_signals,
+ base::TimeTicks deadline,
+ const base::Callback<void(MojoResult)>& callback);
+
+ // Stops watching a handle.
+ // This may be invoked on any thread.
+ void StopWatching(WatcherID watcher_id);
+
+ private:
+ enum RequestType {
+ REQUEST_START,
+ REQUEST_STOP,
+ };
+
+ // See description of |requests_| for details.
+ struct RequestData {
+ RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {}
+
+ RequestType type;
+ WatchData start_data;
+ WatcherID stop_id;
+ base::WaitableEvent* stop_event;
+ };
+
+ typedef std::vector<RequestData> Requests;
+
+ friend struct DefaultSingletonTraits<WatcherThreadManager>;
+
+ WatcherThreadManager();
+
+ // Schedules a request on the background thread. See |requests_| for details.
+ void AddRequest(const RequestData& data);
+
+ // Processes requests added to |requests_|. This is invoked on the backend
+ // thread.
+ void ProcessRequestsOnBackendThread();
+
+ base::Thread thread_;
+
+ base::AtomicSequenceNumber watcher_id_generator_;
+
+ WatcherBackend backend_;
+
+ // Protects |requests_|.
+ base::Lock lock_;
+
+ // Start/Stop result in adding a RequestData to |requests_| (protected by
+ // |lock_|). When the background thread wakes up it processes the requests.
+ Requests requests_;
+
+ DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
+};
+
+WatcherThreadManager::~WatcherThreadManager() {
+ thread_.Stop();
+}
+
+WatcherThreadManager* WatcherThreadManager::GetInstance() {
+ return Singleton<WatcherThreadManager>::get();
+}
+
+WatcherID WatcherThreadManager::StartWatching(
+ const Handle& handle,
+ MojoHandleSignals handle_signals,
+ base::TimeTicks deadline,
+ const base::Callback<void(MojoResult)>& callback) {
+ RequestData request_data;
+ request_data.type = REQUEST_START;
+ request_data.start_data.id = watcher_id_generator_.GetNext();
+ request_data.start_data.handle = handle;
+ request_data.start_data.callback = callback;
+ request_data.start_data.handle_signals = handle_signals;
+ request_data.start_data.deadline = deadline;
+ request_data.start_data.task_runner = base::ThreadTaskRunnerHandle::Get();
+ AddRequest(request_data);
+ return request_data.start_data.id;
+}
+
+void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
+ // Handle the case of StartWatching() followed by StopWatching() before
+ // |thread_| woke up.
+ {
+ base::AutoLock auto_lock(lock_);
+ for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) {
+ if (i->type == REQUEST_START && i->start_data.id == watcher_id) {
+ // Watcher ids are not reused, so if we find it we can stop.
+ requests_.erase(i);
+ return;
+ }
+ }
+ }
+
+ base::ThreadRestrictions::ScopedAllowWait allow_wait;
+ base::WaitableEvent event(true, false);
+ RequestData request_data;
+ request_data.type = REQUEST_STOP;
+ request_data.stop_id = watcher_id;
+ request_data.stop_event = &event;
+ AddRequest(request_data);
+
+ // We need to block until the handle is actually removed.
+ event.Wait();
+}
+
+void WatcherThreadManager::AddRequest(const RequestData& data) {
+ {
+ base::AutoLock auto_lock(lock_);
+ const bool was_empty = requests_.empty();
+ requests_.push_back(data);
+ if (!was_empty)
+ return;
+ }
+ // We own |thread_|, so it's safe to use Unretained() here.
+ thread_.task_runner()->PostTask(
+ FROM_HERE,
+ base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
+ base::Unretained(this)));
+}
+
+void WatcherThreadManager::ProcessRequestsOnBackendThread() {
+ DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current());
+
+ Requests requests;
+ {
+ base::AutoLock auto_lock(lock_);
+ requests_.swap(requests);
+ }
+ for (size_t i = 0; i < requests.size(); ++i) {
+ if (requests[i].type == REQUEST_START) {
+ backend_.StartWatching(requests[i].start_data);
+ } else {
+ backend_.StopWatching(requests[i].stop_id);
+ requests[i].stop_event->Signal();
+ }
+ }
+}
+
+WatcherThreadManager::WatcherThreadManager()
+ : thread_(kWatcherThreadName) {
+ base::Thread::Options thread_options;
+ thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
+ thread_.StartWithOptions(thread_options);
+}
+
+// HandleWatcher::StateBase and subclasses -------------------------------------
+
+// The base class of HandleWatcher's state. Owns the user's callback and
+// monitors the current thread's MessageLoop to know when to force the callback
+// to run (with an error) even though the pipe hasn't been signaled yet.
+class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
+ public:
+ StateBase(HandleWatcher* watcher,
+ const base::Callback<void(MojoResult)>& callback)
+ : watcher_(watcher),
+ callback_(callback),
+ got_ready_(false) {
+ base::MessageLoop::current()->AddDestructionObserver(this);
+ }
+
+ ~StateBase() override {
+ base::MessageLoop::current()->RemoveDestructionObserver(this);
+ }
+
+ protected:
+ void NotifyHandleReady(MojoResult result) {
+ got_ready_ = true;
+ NotifyAndDestroy(result);
+ }
+
+ bool got_ready() const { return got_ready_; }
+
+ private:
+ void WillDestroyCurrentMessageLoop() override {
+ // The current thread is exiting. Simulate a watch error.
+ NotifyAndDestroy(MOJO_RESULT_ABORTED);
+ }
+
+ void NotifyAndDestroy(MojoResult result) {
+ base::Callback<void(MojoResult)> callback = callback_;
+ watcher_->Stop(); // Destroys |this|.
+
+ callback.Run(result);
+ }
+
+ HandleWatcher* watcher_;
+ base::Callback<void(MojoResult)> callback_;
+
+ // Have we been notified that the handle is ready?
+ bool got_ready_;
+
+ DISALLOW_COPY_AND_ASSIGN(StateBase);
+};
+
+// If the thread on which HandleWatcher is used runs MessagePumpMojo,
+// SameThreadWatchingState is used to directly watch the handle on the same
+// thread.
+class HandleWatcher::SameThreadWatchingState : public StateBase,
+ public MessagePumpMojoHandler {
+ public:
+ SameThreadWatchingState(HandleWatcher* watcher,
+ const Handle& handle,
+ MojoHandleSignals handle_signals,
+ MojoDeadline deadline,
+ const base::Callback<void(MojoResult)>& callback)
+ : StateBase(watcher, callback),
+ handle_(handle) {
+ DCHECK(MessagePumpMojo::IsCurrent());
+
+ MessagePumpMojo::current()->AddHandler(
+ this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
+ }
+
+ ~SameThreadWatchingState() override {
+ if (!got_ready())
+ MessagePumpMojo::current()->RemoveHandler(handle_);
+ }
+
+ private:
+ // MessagePumpMojoHandler overrides:
+ void OnHandleReady(const Handle& handle) override {
+ StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK);
+ }
+
+ void OnHandleError(const Handle& handle, MojoResult result) override {
+ StopWatchingAndNotifyReady(handle, result);
+ }
+
+ void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) {
+ DCHECK_EQ(handle.value(), handle_.value());
+ MessagePumpMojo::current()->RemoveHandler(handle_);
+ NotifyHandleReady(result);
+ }
+
+ Handle handle_;
+
+ DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
+};
+
+// If the thread on which HandleWatcher is used runs a message pump different
+// from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
+// handle on the handle watcher thread.
+class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
+ public:
+ SecondaryThreadWatchingState(HandleWatcher* watcher,
+ const Handle& handle,
+ MojoHandleSignals handle_signals,
+ MojoDeadline deadline,
+ const base::Callback<void(MojoResult)>& callback)
+ : StateBase(watcher, callback),
+ weak_factory_(this) {
+ watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
+ handle,
+ handle_signals,
+ MojoDeadlineToTimeTicks(deadline),
+ base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady,
+ weak_factory_.GetWeakPtr()));
+ }
+
+ ~SecondaryThreadWatchingState() override {
+ // If we've been notified the handle is ready (|got_ready()| is true) then
+ // the watch has been implicitly removed by
+ // WatcherThreadManager/MessagePumpMojo and we don't have to call
+ // StopWatching(). To do so would needlessly entail posting a task and
+ // blocking until the background thread services it.
+ if (!got_ready())
+ WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
+ }
+
+ private:
+ WatcherID watcher_id_;
+
+ // Used to weakly bind |this| to the WatcherThreadManager.
+ base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
+};
+
+// HandleWatcher ---------------------------------------------------------------
+
+HandleWatcher::HandleWatcher() {
+}
+
+HandleWatcher::~HandleWatcher() {
+}
+
+void HandleWatcher::Start(const Handle& handle,
+ MojoHandleSignals handle_signals,
+ MojoDeadline deadline,
+ const base::Callback<void(MojoResult)>& callback) {
+ DCHECK(handle.is_valid());
+ DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
+
+ // Need to clear the state before creating a new one.
+ state_.reset();
+ if (MessagePumpMojo::IsCurrent()) {
+ state_.reset(new SameThreadWatchingState(
+ this, handle, handle_signals, deadline, callback));
+ } else {
+ state_.reset(new SecondaryThreadWatchingState(
+ this, handle, handle_signals, deadline, callback));
+ }
+}
+
+void HandleWatcher::Stop() {
+ state_.reset();
+}
+
+} // namespace common
+} // namespace mojo
« no previous file with comments | « mojo/common/handle_watcher.h ('k') | mojo/common/handle_watcher_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698