Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3333)

Unified Diff: base/observer_list_threadsafe.h

Issue 2592143003: Allow ObserverListThreadSafe to be used from sequenced tasks. (Closed)
Patch Set: rebase Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | base/observer_list_unittest.cc » ('j') | net/spdy/spdy_session_pool.cc » ('J')
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: base/observer_list_threadsafe.h
diff --git a/base/observer_list_threadsafe.h b/base/observer_list_threadsafe.h
index afb1010b67c4f632c055e6afb8eef21c0addd04d..50708c9352e4ad8beffecdbbe8489e6c5bacfe2e 100644
--- a/base/observer_list_threadsafe.h
+++ b/base/observer_list_threadsafe.h
@@ -5,52 +5,47 @@
#ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
#define BASE_OBSERVER_LIST_THREADSAFE_H_
-#include <algorithm>
-#include <map>
-#include <memory>
-#include <tuple>
+#include <unordered_map>
+#include <utility>
#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/macros.h"
-#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
#include "base/observer_list.h"
+#include "base/sequenced_task_runner.h"
+// TODO(fdoray): Remove this include (it isn't used in this file but other files
+// depend on its presence here).
gab 2017/01/10 21:14:49 Move it out of regular include block with comment
fdoray 2017/01/11 14:09:06 Done.
#include "base/single_thread_task_runner.h"
+#include "base/stl_util.h"
+#include "base/synchronization/lock.h"
#include "base/threading/platform_thread.h"
-#include "base/threading/thread_task_runner_handle.h"
+#include "base/threading/sequenced_task_runner_handle.h"
///////////////////////////////////////////////////////////////////////////////
//
// OVERVIEW:
//
-// A thread-safe container for a list of observers.
-// This is similar to the observer_list (see observer_list.h), but it
-// is more robust for multi-threaded situations.
+// A thread-safe container for a list of observers. This is similar to the
+// observer_list (see observer_list.h), but it is more robust for multi-
+// threaded situations.
//
// The following use cases are supported:
-// * Observers can register for notifications from any thread.
-// Callbacks to the observer will occur on the same thread where
-// the observer initially called AddObserver() from.
-// * Any thread may trigger a notification via Notify().
-// * Observers can remove themselves from the observer list inside
-// of a callback.
-// * If one thread is notifying observers concurrently with an observer
-// removing itself from the observer list, the notifications will
-// be silently dropped.
+// * Observers can register for notifications from any sequence. They are
+// always notified on the sequence from which they were registered.
+// * Any sequence may trigger a notification via Notify().
+// * Observers can remove themselves from the observer list inside of a
+// callback.
+// * If one sequence is notifying observers concurrently with an observer
+// removing itself from the observer list, the notifications will be
+// silently dropped.
//
-// The drawback of the threadsafe observer list is that notifications
-// are not as real-time as the non-threadsafe version of this class.
-// Notifications will always be done via PostTask() to another thread,
-// whereas with the non-thread-safe observer_list, notifications happen
-// synchronously and immediately.
-//
-// IMPLEMENTATION NOTES
-// The ObserverListThreadSafe maintains an ObserverList for each thread
-// which uses the ThreadSafeObserver. When Notifying the observers,
-// we simply call PostTask to each registered thread, and then each thread
-// will notify its regular ObserverList.
+// The drawback of the threadsafe observer list is that notifications are not
+// as real-time as the non-threadsafe version of this class. Notifications
+// will always be done via PostTask() to another sequence, whereas with the
+// non-thread-safe observer_list, notifications happen synchronously and
+// immediately.
//
///////////////////////////////////////////////////////////////////////////////
@@ -77,68 +72,61 @@ class ObserverListThreadSafe
using NotificationType =
typename ObserverList<ObserverType>::NotificationType;
- ObserverListThreadSafe()
- : type_(ObserverListBase<ObserverType>::NOTIFY_ALL) {}
+ ObserverListThreadSafe() = default;
explicit ObserverListThreadSafe(NotificationType type) : type_(type) {}
- // Add an observer to the list. An observer should not be added to
- // the same list more than once.
- void AddObserver(ObserverType* obs) {
- // If there is no ThreadTaskRunnerHandle, it is impossible to notify on it,
- // so do not add the observer.
- if (!ThreadTaskRunnerHandle::IsSet())
+ // Adds |observer| to the list. |observer| must not already be in the list.
+ void AddObserver(ObserverType* observer) {
+ if (!SequencedTaskRunnerHandle::IsSet())
return;
- ObserverList<ObserverType>* list = nullptr;
- PlatformThreadId thread_id = PlatformThread::CurrentId();
- {
- AutoLock lock(list_lock_);
- if (observer_lists_.find(thread_id) == observer_lists_.end()) {
- observer_lists_[thread_id] =
- base::MakeUnique<ObserverListContext>(type_);
+ AutoLock auto_lock(lock_);
+
+ // Add |observer| to the list of observers.
+ DCHECK(!ContainsKey(observers_, observer));
+ const scoped_refptr<SequencedTaskRunner> task_runner =
+ SequencedTaskRunnerHandle::Get();
+ observers_[observer] = task_runner;
+
+ // If this is called while a notification is being dispatched and |type_| is
+ // NOTIFY_ALL, |observer| must be notified.
gab 2017/01/10 21:14:49 Hmmm, see comment on |current_notifications_| but
fdoray 2017/01/11 14:09:06 https://cs.chromium.org/chromium/src/base/observer
+ if (type_ == NotificationType::NOTIFY_ALL) {
+ auto it = current_notifications_.find(PlatformThread::CurrentId());
+ if (it != current_notifications_.end()) {
+ task_runner->PostNonNestableTask(
+ it->second.from_here,
+ Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
+ observer, it->second.from_here, it->second.method));
}
- list = &(observer_lists_[thread_id]->list);
}
- list->AddObserver(obs);
}
// Remove an observer from the list if it is in the list.
// If there are pending notifications in-transit to the observer, they will
// be aborted.
// If the observer to be removed is in the list, RemoveObserver MUST
- // be called from the same thread which called AddObserver.
- void RemoveObserver(ObserverType* obs) {
- PlatformThreadId thread_id = PlatformThread::CurrentId();
- {
- AutoLock lock(list_lock_);
- auto it = observer_lists_.find(thread_id);
- if (it == observer_lists_.end()) {
- // This will happen if we try to remove an observer on a thread
- // we never added an observer for.
- return;
- }
- ObserverList<ObserverType>& list = it->second->list;
-
- list.RemoveObserver(obs);
-
- // If that was the last observer in the list, remove the ObserverList
- // entirely.
- if (list.size() == 0)
- observer_lists_.erase(it);
- }
+ // be called from the same sequence which called AddObserver.
+ void RemoveObserver(ObserverType* observer) {
+ AutoLock auto_lock(lock_);
+ auto it = observers_.find(observer);
+ if (it == observers_.end())
+ return;
+ DCHECK(it->second->RunsTasksOnCurrentThread());
+ observers_.erase(it);
}
// Verifies that the list is currently empty (i.e. there are no observers).
void AssertEmpty() const {
- AutoLock lock(list_lock_);
- DCHECK(observer_lists_.empty());
+#if DCHECK_IS_ON()
+ AutoLock auto_lock(lock_);
+ DCHECK(observers_.empty());
+#endif
}
- // Notify methods.
- // Make a thread-safe callback to each Observer in the list.
- // Note, these calls are effectively asynchronous. You cannot assume
- // that at the completion of the Notify call that all Observers have
- // been Notified. The notification may still be pending delivery.
+ // Asynchronously invokes a callback on all observers, on their registration
+ // sequence. You cannot assume that at the completion of the Notify call that
+ // all Observers have been Notified. The notification may still be pending
+ // delivery.
template <typename Method, typename... Params>
void Notify(const tracked_objects::Location& from_here,
Method m, Params&&... params) {
@@ -146,79 +134,72 @@ class ObserverListThreadSafe
Bind(&internal::Dispatcher<ObserverType, Method>::Run,
m, std::forward<Params>(params)...);
- AutoLock lock(list_lock_);
- for (const auto& entry : observer_lists_) {
- ObserverListContext* context = entry.second.get();
- context->task_runner->PostTask(
- from_here,
- Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper,
- this, context, method));
+ AutoLock lock(lock_);
+ for (const auto& observer : observers_) {
+ observer.second->PostNonNestableTask(
gab 2017/01/10 21:14:49 Why add non-nestable?
fdoray 2017/01/11 14:09:06 When |type_| is NOTIFY_ALL, an observer added duri
gab 2017/01/12 16:24:05 Notify merely posts anyways so I don't think you c
fdoray 2017/01/12 17:17:57 You can RunLoop().RunUntilIdle() from a notificati
gab 2017/01/12 20:12:14 Ewww, hadn't understood your use case for PostNonN
+ from_here, Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper,
+ this, observer.first, from_here, method));
}
}
private:
friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>;
- struct ObserverListContext {
- explicit ObserverListContext(NotificationType type)
- : task_runner(ThreadTaskRunnerHandle::Get()), list(type) {}
-
- scoped_refptr<SingleThreadTaskRunner> task_runner;
- ObserverList<ObserverType> list;
+ struct NotificationData {
+ NotificationData(const tracked_objects::Location& from_here_in,
+ const Callback<void(ObserverType*)>& method_in)
+ : from_here(from_here_in), method(method_in) {}
- private:
- DISALLOW_COPY_AND_ASSIGN(ObserverListContext);
+ tracked_objects::Location from_here;
+ Callback<void(ObserverType*)> method;
};
- ~ObserverListThreadSafe() {
- }
+ ~ObserverListThreadSafe() = default;
- // Wrapper which is called to fire the notifications for each thread's
- // ObserverList. This function MUST be called on the thread which owns
- // the unsafe ObserverList.
- void NotifyWrapper(ObserverListContext* context,
+ void NotifyWrapper(ObserverType* observer,
+ const tracked_objects::Location& from_here,
const Callback<void(ObserverType*)>& method) {
- // Check that this list still needs notifications.
+ const auto thread_id = PlatformThread::CurrentId();
+
{
- AutoLock lock(list_lock_);
- auto it = observer_lists_.find(PlatformThread::CurrentId());
-
- // The ObserverList could have been removed already. In fact, it could
- // have been removed and then re-added! If the master list's loop
- // does not match this one, then we do not need to finish this
- // notification.
- if (it == observer_lists_.end() || it->second.get() != context)
+ AutoLock auto_lock(lock_);
+
+ // Check whether the observer still needs a notification.
+ auto it = observers_.find(observer);
+ if (it == observers_.end())
return;
- }
+ DCHECK(it->second->RunsTasksOnCurrentThread());
- for (auto& observer : context->list) {
- method.Run(&observer);
+ // Keep track of the notification being dispatched on the current thread.
+ DCHECK(!ContainsKey(current_notifications_, thread_id));
+ current_notifications_.insert(
+ std::make_pair(thread_id, NotificationData(from_here, method)));
}
- // If there are no more observers on the list, we can now delete it.
- if (context->list.size() == 0) {
- {
- AutoLock lock(list_lock_);
- // Remove |list| if it's not already removed.
- // This can happen if multiple observers got removed in a notification.
- // See http://crbug.com/55725.
- auto it = observer_lists_.find(PlatformThread::CurrentId());
- if (it != observer_lists_.end() && it->second.get() == context)
- observer_lists_.erase(it);
- }
+ method.Run(observer);
+
+ {
+ AutoLock auto_lock(lock_);
+
+ // The current thread is done dispatching the notification.
+ const size_t num_removed = current_notifications_.erase(thread_id);
+ DCHECK_EQ(static_cast<size_t>(1), num_removed);
}
}
- mutable Lock list_lock_; // Protects the observer_lists_.
+ const NotificationType type_ = NotificationType::NOTIFY_ALL;
+
+ // Synchronizes access to all members.
+ mutable Lock lock_;
- // Key by PlatformThreadId because in tests, clients can attempt to remove
- // observers without a SingleThreadTaskRunner. If this were keyed by
- // SingleThreadTaskRunner, that operation would be silently ignored, leaving
- // garbage in the ObserverList.
- std::map<PlatformThreadId, std::unique_ptr<ObserverListContext>>
- observer_lists_;
+ // Keys are observers. Values are the SequencedTaskRunners on which they must
+ // be notified.
+ std::unordered_map<ObserverType*, scoped_refptr<SequencedTaskRunner>>
+ observers_;
- const NotificationType type_;
+ // Keys are ids of threads on which notifications are currently being
+ // dispatched. Values contain data about the notifications.
+ std::unordered_map<PlatformThreadId, NotificationData> current_notifications_;
gab 2017/01/10 21:14:49 Seems like this should use TLS? (and as such not r
fdoray 2017/01/11 14:09:06 TLS works. Done.
DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe);
};
« no previous file with comments | « no previous file | base/observer_list_unittest.cc » ('j') | net/spdy/spdy_session_pool.cc » ('J')

Powered by Google App Engine
This is Rietveld 408576698