Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(137)

Unified Diff: mojo/message_pump/handle_watcher.cc

Issue 1280463003: Revert of Straightens outs DEPS in mojo/common (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/message_pump/handle_watcher.h ('k') | mojo/message_pump/handle_watcher_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/message_pump/handle_watcher.cc
diff --git a/mojo/message_pump/handle_watcher.cc b/mojo/message_pump/handle_watcher.cc
deleted file mode 100644
index d02c761109692b257b18d94332c814b9974a87c3..0000000000000000000000000000000000000000
--- a/mojo/message_pump/handle_watcher.cc
+++ /dev/null
@@ -1,475 +0,0 @@
-// Copyright 2013 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "mojo/message_pump/handle_watcher.h"
-
-#include <map>
-
-#include "base/atomic_sequence_num.h"
-#include "base/bind.h"
-#include "base/lazy_instance.h"
-#include "base/logging.h"
-#include "base/macros.h"
-#include "base/memory/singleton.h"
-#include "base/memory/weak_ptr.h"
-#include "base/message_loop/message_loop.h"
-#include "base/single_thread_task_runner.h"
-#include "base/synchronization/lock.h"
-#include "base/synchronization/waitable_event.h"
-#include "base/thread_task_runner_handle.h"
-#include "base/threading/thread.h"
-#include "base/threading/thread_restrictions.h"
-#include "base/time/time.h"
-#include "mojo/message_pump/message_pump_mojo.h"
-#include "mojo/message_pump/message_pump_mojo_handler.h"
-#include "mojo/message_pump/time_helper.h"
-
-namespace mojo {
-namespace common {
-
-typedef int WatcherID;
-
-namespace {
-
-const char kWatcherThreadName[] = "handle-watcher-thread";
-
-base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
- return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
- internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
-}
-
-// Tracks the data for a single call to Start().
-struct WatchData {
- WatchData()
- : id(0), handle_signals(MOJO_HANDLE_SIGNAL_NONE), task_runner(NULL) {}
-
- WatcherID id;
- Handle handle;
- MojoHandleSignals handle_signals;
- base::TimeTicks deadline;
- base::Callback<void(MojoResult)> callback;
- scoped_refptr<base::SingleThreadTaskRunner> task_runner;
-};
-
-// WatcherBackend --------------------------------------------------------------
-
-// WatcherBackend is responsible for managing the requests and interacting with
-// MessagePumpMojo. All access (outside of creation/destruction) is done on the
-// thread WatcherThreadManager creates.
-class WatcherBackend : public MessagePumpMojoHandler {
- public:
- WatcherBackend();
- ~WatcherBackend() override;
-
- void StartWatching(const WatchData& data);
-
- // Cancels a previously scheduled request to start a watch.
- void StopWatching(WatcherID watcher_id);
-
- private:
- typedef std::map<Handle, WatchData> HandleToWatchDataMap;
-
- // Invoked when a handle needs to be removed and notified.
- void RemoveAndNotify(const Handle& handle, MojoResult result);
-
- // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
- // and sets |handle| to the Handle. Returns false if not a known id.
- bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
-
- // MessagePumpMojoHandler overrides:
- void OnHandleReady(const Handle& handle) override;
- void OnHandleError(const Handle& handle, MojoResult result) override;
-
- // Maps from assigned id to WatchData.
- HandleToWatchDataMap handle_to_data_;
-
- DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
-};
-
-WatcherBackend::WatcherBackend() {
-}
-
-WatcherBackend::~WatcherBackend() {
-}
-
-void WatcherBackend::StartWatching(const WatchData& data) {
- RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
-
- DCHECK_EQ(0u, handle_to_data_.count(data.handle));
-
- handle_to_data_[data.handle] = data;
- MessagePumpMojo::current()->AddHandler(this, data.handle,
- data.handle_signals,
- data.deadline);
-}
-
-void WatcherBackend::StopWatching(WatcherID watcher_id) {
- // Because of the thread hop it is entirely possible to get here and not
- // have a valid handle registered for |watcher_id|.
- Handle handle;
- if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
- handle_to_data_.erase(handle);
- MessagePumpMojo::current()->RemoveHandler(handle);
- }
-}
-
-void WatcherBackend::RemoveAndNotify(const Handle& handle,
- MojoResult result) {
- if (handle_to_data_.count(handle) == 0)
- return;
-
- const WatchData data(handle_to_data_[handle]);
- handle_to_data_.erase(handle);
- MessagePumpMojo::current()->RemoveHandler(handle);
-
- data.task_runner->PostTask(FROM_HERE, base::Bind(data.callback, result));
-}
-
-bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
- Handle* handle) const {
- for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
- i != handle_to_data_.end(); ++i) {
- if (i->second.id == watcher_id) {
- *handle = i->second.handle;
- return true;
- }
- }
- return false;
-}
-
-void WatcherBackend::OnHandleReady(const Handle& handle) {
- RemoveAndNotify(handle, MOJO_RESULT_OK);
-}
-
-void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
- RemoveAndNotify(handle, result);
-}
-
-// WatcherThreadManager --------------------------------------------------------
-
-// WatcherThreadManager manages the background thread that listens for handles
-// to be ready. All requests are handled by WatcherBackend.
-} // namespace
-
-class WatcherThreadManager {
- public:
- ~WatcherThreadManager();
-
- // Returns the shared instance.
- static WatcherThreadManager* GetInstance();
-
- // Starts watching the requested handle. Returns a unique ID that is used to
- // stop watching the handle. When the handle is ready |callback| is notified
- // on the thread StartWatching() was invoked on.
- // This may be invoked on any thread.
- WatcherID StartWatching(const Handle& handle,
- MojoHandleSignals handle_signals,
- base::TimeTicks deadline,
- const base::Callback<void(MojoResult)>& callback);
-
- // Stops watching a handle.
- // This may be invoked on any thread.
- void StopWatching(WatcherID watcher_id);
-
- private:
- enum RequestType {
- REQUEST_START,
- REQUEST_STOP,
- };
-
- // See description of |requests_| for details.
- struct RequestData {
- RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {}
-
- RequestType type;
- WatchData start_data;
- WatcherID stop_id;
- base::WaitableEvent* stop_event;
- };
-
- typedef std::vector<RequestData> Requests;
-
- friend struct DefaultSingletonTraits<WatcherThreadManager>;
-
- WatcherThreadManager();
-
- // Schedules a request on the background thread. See |requests_| for details.
- void AddRequest(const RequestData& data);
-
- // Processes requests added to |requests_|. This is invoked on the backend
- // thread.
- void ProcessRequestsOnBackendThread();
-
- base::Thread thread_;
-
- base::AtomicSequenceNumber watcher_id_generator_;
-
- WatcherBackend backend_;
-
- // Protects |requests_|.
- base::Lock lock_;
-
- // Start/Stop result in adding a RequestData to |requests_| (protected by
- // |lock_|). When the background thread wakes up it processes the requests.
- Requests requests_;
-
- DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
-};
-
-WatcherThreadManager::~WatcherThreadManager() {
- thread_.Stop();
-}
-
-WatcherThreadManager* WatcherThreadManager::GetInstance() {
- return Singleton<WatcherThreadManager>::get();
-}
-
-WatcherID WatcherThreadManager::StartWatching(
- const Handle& handle,
- MojoHandleSignals handle_signals,
- base::TimeTicks deadline,
- const base::Callback<void(MojoResult)>& callback) {
- RequestData request_data;
- request_data.type = REQUEST_START;
- request_data.start_data.id = watcher_id_generator_.GetNext();
- request_data.start_data.handle = handle;
- request_data.start_data.callback = callback;
- request_data.start_data.handle_signals = handle_signals;
- request_data.start_data.deadline = deadline;
- request_data.start_data.task_runner = base::ThreadTaskRunnerHandle::Get();
- AddRequest(request_data);
- return request_data.start_data.id;
-}
-
-void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
- // Handle the case of StartWatching() followed by StopWatching() before
- // |thread_| woke up.
- {
- base::AutoLock auto_lock(lock_);
- for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) {
- if (i->type == REQUEST_START && i->start_data.id == watcher_id) {
- // Watcher ids are not reused, so if we find it we can stop.
- requests_.erase(i);
- return;
- }
- }
- }
-
- base::ThreadRestrictions::ScopedAllowWait allow_wait;
- base::WaitableEvent event(true, false);
- RequestData request_data;
- request_data.type = REQUEST_STOP;
- request_data.stop_id = watcher_id;
- request_data.stop_event = &event;
- AddRequest(request_data);
-
- // We need to block until the handle is actually removed.
- event.Wait();
-}
-
-void WatcherThreadManager::AddRequest(const RequestData& data) {
- {
- base::AutoLock auto_lock(lock_);
- const bool was_empty = requests_.empty();
- requests_.push_back(data);
- if (!was_empty)
- return;
- }
- // We own |thread_|, so it's safe to use Unretained() here.
- thread_.task_runner()->PostTask(
- FROM_HERE,
- base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
- base::Unretained(this)));
-}
-
-void WatcherThreadManager::ProcessRequestsOnBackendThread() {
- DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current());
-
- Requests requests;
- {
- base::AutoLock auto_lock(lock_);
- requests_.swap(requests);
- }
- for (size_t i = 0; i < requests.size(); ++i) {
- if (requests[i].type == REQUEST_START) {
- backend_.StartWatching(requests[i].start_data);
- } else {
- backend_.StopWatching(requests[i].stop_id);
- requests[i].stop_event->Signal();
- }
- }
-}
-
-WatcherThreadManager::WatcherThreadManager()
- : thread_(kWatcherThreadName) {
- base::Thread::Options thread_options;
- thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
- thread_.StartWithOptions(thread_options);
-}
-
-// HandleWatcher::StateBase and subclasses -------------------------------------
-
-// The base class of HandleWatcher's state. Owns the user's callback and
-// monitors the current thread's MessageLoop to know when to force the callback
-// to run (with an error) even though the pipe hasn't been signaled yet.
-class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
- public:
- StateBase(HandleWatcher* watcher,
- const base::Callback<void(MojoResult)>& callback)
- : watcher_(watcher),
- callback_(callback),
- got_ready_(false) {
- base::MessageLoop::current()->AddDestructionObserver(this);
- }
-
- ~StateBase() override {
- base::MessageLoop::current()->RemoveDestructionObserver(this);
- }
-
- protected:
- void NotifyHandleReady(MojoResult result) {
- got_ready_ = true;
- NotifyAndDestroy(result);
- }
-
- bool got_ready() const { return got_ready_; }
-
- private:
- void WillDestroyCurrentMessageLoop() override {
- // The current thread is exiting. Simulate a watch error.
- NotifyAndDestroy(MOJO_RESULT_ABORTED);
- }
-
- void NotifyAndDestroy(MojoResult result) {
- base::Callback<void(MojoResult)> callback = callback_;
- watcher_->Stop(); // Destroys |this|.
-
- callback.Run(result);
- }
-
- HandleWatcher* watcher_;
- base::Callback<void(MojoResult)> callback_;
-
- // Have we been notified that the handle is ready?
- bool got_ready_;
-
- DISALLOW_COPY_AND_ASSIGN(StateBase);
-};
-
-// If the thread on which HandleWatcher is used runs MessagePumpMojo,
-// SameThreadWatchingState is used to directly watch the handle on the same
-// thread.
-class HandleWatcher::SameThreadWatchingState : public StateBase,
- public MessagePumpMojoHandler {
- public:
- SameThreadWatchingState(HandleWatcher* watcher,
- const Handle& handle,
- MojoHandleSignals handle_signals,
- MojoDeadline deadline,
- const base::Callback<void(MojoResult)>& callback)
- : StateBase(watcher, callback),
- handle_(handle) {
- DCHECK(MessagePumpMojo::IsCurrent());
-
- MessagePumpMojo::current()->AddHandler(
- this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
- }
-
- ~SameThreadWatchingState() override {
- if (!got_ready())
- MessagePumpMojo::current()->RemoveHandler(handle_);
- }
-
- private:
- // MessagePumpMojoHandler overrides:
- void OnHandleReady(const Handle& handle) override {
- StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK);
- }
-
- void OnHandleError(const Handle& handle, MojoResult result) override {
- StopWatchingAndNotifyReady(handle, result);
- }
-
- void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) {
- DCHECK_EQ(handle.value(), handle_.value());
- MessagePumpMojo::current()->RemoveHandler(handle_);
- NotifyHandleReady(result);
- }
-
- Handle handle_;
-
- DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
-};
-
-// If the thread on which HandleWatcher is used runs a message pump different
-// from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
-// handle on the handle watcher thread.
-class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
- public:
- SecondaryThreadWatchingState(HandleWatcher* watcher,
- const Handle& handle,
- MojoHandleSignals handle_signals,
- MojoDeadline deadline,
- const base::Callback<void(MojoResult)>& callback)
- : StateBase(watcher, callback),
- weak_factory_(this) {
- watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
- handle,
- handle_signals,
- MojoDeadlineToTimeTicks(deadline),
- base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady,
- weak_factory_.GetWeakPtr()));
- }
-
- ~SecondaryThreadWatchingState() override {
- // If we've been notified the handle is ready (|got_ready()| is true) then
- // the watch has been implicitly removed by
- // WatcherThreadManager/MessagePumpMojo and we don't have to call
- // StopWatching(). To do so would needlessly entail posting a task and
- // blocking until the background thread services it.
- if (!got_ready())
- WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
- }
-
- private:
- WatcherID watcher_id_;
-
- // Used to weakly bind |this| to the WatcherThreadManager.
- base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
-
- DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
-};
-
-// HandleWatcher ---------------------------------------------------------------
-
-HandleWatcher::HandleWatcher() {
-}
-
-HandleWatcher::~HandleWatcher() {
-}
-
-void HandleWatcher::Start(const Handle& handle,
- MojoHandleSignals handle_signals,
- MojoDeadline deadline,
- const base::Callback<void(MojoResult)>& callback) {
- DCHECK(handle.is_valid());
- DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
-
- // Need to clear the state before creating a new one.
- state_.reset();
- if (MessagePumpMojo::IsCurrent()) {
- state_.reset(new SameThreadWatchingState(
- this, handle, handle_signals, deadline, callback));
- } else {
- state_.reset(new SecondaryThreadWatchingState(
- this, handle, handle_signals, deadline, callback));
- }
-}
-
-void HandleWatcher::Stop() {
- state_.reset();
-}
-
-} // namespace common
-} // namespace mojo
« no previous file with comments | « mojo/message_pump/handle_watcher.h ('k') | mojo/message_pump/handle_watcher_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698