| Index: base/observer_list_threadsafe.h
|
| diff --git a/base/observer_list_threadsafe.h b/base/observer_list_threadsafe.h
|
| index afb1010b67c4f632c055e6afb8eef21c0addd04d..9e6e347ea08b61c05581c0152be8fa13b6ee5fbf 100644
|
| --- a/base/observer_list_threadsafe.h
|
| +++ b/base/observer_list_threadsafe.h
|
| @@ -5,52 +5,49 @@
|
| #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
|
| #define BASE_OBSERVER_LIST_THREADSAFE_H_
|
|
|
| -#include <algorithm>
|
| -#include <map>
|
| -#include <memory>
|
| -#include <tuple>
|
| +#include <unordered_map>
|
|
|
| #include "base/bind.h"
|
| #include "base/location.h"
|
| #include "base/logging.h"
|
| #include "base/macros.h"
|
| -#include "base/memory/ptr_util.h"
|
| #include "base/memory/ref_counted.h"
|
| #include "base/observer_list.h"
|
| +#include "base/sequenced_task_runner.h"
|
| +#include "base/stl_util.h"
|
| +#include "base/synchronization/lock.h"
|
| +#include "base/threading/sequenced_task_runner_handle.h"
|
| +#include "base/threading/thread_local.h"
|
| +#include "build/build_config.h"
|
| +
|
| +// TODO(fdoray): Removing these includes causes IWYU failures in other headers,
|
| +// remove them in a follow- up CL.
|
| +#include "base/memory/ptr_util.h"
|
| #include "base/single_thread_task_runner.h"
|
| -#include "base/threading/platform_thread.h"
|
| #include "base/threading/thread_task_runner_handle.h"
|
|
|
| ///////////////////////////////////////////////////////////////////////////////
|
| //
|
| // OVERVIEW:
|
| //
|
| -// A thread-safe container for a list of observers.
|
| -// This is similar to the observer_list (see observer_list.h), but it
|
| -// is more robust for multi-threaded situations.
|
| +// A thread-safe container for a list of observers. This is similar to the
|
| +// observer_list (see observer_list.h), but it is more robust for multi-
|
| +// threaded situations.
|
| //
|
| // The following use cases are supported:
|
| -// * Observers can register for notifications from any thread.
|
| -// Callbacks to the observer will occur on the same thread where
|
| -// the observer initially called AddObserver() from.
|
| -// * Any thread may trigger a notification via Notify().
|
| -// * Observers can remove themselves from the observer list inside
|
| -// of a callback.
|
| -// * If one thread is notifying observers concurrently with an observer
|
| -// removing itself from the observer list, the notifications will
|
| -// be silently dropped.
|
| -//
|
| -// The drawback of the threadsafe observer list is that notifications
|
| -// are not as real-time as the non-threadsafe version of this class.
|
| -// Notifications will always be done via PostTask() to another thread,
|
| -// whereas with the non-thread-safe observer_list, notifications happen
|
| -// synchronously and immediately.
|
| +// * Observers can register for notifications from any sequence. They are
|
| +// always notified on the sequence from which they were registered.
|
| +// * Any sequence may trigger a notification via Notify().
|
| +// * Observers can remove themselves from the observer list inside of a
|
| +// callback.
|
| +// * If one sequence is notifying observers concurrently with an observer
|
| +// removing itself from the observer list, the notifications will be
|
| +// silently dropped.
|
| //
|
| -// IMPLEMENTATION NOTES
|
| -// The ObserverListThreadSafe maintains an ObserverList for each thread
|
| -// which uses the ThreadSafeObserver. When Notifying the observers,
|
| -// we simply call PostTask to each registered thread, and then each thread
|
| -// will notify its regular ObserverList.
|
| +// The drawback of the threadsafe observer list is that notifications are not
|
| +// as real-time as the non-threadsafe version of this class. Notifications
|
| +// will always be done via PostTask() to another sequence, whereas with the
|
| +// non-thread-safe observer_list, notifications happen synchronously.
|
| //
|
| ///////////////////////////////////////////////////////////////////////////////
|
|
|
| @@ -77,68 +74,63 @@ class ObserverListThreadSafe
|
| using NotificationType =
|
| typename ObserverList<ObserverType>::NotificationType;
|
|
|
| - ObserverListThreadSafe()
|
| - : type_(ObserverListBase<ObserverType>::NOTIFY_ALL) {}
|
| + ObserverListThreadSafe() = default;
|
| explicit ObserverListThreadSafe(NotificationType type) : type_(type) {}
|
|
|
| - // Add an observer to the list. An observer should not be added to
|
| - // the same list more than once.
|
| - void AddObserver(ObserverType* obs) {
|
| - // If there is no ThreadTaskRunnerHandle, it is impossible to notify on it,
|
| - // so do not add the observer.
|
| - if (!ThreadTaskRunnerHandle::IsSet())
|
| + // Adds |observer| to the list. |observer| must not already be in the list.
|
| + void AddObserver(ObserverType* observer) {
|
| + // TODO(fdoray): Change this to a DCHECK once all call sites have a
|
| + // SequencedTaskRunnerHandle.
|
| + if (!SequencedTaskRunnerHandle::IsSet())
|
| return;
|
|
|
| - ObserverList<ObserverType>* list = nullptr;
|
| - PlatformThreadId thread_id = PlatformThread::CurrentId();
|
| - {
|
| - AutoLock lock(list_lock_);
|
| - if (observer_lists_.find(thread_id) == observer_lists_.end()) {
|
| - observer_lists_[thread_id] =
|
| - base::MakeUnique<ObserverListContext>(type_);
|
| + AutoLock auto_lock(lock_);
|
| +
|
| + // Add |observer| to the list of observers.
|
| + DCHECK(!ContainsKey(observers_, observer));
|
| + const scoped_refptr<SequencedTaskRunner> task_runner =
|
| + SequencedTaskRunnerHandle::Get();
|
| + observers_[observer] = task_runner;
|
| +
|
| + // If this is called while a notification is being dispatched on this thread
|
| + // and |type_| is NOTIFY_ALL, |observer| must be notified (if a notification
|
| + // is being dispatched on another thread in parallel, the notification may
|
| + // or may not make it to |observer| depending on the outcome of the race to
|
| + // |lock_|).
|
| + if (type_ == NotificationType::NOTIFY_ALL) {
|
| + const NotificationData* current_notification =
|
| + tls_current_notification_.Get();
|
| + if (current_notification) {
|
| + task_runner->PostTask(
|
| + current_notification->from_here,
|
| + Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
|
| + observer, *current_notification));
|
| }
|
| - list = &(observer_lists_[thread_id]->list);
|
| }
|
| - list->AddObserver(obs);
|
| }
|
|
|
| // Remove an observer from the list if it is in the list.
|
| - // If there are pending notifications in-transit to the observer, they will
|
| - // be aborted.
|
| - // If the observer to be removed is in the list, RemoveObserver MUST
|
| - // be called from the same thread which called AddObserver.
|
| - void RemoveObserver(ObserverType* obs) {
|
| - PlatformThreadId thread_id = PlatformThread::CurrentId();
|
| - {
|
| - AutoLock lock(list_lock_);
|
| - auto it = observer_lists_.find(thread_id);
|
| - if (it == observer_lists_.end()) {
|
| - // This will happen if we try to remove an observer on a thread
|
| - // we never added an observer for.
|
| - return;
|
| - }
|
| - ObserverList<ObserverType>& list = it->second->list;
|
| -
|
| - list.RemoveObserver(obs);
|
| -
|
| - // If that was the last observer in the list, remove the ObserverList
|
| - // entirely.
|
| - if (list.size() == 0)
|
| - observer_lists_.erase(it);
|
| - }
|
| + //
|
| + // If a notification was sent to the observer but hasn't started to run yet,
|
| + // it will be aborted. If a notification has started to run, removing the
|
| + // observer won't stop it.
|
| + void RemoveObserver(ObserverType* observer) {
|
| + AutoLock auto_lock(lock_);
|
| + observers_.erase(observer);
|
| }
|
|
|
| // Verifies that the list is currently empty (i.e. there are no observers).
|
| void AssertEmpty() const {
|
| - AutoLock lock(list_lock_);
|
| - DCHECK(observer_lists_.empty());
|
| +#if DCHECK_IS_ON()
|
| + AutoLock auto_lock(lock_);
|
| + DCHECK(observers_.empty());
|
| +#endif
|
| }
|
|
|
| - // Notify methods.
|
| - // Make a thread-safe callback to each Observer in the list.
|
| - // Note, these calls are effectively asynchronous. You cannot assume
|
| - // that at the completion of the Notify call that all Observers have
|
| - // been Notified. The notification may still be pending delivery.
|
| + // Asynchronously invokes a callback on all observers, on their registration
|
| + // sequence. You cannot assume that at the completion of the Notify call that
|
| + // all Observers have been Notified. The notification may still be pending
|
| + // delivery.
|
| template <typename Method, typename... Params>
|
| void Notify(const tracked_objects::Location& from_here,
|
| Method m, Params&&... params) {
|
| @@ -146,79 +138,71 @@ class ObserverListThreadSafe
|
| Bind(&internal::Dispatcher<ObserverType, Method>::Run,
|
| m, std::forward<Params>(params)...);
|
|
|
| - AutoLock lock(list_lock_);
|
| - for (const auto& entry : observer_lists_) {
|
| - ObserverListContext* context = entry.second.get();
|
| - context->task_runner->PostTask(
|
| + AutoLock lock(lock_);
|
| + for (const auto& observer : observers_) {
|
| + observer.second->PostTask(
|
| from_here,
|
| - Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper,
|
| - this, context, method));
|
| + Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
|
| + observer.first, NotificationData(from_here, method)));
|
| }
|
| }
|
|
|
| private:
|
| friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>;
|
|
|
| - struct ObserverListContext {
|
| - explicit ObserverListContext(NotificationType type)
|
| - : task_runner(ThreadTaskRunnerHandle::Get()), list(type) {}
|
| -
|
| - scoped_refptr<SingleThreadTaskRunner> task_runner;
|
| - ObserverList<ObserverType> list;
|
| + struct NotificationData {
|
| + NotificationData(const tracked_objects::Location& from_here_in,
|
| + const Callback<void(ObserverType*)>& method_in)
|
| + : from_here(from_here_in), method(method_in) {}
|
|
|
| - private:
|
| - DISALLOW_COPY_AND_ASSIGN(ObserverListContext);
|
| + tracked_objects::Location from_here;
|
| + Callback<void(ObserverType*)> method;
|
| };
|
|
|
| - ~ObserverListThreadSafe() {
|
| - }
|
| + ~ObserverListThreadSafe() = default;
|
|
|
| - // Wrapper which is called to fire the notifications for each thread's
|
| - // ObserverList. This function MUST be called on the thread which owns
|
| - // the unsafe ObserverList.
|
| - void NotifyWrapper(ObserverListContext* context,
|
| - const Callback<void(ObserverType*)>& method) {
|
| - // Check that this list still needs notifications.
|
| + void NotifyWrapper(ObserverType* observer,
|
| + const NotificationData& notification) {
|
| {
|
| - AutoLock lock(list_lock_);
|
| - auto it = observer_lists_.find(PlatformThread::CurrentId());
|
| -
|
| - // The ObserverList could have been removed already. In fact, it could
|
| - // have been removed and then re-added! If the master list's loop
|
| - // does not match this one, then we do not need to finish this
|
| - // notification.
|
| - if (it == observer_lists_.end() || it->second.get() != context)
|
| - return;
|
| - }
|
| + AutoLock auto_lock(lock_);
|
|
|
| - for (auto& observer : context->list) {
|
| - method.Run(&observer);
|
| + // Check whether the observer still needs a notification.
|
| + auto it = observers_.find(observer);
|
| + if (it == observers_.end())
|
| + return;
|
| + DCHECK(it->second->RunsTasksOnCurrentThread());
|
| }
|
|
|
| - // If there are no more observers on the list, we can now delete it.
|
| - if (context->list.size() == 0) {
|
| - {
|
| - AutoLock lock(list_lock_);
|
| - // Remove |list| if it's not already removed.
|
| - // This can happen if multiple observers got removed in a notification.
|
| - // See http://crbug.com/55725.
|
| - auto it = observer_lists_.find(PlatformThread::CurrentId());
|
| - if (it != observer_lists_.end() && it->second.get() == context)
|
| - observer_lists_.erase(it);
|
| - }
|
| - }
|
| + // Keep track of the notification being dispatched on the current thread.
|
| + // This will be used if the callback below calls AddObserver().
|
| + //
|
| + // Note: |tls_current_notification_| may not be nullptr if this runs in a
|
| + // nested loop started by a notification callback. In that case, it is
|
| + // important to save the previous value to restore it later.
|
| + const NotificationData* const previous_notification =
|
| + tls_current_notification_.Get();
|
| + tls_current_notification_.Set(¬ification);
|
| +
|
| + // Invoke the callback.
|
| + notification.method.Run(observer);
|
| +
|
| + // Reset the notification being dispatched on the current thread to its
|
| + // previous value.
|
| + tls_current_notification_.Set(previous_notification);
|
| }
|
|
|
| - mutable Lock list_lock_; // Protects the observer_lists_.
|
| + const NotificationType type_ = NotificationType::NOTIFY_ALL;
|
| +
|
| + // Synchronizes access to |observers_|.
|
| + mutable Lock lock_;
|
|
|
| - // Key by PlatformThreadId because in tests, clients can attempt to remove
|
| - // observers without a SingleThreadTaskRunner. If this were keyed by
|
| - // SingleThreadTaskRunner, that operation would be silently ignored, leaving
|
| - // garbage in the ObserverList.
|
| - std::map<PlatformThreadId, std::unique_ptr<ObserverListContext>>
|
| - observer_lists_;
|
| + // Keys are observers. Values are the SequencedTaskRunners on which they must
|
| + // be notified.
|
| + std::unordered_map<ObserverType*, scoped_refptr<SequencedTaskRunner>>
|
| + observers_;
|
|
|
| - const NotificationType type_;
|
| + // Notification being dispatched on the current thread.
|
| + ThreadLocalPointer<const NotificationData> tls_current_notification_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe);
|
| };
|
|
|