Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3465)

Unified Diff: base/observer_list_threadsafe.h

Issue 2669673002: Revert of Allow ObserverListThreadSafe to be used from sequenced tasks. (Closed)
Patch Set: Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « base/android/application_status_listener_unittest.cc ('k') | base/observer_list_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: base/observer_list_threadsafe.h
diff --git a/base/observer_list_threadsafe.h b/base/observer_list_threadsafe.h
index f8481de3e41f8c4d2eac9cb321824c5ad59cc579..afb1010b67c4f632c055e6afb8eef21c0addd04d 100644
--- a/base/observer_list_threadsafe.h
+++ b/base/observer_list_threadsafe.h
@@ -5,48 +5,52 @@
#ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
#define BASE_OBSERVER_LIST_THREADSAFE_H_
-#include <unordered_map>
+#include <algorithm>
+#include <map>
+#include <memory>
+#include <tuple>
#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/macros.h"
+#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
#include "base/observer_list.h"
-#include "base/sequenced_task_runner.h"
-#include "base/stl_util.h"
-#include "base/synchronization/lock.h"
-#include "base/threading/sequenced_task_runner_handle.h"
-#include "base/threading/thread_local.h"
-#include "build/build_config.h"
-
-// TODO(fdoray): Removing these includes causes IWYU failures in other headers,
-// remove them in a follow- up CL.
-#include "base/memory/ptr_util.h"
#include "base/single_thread_task_runner.h"
+#include "base/threading/platform_thread.h"
+#include "base/threading/thread_task_runner_handle.h"
///////////////////////////////////////////////////////////////////////////////
//
// OVERVIEW:
//
-// A thread-safe container for a list of observers. This is similar to the
-// observer_list (see observer_list.h), but it is more robust for multi-
-// threaded situations.
+// A thread-safe container for a list of observers.
+// This is similar to the observer_list (see observer_list.h), but it
+// is more robust for multi-threaded situations.
//
// The following use cases are supported:
-// * Observers can register for notifications from any sequence. They are
-// always notified on the sequence from which they were registered.
-// * Any sequence may trigger a notification via Notify().
-// * Observers can remove themselves from the observer list inside of a
-// callback.
-// * If one sequence is notifying observers concurrently with an observer
-// removing itself from the observer list, the notifications will be
-// silently dropped.
-//
-// The drawback of the threadsafe observer list is that notifications are not
-// as real-time as the non-threadsafe version of this class. Notifications
-// will always be done via PostTask() to another sequence, whereas with the
-// non-thread-safe observer_list, notifications happen synchronously.
+// * Observers can register for notifications from any thread.
+// Callbacks to the observer will occur on the same thread where
+// the observer initially called AddObserver() from.
+// * Any thread may trigger a notification via Notify().
+// * Observers can remove themselves from the observer list inside
+// of a callback.
+// * If one thread is notifying observers concurrently with an observer
+// removing itself from the observer list, the notifications will
+// be silently dropped.
+//
+// The drawback of the threadsafe observer list is that notifications
+// are not as real-time as the non-threadsafe version of this class.
+// Notifications will always be done via PostTask() to another thread,
+// whereas with the non-thread-safe observer_list, notifications happen
+// synchronously and immediately.
+//
+// IMPLEMENTATION NOTES
+// The ObserverListThreadSafe maintains an ObserverList for each thread
+// which uses the ThreadSafeObserver. When Notifying the observers,
+// we simply call PostTask to each registered thread, and then each thread
+// will notify its regular ObserverList.
//
///////////////////////////////////////////////////////////////////////////////
@@ -73,72 +77,68 @@
using NotificationType =
typename ObserverList<ObserverType>::NotificationType;
- ObserverListThreadSafe() = default;
+ ObserverListThreadSafe()
+ : type_(ObserverListBase<ObserverType>::NOTIFY_ALL) {}
explicit ObserverListThreadSafe(NotificationType type) : type_(type) {}
- // Adds |observer| to the list. |observer| must not already be in the list.
- void AddObserver(ObserverType* observer) {
- // TODO(fdoray): Change this to a DCHECK once all call sites have a
- // SequencedTaskRunnerHandle.
- if (!SequencedTaskRunnerHandle::IsSet())
+ // Add an observer to the list. An observer should not be added to
+ // the same list more than once.
+ void AddObserver(ObserverType* obs) {
+ // If there is no ThreadTaskRunnerHandle, it is impossible to notify on it,
+ // so do not add the observer.
+ if (!ThreadTaskRunnerHandle::IsSet())
return;
- AutoLock auto_lock(lock_);
-
- // Add |observer| to the list of observers.
- DCHECK(!ContainsKey(observers_, observer));
- const scoped_refptr<SequencedTaskRunner> task_runner =
- SequencedTaskRunnerHandle::Get();
- observers_[observer] = task_runner;
-
- // If this is called while a notification is being dispatched on this thread
- // and |type_| is NOTIFY_ALL, |observer| must be notified (if a notification
- // is being dispatched on another thread in parallel, the notification may
- // or may not make it to |observer| depending on the outcome of the race to
- // |lock_|).
- if (type_ == NotificationType::NOTIFY_ALL) {
- const NotificationData* current_notification =
- tls_current_notification_.Get();
- if (current_notification) {
- task_runner->PostTask(
- current_notification->from_here,
- Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
- observer, *current_notification));
+ ObserverList<ObserverType>* list = nullptr;
+ PlatformThreadId thread_id = PlatformThread::CurrentId();
+ {
+ AutoLock lock(list_lock_);
+ if (observer_lists_.find(thread_id) == observer_lists_.end()) {
+ observer_lists_[thread_id] =
+ base::MakeUnique<ObserverListContext>(type_);
}
- }
+ list = &(observer_lists_[thread_id]->list);
+ }
+ list->AddObserver(obs);
}
// Remove an observer from the list if it is in the list.
// If there are pending notifications in-transit to the observer, they will
// be aborted.
// If the observer to be removed is in the list, RemoveObserver MUST
- // be called from the same sequence which called AddObserver.
- void RemoveObserver(ObserverType* observer) {
- AutoLock auto_lock(lock_);
- auto it = observers_.find(observer);
- if (it == observers_.end())
- return;
-
- // TODO(fdoray): Enable this on Android once all tests pass.
-#if !defined(OS_ANDROID)
- DCHECK(it->second->RunsTasksOnCurrentThread());
-#endif
-
- observers_.erase(it);
+ // be called from the same thread which called AddObserver.
+ void RemoveObserver(ObserverType* obs) {
+ PlatformThreadId thread_id = PlatformThread::CurrentId();
+ {
+ AutoLock lock(list_lock_);
+ auto it = observer_lists_.find(thread_id);
+ if (it == observer_lists_.end()) {
+ // This will happen if we try to remove an observer on a thread
+ // we never added an observer for.
+ return;
+ }
+ ObserverList<ObserverType>& list = it->second->list;
+
+ list.RemoveObserver(obs);
+
+ // If that was the last observer in the list, remove the ObserverList
+ // entirely.
+ if (list.size() == 0)
+ observer_lists_.erase(it);
+ }
}
// Verifies that the list is currently empty (i.e. there are no observers).
void AssertEmpty() const {
-#if DCHECK_IS_ON()
- AutoLock auto_lock(lock_);
- DCHECK(observers_.empty());
-#endif
- }
-
- // Asynchronously invokes a callback on all observers, on their registration
- // sequence. You cannot assume that at the completion of the Notify call that
- // all Observers have been Notified. The notification may still be pending
- // delivery.
+ AutoLock lock(list_lock_);
+ DCHECK(observer_lists_.empty());
+ }
+
+ // Notify methods.
+ // Make a thread-safe callback to each Observer in the list.
+ // Note, these calls are effectively asynchronous. You cannot assume
+ // that at the completion of the Notify call that all Observers have
+ // been Notified. The notification may still be pending delivery.
template <typename Method, typename... Params>
void Notify(const tracked_objects::Location& from_here,
Method m, Params&&... params) {
@@ -146,71 +146,79 @@
Bind(&internal::Dispatcher<ObserverType, Method>::Run,
m, std::forward<Params>(params)...);
- AutoLock lock(lock_);
- for (const auto& observer : observers_) {
- observer.second->PostTask(
+ AutoLock lock(list_lock_);
+ for (const auto& entry : observer_lists_) {
+ ObserverListContext* context = entry.second.get();
+ context->task_runner->PostTask(
from_here,
- Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
- observer.first, NotificationData(from_here, method)));
+ Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper,
+ this, context, method));
}
}
private:
friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>;
- struct NotificationData {
- NotificationData(const tracked_objects::Location& from_here_in,
- const Callback<void(ObserverType*)>& method_in)
- : from_here(from_here_in), method(method_in) {}
-
- tracked_objects::Location from_here;
- Callback<void(ObserverType*)> method;
+ struct ObserverListContext {
+ explicit ObserverListContext(NotificationType type)
+ : task_runner(ThreadTaskRunnerHandle::Get()), list(type) {}
+
+ scoped_refptr<SingleThreadTaskRunner> task_runner;
+ ObserverList<ObserverType> list;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(ObserverListContext);
};
- ~ObserverListThreadSafe() = default;
-
- void NotifyWrapper(ObserverType* observer,
- const NotificationData& notification) {
+ ~ObserverListThreadSafe() {
+ }
+
+ // Wrapper which is called to fire the notifications for each thread's
+ // ObserverList. This function MUST be called on the thread which owns
+ // the unsafe ObserverList.
+ void NotifyWrapper(ObserverListContext* context,
+ const Callback<void(ObserverType*)>& method) {
+ // Check that this list still needs notifications.
{
- AutoLock auto_lock(lock_);
-
- // Check whether the observer still needs a notification.
- auto it = observers_.find(observer);
- if (it == observers_.end())
+ AutoLock lock(list_lock_);
+ auto it = observer_lists_.find(PlatformThread::CurrentId());
+
+ // The ObserverList could have been removed already. In fact, it could
+ // have been removed and then re-added! If the master list's loop
+ // does not match this one, then we do not need to finish this
+ // notification.
+ if (it == observer_lists_.end() || it->second.get() != context)
return;
- DCHECK(it->second->RunsTasksOnCurrentThread());
- }
-
- // Keep track of the notification being dispatched on the current thread.
- // This will be used if the callback below calls AddObserver().
- //
- // Note: |tls_current_notification_| may not be nullptr if this runs in a
- // nested loop started by a notification callback. In that case, it is
- // important to save the previous value to restore it later.
- const NotificationData* const previous_notification =
- tls_current_notification_.Get();
- tls_current_notification_.Set(&notification);
-
- // Invoke the callback.
- notification.method.Run(observer);
-
- // Reset the notification being dispatched on the current thread to its
- // previous value.
- tls_current_notification_.Set(previous_notification);
- }
-
- const NotificationType type_ = NotificationType::NOTIFY_ALL;
-
- // Synchronizes access to |observers_|.
- mutable Lock lock_;
-
- // Keys are observers. Values are the SequencedTaskRunners on which they must
- // be notified.
- std::unordered_map<ObserverType*, scoped_refptr<SequencedTaskRunner>>
- observers_;
-
- // Notification being dispatched on the current thread.
- ThreadLocalPointer<const NotificationData> tls_current_notification_;
+ }
+
+ for (auto& observer : context->list) {
+ method.Run(&observer);
+ }
+
+ // If there are no more observers on the list, we can now delete it.
+ if (context->list.size() == 0) {
+ {
+ AutoLock lock(list_lock_);
+ // Remove |list| if it's not already removed.
+ // This can happen if multiple observers got removed in a notification.
+ // See http://crbug.com/55725.
+ auto it = observer_lists_.find(PlatformThread::CurrentId());
+ if (it != observer_lists_.end() && it->second.get() == context)
+ observer_lists_.erase(it);
+ }
+ }
+ }
+
+ mutable Lock list_lock_; // Protects the observer_lists_.
+
+ // Key by PlatformThreadId because in tests, clients can attempt to remove
+ // observers without a SingleThreadTaskRunner. If this were keyed by
+ // SingleThreadTaskRunner, that operation would be silently ignored, leaving
+ // garbage in the ObserverList.
+ std::map<PlatformThreadId, std::unique_ptr<ObserverListContext>>
+ observer_lists_;
+
+ const NotificationType type_;
DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe);
};
« no previous file with comments | « base/android/application_status_listener_unittest.cc ('k') | base/observer_list_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698