Chromium Code Reviews| Index: base/observer_list_threadsafe.h |
| diff --git a/base/observer_list_threadsafe.h b/base/observer_list_threadsafe.h |
| index afb1010b67c4f632c055e6afb8eef21c0addd04d..57e2786ac9cb5608f189ce002745b6d8c7418771 100644 |
| --- a/base/observer_list_threadsafe.h |
| +++ b/base/observer_list_threadsafe.h |
| @@ -5,52 +5,46 @@ |
| #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ |
| #define BASE_OBSERVER_LIST_THREADSAFE_H_ |
| -#include <algorithm> |
| -#include <map> |
| -#include <memory> |
| -#include <tuple> |
| +#include <unordered_map> |
| #include "base/bind.h" |
| #include "base/location.h" |
| #include "base/logging.h" |
| #include "base/macros.h" |
| -#include "base/memory/ptr_util.h" |
| #include "base/memory/ref_counted.h" |
| #include "base/observer_list.h" |
| +#include "base/sequenced_task_runner.h" |
| +#include "base/stl_util.h" |
| +#include "base/synchronization/lock.h" |
| +#include "base/threading/sequenced_task_runner_handle.h" |
| +#include "base/threading/thread_local.h" |
| + |
| +// TODO(fdoray): Removing this include causes IWYU failures in other headers, |
| +// remove it in a follow- up CL. |
| #include "base/single_thread_task_runner.h" |
| -#include "base/threading/platform_thread.h" |
| -#include "base/threading/thread_task_runner_handle.h" |
| /////////////////////////////////////////////////////////////////////////////// |
| // |
| // OVERVIEW: |
| // |
| -// A thread-safe container for a list of observers. |
| -// This is similar to the observer_list (see observer_list.h), but it |
| -// is more robust for multi-threaded situations. |
| +// A thread-safe container for a list of observers. This is similar to the |
| +// observer_list (see observer_list.h), but it is more robust for multi- |
| +// threaded situations. |
| // |
| // The following use cases are supported: |
| -// * Observers can register for notifications from any thread. |
| -// Callbacks to the observer will occur on the same thread where |
| -// the observer initially called AddObserver() from. |
| -// * Any thread may trigger a notification via Notify(). |
| -// * Observers can remove themselves from the observer list inside |
| -// of a callback. |
| -// * If one thread is notifying observers concurrently with an observer |
| -// removing itself from the observer list, the notifications will |
| -// be silently dropped. |
| +// * Observers can register for notifications from any sequence. They are |
| +// always notified on the sequence from which they were registered. |
| +// * Any sequence may trigger a notification via Notify(). |
| +// * Observers can remove themselves from the observer list inside of a |
| +// callback. |
| +// * If one sequence is notifying observers concurrently with an observer |
| +// removing itself from the observer list, the notifications will be |
| +// silently dropped. |
| // |
| -// The drawback of the threadsafe observer list is that notifications |
| -// are not as real-time as the non-threadsafe version of this class. |
| -// Notifications will always be done via PostTask() to another thread, |
| -// whereas with the non-thread-safe observer_list, notifications happen |
| -// synchronously and immediately. |
| -// |
| -// IMPLEMENTATION NOTES |
| -// The ObserverListThreadSafe maintains an ObserverList for each thread |
| -// which uses the ThreadSafeObserver. When Notifying the observers, |
| -// we simply call PostTask to each registered thread, and then each thread |
| -// will notify its regular ObserverList. |
| +// The drawback of the threadsafe observer list is that notifications are not |
| +// as real-time as the non-threadsafe version of this class. Notifications |
| +// will always be done via PostTask() to another sequence, whereas with the |
| +// non-thread-safe observer_list, notifications happen synchronously. |
| // |
| /////////////////////////////////////////////////////////////////////////////// |
| @@ -77,68 +71,64 @@ class ObserverListThreadSafe |
| using NotificationType = |
| typename ObserverList<ObserverType>::NotificationType; |
| - ObserverListThreadSafe() |
| - : type_(ObserverListBase<ObserverType>::NOTIFY_ALL) {} |
| + ObserverListThreadSafe() = default; |
| explicit ObserverListThreadSafe(NotificationType type) : type_(type) {} |
| - // Add an observer to the list. An observer should not be added to |
| - // the same list more than once. |
| - void AddObserver(ObserverType* obs) { |
| - // If there is no ThreadTaskRunnerHandle, it is impossible to notify on it, |
| - // so do not add the observer. |
| - if (!ThreadTaskRunnerHandle::IsSet()) |
| + // Adds |observer| to the list. |observer| must not already be in the list. |
| + void AddObserver(ObserverType* observer) { |
| + // TODO(fdoray): Change this to a DCHECK once all call sites have a |
| + // SequencedTaskRunnerHandle. |
| + if (!SequencedTaskRunnerHandle::IsSet()) |
| return; |
| - ObserverList<ObserverType>* list = nullptr; |
| - PlatformThreadId thread_id = PlatformThread::CurrentId(); |
| - { |
| - AutoLock lock(list_lock_); |
| - if (observer_lists_.find(thread_id) == observer_lists_.end()) { |
| - observer_lists_[thread_id] = |
| - base::MakeUnique<ObserverListContext>(type_); |
| + AutoLock auto_lock(lock_); |
| + |
| + // Add |observer| to the list of observers. |
| + DCHECK(!ContainsKey(observers_, observer)); |
| + const scoped_refptr<SequencedTaskRunner> task_runner = |
| + SequencedTaskRunnerHandle::Get(); |
| + observers_[observer] = task_runner; |
| + |
| + // If this is called while a notification is being dispatched on this thread |
| + // and |type_| is NOTIFY_ALL, |observer| must be notified. |
| + if (type_ == NotificationType::NOTIFY_ALL) { |
| + const NotificationData* current_notification = |
| + tls_current_notification_.Get(); |
| + if (current_notification) { |
| + task_runner->PostTask( |
| + current_notification->from_here, |
| + Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this, |
| + observer, *current_notification)); |
| } |
| - list = &(observer_lists_[thread_id]->list); |
| } |
| - list->AddObserver(obs); |
| } |
| // Remove an observer from the list if it is in the list. |
| // If there are pending notifications in-transit to the observer, they will |
| // be aborted. |
| // If the observer to be removed is in the list, RemoveObserver MUST |
| - // be called from the same thread which called AddObserver. |
| - void RemoveObserver(ObserverType* obs) { |
| - PlatformThreadId thread_id = PlatformThread::CurrentId(); |
| - { |
| - AutoLock lock(list_lock_); |
| - auto it = observer_lists_.find(thread_id); |
| - if (it == observer_lists_.end()) { |
| - // This will happen if we try to remove an observer on a thread |
| - // we never added an observer for. |
| - return; |
| - } |
| - ObserverList<ObserverType>& list = it->second->list; |
| - |
| - list.RemoveObserver(obs); |
| - |
| - // If that was the last observer in the list, remove the ObserverList |
| - // entirely. |
| - if (list.size() == 0) |
| - observer_lists_.erase(it); |
| - } |
| + // be called from the same sequence which called AddObserver. |
| + void RemoveObserver(ObserverType* observer) { |
| + AutoLock auto_lock(lock_); |
| + auto it = observers_.find(observer); |
| + if (it == observers_.end()) |
| + return; |
| + DCHECK(it->second->RunsTasksOnCurrentThread()); |
| + observers_.erase(it); |
| } |
| // Verifies that the list is currently empty (i.e. there are no observers). |
| void AssertEmpty() const { |
| - AutoLock lock(list_lock_); |
| - DCHECK(observer_lists_.empty()); |
| +#if DCHECK_IS_ON() |
| + AutoLock auto_lock(lock_); |
| + DCHECK(observers_.empty()); |
| +#endif |
| } |
| - // Notify methods. |
| - // Make a thread-safe callback to each Observer in the list. |
| - // Note, these calls are effectively asynchronous. You cannot assume |
| - // that at the completion of the Notify call that all Observers have |
| - // been Notified. The notification may still be pending delivery. |
| + // Asynchronously invokes a callback on all observers, on their registration |
| + // sequence. You cannot assume that at the completion of the Notify call that |
| + // all Observers have been Notified. The notification may still be pending |
| + // delivery. |
| template <typename Method, typename... Params> |
| void Notify(const tracked_objects::Location& from_here, |
| Method m, Params&&... params) { |
| @@ -146,79 +136,67 @@ class ObserverListThreadSafe |
| Bind(&internal::Dispatcher<ObserverType, Method>::Run, |
| m, std::forward<Params>(params)...); |
| - AutoLock lock(list_lock_); |
| - for (const auto& entry : observer_lists_) { |
| - ObserverListContext* context = entry.second.get(); |
| - context->task_runner->PostTask( |
| + AutoLock lock(lock_); |
| + for (const auto& observer : observers_) { |
| + observer.second->PostTask( |
| from_here, |
| - Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, |
| - this, context, method)); |
| + Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this, |
| + observer.first, NotificationData(from_here, method))); |
| } |
| } |
| private: |
| friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>; |
| - struct ObserverListContext { |
| - explicit ObserverListContext(NotificationType type) |
| - : task_runner(ThreadTaskRunnerHandle::Get()), list(type) {} |
| + struct NotificationData { |
| + NotificationData(const tracked_objects::Location& from_here_in, |
| + const Callback<void(ObserverType*)>& method_in) |
| + : from_here(from_here_in), method(method_in) {} |
| - scoped_refptr<SingleThreadTaskRunner> task_runner; |
| - ObserverList<ObserverType> list; |
| - |
| - private: |
| - DISALLOW_COPY_AND_ASSIGN(ObserverListContext); |
| + tracked_objects::Location from_here; |
| + Callback<void(ObserverType*)> method; |
| }; |
| - ~ObserverListThreadSafe() { |
| - } |
| + ~ObserverListThreadSafe() = default; |
| - // Wrapper which is called to fire the notifications for each thread's |
| - // ObserverList. This function MUST be called on the thread which owns |
| - // the unsafe ObserverList. |
| - void NotifyWrapper(ObserverListContext* context, |
| - const Callback<void(ObserverType*)>& method) { |
| - // Check that this list still needs notifications. |
| + void NotifyWrapper(ObserverType* observer, |
| + const NotificationData& notification) { |
| { |
| - AutoLock lock(list_lock_); |
| - auto it = observer_lists_.find(PlatformThread::CurrentId()); |
| - |
| - // The ObserverList could have been removed already. In fact, it could |
| - // have been removed and then re-added! If the master list's loop |
| - // does not match this one, then we do not need to finish this |
| - // notification. |
| - if (it == observer_lists_.end() || it->second.get() != context) |
| + AutoLock auto_lock(lock_); |
| + |
| + // Check whether the observer still needs a notification. |
| + auto it = observers_.find(observer); |
| + if (it == observers_.end()) |
| return; |
| + DCHECK(it->second->RunsTasksOnCurrentThread()); |
| } |
| - for (auto& observer : context->list) { |
| - method.Run(&observer); |
| - } |
| + // Keep track of the notification being dispatched on the current thread. |
| + // This will be used if the callback below calls AddObserver(). |
|
gab
2017/01/12 20:12:14
Expand comment to explain the specific use case (n
fdoray
2017/01/12 20:57:44
Done.
|
| + const NotificationData* const previous_notification = |
| + tls_current_notification_.Get(); |
| + tls_current_notification_.Set(¬ification); |
| - // If there are no more observers on the list, we can now delete it. |
| - if (context->list.size() == 0) { |
| - { |
| - AutoLock lock(list_lock_); |
| - // Remove |list| if it's not already removed. |
| - // This can happen if multiple observers got removed in a notification. |
| - // See http://crbug.com/55725. |
| - auto it = observer_lists_.find(PlatformThread::CurrentId()); |
| - if (it != observer_lists_.end() && it->second.get() == context) |
| - observer_lists_.erase(it); |
| - } |
| - } |
| + // Invoke the callback. |
| + notification.method.Run(observer); |
| + |
| + // Reset the notification being dispatched on the current thread to its |
| + // previous value. |
| + tls_current_notification_.Set(previous_notification); |
| } |
| - mutable Lock list_lock_; // Protects the observer_lists_. |
| + const NotificationType type_ = NotificationType::NOTIFY_ALL; |
| + |
| + // Synchronizes access to |observers_|. |
| + mutable Lock lock_; |
| - // Key by PlatformThreadId because in tests, clients can attempt to remove |
| - // observers without a SingleThreadTaskRunner. If this were keyed by |
| - // SingleThreadTaskRunner, that operation would be silently ignored, leaving |
| - // garbage in the ObserverList. |
| - std::map<PlatformThreadId, std::unique_ptr<ObserverListContext>> |
| - observer_lists_; |
| + // Keys are observers. Values are the SequencedTaskRunners on which they must |
| + // be notified. |
| + std::unordered_map<ObserverType*, scoped_refptr<SequencedTaskRunner>> |
| + observers_; |
| - const NotificationType type_; |
| + // Notification being dispatched on the current thread. |
| + ThreadLocalPointer<const NotificationData> tls_current_notification_; |
| DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe); |
| }; |