Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ | 5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ |
| 6 #define BASE_OBSERVER_LIST_THREADSAFE_H_ | 6 #define BASE_OBSERVER_LIST_THREADSAFE_H_ |
| 7 | 7 |
| 8 #include <algorithm> | 8 #include <unordered_map> |
| 9 #include <map> | |
| 10 #include <memory> | |
| 11 #include <tuple> | |
| 12 | 9 |
| 13 #include "base/bind.h" | 10 #include "base/bind.h" |
| 14 #include "base/location.h" | 11 #include "base/location.h" |
| 15 #include "base/logging.h" | 12 #include "base/logging.h" |
| 16 #include "base/macros.h" | 13 #include "base/macros.h" |
| 17 #include "base/memory/ptr_util.h" | |
| 18 #include "base/memory/ref_counted.h" | 14 #include "base/memory/ref_counted.h" |
| 19 #include "base/observer_list.h" | 15 #include "base/observer_list.h" |
| 16 #include "base/sequenced_task_runner.h" | |
| 17 #include "base/stl_util.h" | |
| 18 #include "base/synchronization/lock.h" | |
| 19 #include "base/threading/sequenced_task_runner_handle.h" | |
| 20 #include "base/threading/thread_local.h" | |
| 21 | |
| 22 // TODO(fdoray): Removing this include causes IWYU failures in other headers, | |
| 23 // remove it in a follow- up CL. | |
| 20 #include "base/single_thread_task_runner.h" | 24 #include "base/single_thread_task_runner.h" |
| 21 #include "base/threading/platform_thread.h" | |
| 22 #include "base/threading/thread_task_runner_handle.h" | |
| 23 | 25 |
| 24 /////////////////////////////////////////////////////////////////////////////// | 26 /////////////////////////////////////////////////////////////////////////////// |
| 25 // | 27 // |
| 26 // OVERVIEW: | 28 // OVERVIEW: |
| 27 // | 29 // |
| 28 // A thread-safe container for a list of observers. | 30 // A thread-safe container for a list of observers. This is similar to the |
| 29 // This is similar to the observer_list (see observer_list.h), but it | 31 // observer_list (see observer_list.h), but it is more robust for multi- |
| 30 // is more robust for multi-threaded situations. | 32 // threaded situations. |
| 31 // | 33 // |
| 32 // The following use cases are supported: | 34 // The following use cases are supported: |
| 33 // * Observers can register for notifications from any thread. | 35 // * Observers can register for notifications from any sequence. They are |
| 34 // Callbacks to the observer will occur on the same thread where | 36 // always notified on the sequence from which they were registered. |
| 35 // the observer initially called AddObserver() from. | 37 // * Any sequence may trigger a notification via Notify(). |
| 36 // * Any thread may trigger a notification via Notify(). | 38 // * Observers can remove themselves from the observer list inside of a |
| 37 // * Observers can remove themselves from the observer list inside | 39 // callback. |
| 38 // of a callback. | 40 // * If one sequence is notifying observers concurrently with an observer |
| 39 // * If one thread is notifying observers concurrently with an observer | 41 // removing itself from the observer list, the notifications will be |
| 40 // removing itself from the observer list, the notifications will | 42 // silently dropped. |
| 41 // be silently dropped. | |
| 42 // | 43 // |
| 43 // The drawback of the threadsafe observer list is that notifications | 44 // The drawback of the threadsafe observer list is that notifications are not |
| 44 // are not as real-time as the non-threadsafe version of this class. | 45 // as real-time as the non-threadsafe version of this class. Notifications |
| 45 // Notifications will always be done via PostTask() to another thread, | 46 // will always be done via PostTask() to another sequence, whereas with the |
| 46 // whereas with the non-thread-safe observer_list, notifications happen | 47 // non-thread-safe observer_list, notifications happen synchronously and |
| 47 // synchronously and immediately. | 48 // immediately. |
|
gab
2017/01/12 16:24:05
- "and immediately" ? (synchronously implies immed
fdoray
2017/01/12 17:17:57
Done.
| |
| 48 // | |
| 49 // IMPLEMENTATION NOTES | |
| 50 // The ObserverListThreadSafe maintains an ObserverList for each thread | |
| 51 // which uses the ThreadSafeObserver. When Notifying the observers, | |
| 52 // we simply call PostTask to each registered thread, and then each thread | |
| 53 // will notify its regular ObserverList. | |
| 54 // | 49 // |
| 55 /////////////////////////////////////////////////////////////////////////////// | 50 /////////////////////////////////////////////////////////////////////////////// |
| 56 | 51 |
| 57 namespace base { | 52 namespace base { |
| 58 namespace internal { | 53 namespace internal { |
| 59 | 54 |
| 60 template <typename ObserverType, typename Method> | 55 template <typename ObserverType, typename Method> |
| 61 struct Dispatcher; | 56 struct Dispatcher; |
| 62 | 57 |
| 63 template <typename ObserverType, typename ReceiverType, typename... Params> | 58 template <typename ObserverType, typename ReceiverType, typename... Params> |
| 64 struct Dispatcher<ObserverType, void(ReceiverType::*)(Params...)> { | 59 struct Dispatcher<ObserverType, void(ReceiverType::*)(Params...)> { |
| 65 static void Run(void(ReceiverType::* m)(Params...), | 60 static void Run(void(ReceiverType::* m)(Params...), |
| 66 Params... params, ObserverType* obj) { | 61 Params... params, ObserverType* obj) { |
| 67 (obj->*m)(std::forward<Params>(params)...); | 62 (obj->*m)(std::forward<Params>(params)...); |
| 68 } | 63 } |
| 69 }; | 64 }; |
| 70 | 65 |
| 71 } // namespace internal | 66 } // namespace internal |
| 72 | 67 |
| 73 template <class ObserverType> | 68 template <class ObserverType> |
| 74 class ObserverListThreadSafe | 69 class ObserverListThreadSafe |
| 75 : public RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>> { | 70 : public RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>> { |
| 76 public: | 71 public: |
| 77 using NotificationType = | 72 using NotificationType = |
| 78 typename ObserverList<ObserverType>::NotificationType; | 73 typename ObserverList<ObserverType>::NotificationType; |
| 79 | 74 |
| 80 ObserverListThreadSafe() | 75 ObserverListThreadSafe() = default; |
| 81 : type_(ObserverListBase<ObserverType>::NOTIFY_ALL) {} | |
| 82 explicit ObserverListThreadSafe(NotificationType type) : type_(type) {} | 76 explicit ObserverListThreadSafe(NotificationType type) : type_(type) {} |
| 83 | 77 |
| 84 // Add an observer to the list. An observer should not be added to | 78 // Adds |observer| to the list. |observer| must not already be in the list. |
| 85 // the same list more than once. | 79 void AddObserver(ObserverType* observer) { |
| 86 void AddObserver(ObserverType* obs) { | 80 if (!SequencedTaskRunnerHandle::IsSet()) |
|
gab
2017/01/12 16:24:06
Seems this should be a DCHECK instead of handling
fdoray
2017/01/12 17:17:57
Will do this in a separate CL since it breaks test
| |
| 87 // If there is no ThreadTaskRunnerHandle, it is impossible to notify on it, | |
| 88 // so do not add the observer. | |
| 89 if (!ThreadTaskRunnerHandle::IsSet()) | |
| 90 return; | 81 return; |
| 91 | 82 |
| 92 ObserverList<ObserverType>* list = nullptr; | 83 AutoLock auto_lock(lock_); |
| 93 PlatformThreadId thread_id = PlatformThread::CurrentId(); | 84 |
| 94 { | 85 // Add |observer| to the list of observers. |
| 95 AutoLock lock(list_lock_); | 86 DCHECK(!ContainsKey(observers_, observer)); |
| 96 if (observer_lists_.find(thread_id) == observer_lists_.end()) { | 87 const scoped_refptr<SequencedTaskRunner> task_runner = |
| 97 observer_lists_[thread_id] = | 88 SequencedTaskRunnerHandle::Get(); |
| 98 base::MakeUnique<ObserverListContext>(type_); | 89 observers_[observer] = task_runner; |
| 90 | |
| 91 // If this is called while a notification is being dispatched and |type_| is | |
|
gab
2017/01/12 16:24:06
s/being dispatched/being dispatched on this thread
fdoray
2017/01/12 17:17:57
Done.
| |
| 92 // NOTIFY_ALL, |observer| must be notified. | |
|
gab
2017/01/12 16:24:06
+ "(if a notification is being dispatched on anoth
fdoray
2017/01/12 17:17:57
Not sure this is true.
Thread / Action
1 / C
gab
2017/01/12 20:12:14
But you also have:
1 / Calls Notify()
2
fdoray
2017/01/12 20:57:44
Done.
| |
| 93 if (type_ == NotificationType::NOTIFY_ALL) { | |
| 94 const NotificationData* current_notification = | |
| 95 tls_current_notification_.Get(); | |
| 96 if (current_notification) { | |
| 97 task_runner->PostTask( | |
| 98 current_notification->from_here, | |
| 99 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this, | |
| 100 observer, *current_notification)); | |
| 99 } | 101 } |
| 100 list = &(observer_lists_[thread_id]->list); | |
| 101 } | 102 } |
| 102 list->AddObserver(obs); | |
| 103 } | 103 } |
| 104 | 104 |
| 105 // Remove an observer from the list if it is in the list. | 105 // Remove an observer from the list if it is in the list. |
| 106 // If there are pending notifications in-transit to the observer, they will | 106 // If there are pending notifications in-transit to the observer, they will |
| 107 // be aborted. | 107 // be aborted. |
| 108 // If the observer to be removed is in the list, RemoveObserver MUST | 108 // If the observer to be removed is in the list, RemoveObserver MUST |
| 109 // be called from the same thread which called AddObserver. | 109 // be called from the same sequence which called AddObserver. |
| 110 void RemoveObserver(ObserverType* obs) { | 110 void RemoveObserver(ObserverType* observer) { |
| 111 PlatformThreadId thread_id = PlatformThread::CurrentId(); | 111 AutoLock auto_lock(lock_); |
| 112 { | 112 auto it = observers_.find(observer); |
| 113 AutoLock lock(list_lock_); | 113 if (it == observers_.end()) |
| 114 auto it = observer_lists_.find(thread_id); | 114 return; |
| 115 if (it == observer_lists_.end()) { | 115 DCHECK(it->second->RunsTasksOnCurrentThread()); |
| 116 // This will happen if we try to remove an observer on a thread | 116 observers_.erase(it); |
| 117 // we never added an observer for. | |
| 118 return; | |
| 119 } | |
| 120 ObserverList<ObserverType>& list = it->second->list; | |
| 121 | |
| 122 list.RemoveObserver(obs); | |
| 123 | |
| 124 // If that was the last observer in the list, remove the ObserverList | |
| 125 // entirely. | |
| 126 if (list.size() == 0) | |
| 127 observer_lists_.erase(it); | |
| 128 } | |
| 129 } | 117 } |
| 130 | 118 |
| 131 // Verifies that the list is currently empty (i.e. there are no observers). | 119 // Verifies that the list is currently empty (i.e. there are no observers). |
| 132 void AssertEmpty() const { | 120 void AssertEmpty() const { |
| 133 AutoLock lock(list_lock_); | 121 #if DCHECK_IS_ON() |
| 134 DCHECK(observer_lists_.empty()); | 122 AutoLock auto_lock(lock_); |
| 123 DCHECK(observers_.empty()); | |
| 124 #endif | |
| 135 } | 125 } |
| 136 | 126 |
| 137 // Notify methods. | 127 // Asynchronously invokes a callback on all observers, on their registration |
| 138 // Make a thread-safe callback to each Observer in the list. | 128 // sequence. You cannot assume that at the completion of the Notify call that |
| 139 // Note, these calls are effectively asynchronous. You cannot assume | 129 // all Observers have been Notified. The notification may still be pending |
| 140 // that at the completion of the Notify call that all Observers have | 130 // delivery. |
| 141 // been Notified. The notification may still be pending delivery. | |
| 142 template <typename Method, typename... Params> | 131 template <typename Method, typename... Params> |
| 143 void Notify(const tracked_objects::Location& from_here, | 132 void Notify(const tracked_objects::Location& from_here, |
| 144 Method m, Params&&... params) { | 133 Method m, Params&&... params) { |
| 145 Callback<void(ObserverType*)> method = | 134 Callback<void(ObserverType*)> method = |
| 146 Bind(&internal::Dispatcher<ObserverType, Method>::Run, | 135 Bind(&internal::Dispatcher<ObserverType, Method>::Run, |
| 147 m, std::forward<Params>(params)...); | 136 m, std::forward<Params>(params)...); |
| 148 | 137 |
| 149 AutoLock lock(list_lock_); | 138 AutoLock lock(lock_); |
| 150 for (const auto& entry : observer_lists_) { | 139 for (const auto& observer : observers_) { |
| 151 ObserverListContext* context = entry.second.get(); | 140 observer.second->PostTask( |
| 152 context->task_runner->PostTask( | |
| 153 from_here, | 141 from_here, |
| 154 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, | 142 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this, |
| 155 this, context, method)); | 143 observer.first, NotificationData(from_here, method))); |
| 156 } | 144 } |
| 157 } | 145 } |
| 158 | 146 |
| 159 private: | 147 private: |
| 160 friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>; | 148 friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>; |
| 161 | 149 |
| 162 struct ObserverListContext { | 150 struct NotificationData { |
| 163 explicit ObserverListContext(NotificationType type) | 151 NotificationData(const tracked_objects::Location& from_here_in, |
| 164 : task_runner(ThreadTaskRunnerHandle::Get()), list(type) {} | 152 const Callback<void(ObserverType*)>& method_in) |
| 153 : from_here(from_here_in), method(method_in) {} | |
| 165 | 154 |
| 166 scoped_refptr<SingleThreadTaskRunner> task_runner; | 155 tracked_objects::Location from_here; |
| 167 ObserverList<ObserverType> list; | 156 Callback<void(ObserverType*)> method; |
| 168 | |
| 169 private: | |
| 170 DISALLOW_COPY_AND_ASSIGN(ObserverListContext); | |
| 171 }; | 157 }; |
| 172 | 158 |
| 173 ~ObserverListThreadSafe() { | 159 ~ObserverListThreadSafe() = default; |
| 160 | |
| 161 void NotifyWrapper(ObserverType* observer, | |
| 162 const NotificationData& notification) { | |
| 163 { | |
| 164 AutoLock auto_lock(lock_); | |
| 165 | |
| 166 // Check whether the observer still needs a notification. | |
| 167 auto it = observers_.find(observer); | |
| 168 if (it == observers_.end()) | |
| 169 return; | |
| 170 DCHECK(it->second->RunsTasksOnCurrentThread()); | |
| 171 } | |
| 172 | |
| 173 // Keep track of the notification being dispatched on the current thread. | |
| 174 // This will be used if the callback below calls AddObserver(). | |
| 175 const NotificationData* const previous_notification = | |
| 176 tls_current_notification_.Get(); | |
|
gab
2017/01/12 16:24:06
As mentioned in another comment, I don't think nes
fdoray
2017/01/12 17:17:57
See previous comment in this file (a notification
| |
| 177 tls_current_notification_.Set(¬ification); | |
| 178 | |
| 179 // Invoke the callback. | |
| 180 notification.method.Run(observer); | |
| 181 | |
| 182 // Reset the notification being dispatched on the current thread to its | |
| 183 // previous value. | |
| 184 tls_current_notification_.Set(previous_notification); | |
| 174 } | 185 } |
| 175 | 186 |
| 176 // Wrapper which is called to fire the notifications for each thread's | 187 const NotificationType type_ = NotificationType::NOTIFY_ALL; |
| 177 // ObserverList. This function MUST be called on the thread which owns | |
| 178 // the unsafe ObserverList. | |
| 179 void NotifyWrapper(ObserverListContext* context, | |
| 180 const Callback<void(ObserverType*)>& method) { | |
| 181 // Check that this list still needs notifications. | |
| 182 { | |
| 183 AutoLock lock(list_lock_); | |
| 184 auto it = observer_lists_.find(PlatformThread::CurrentId()); | |
| 185 | 188 |
| 186 // The ObserverList could have been removed already. In fact, it could | 189 // Synchronizes access to |observers_|. |
| 187 // have been removed and then re-added! If the master list's loop | 190 mutable Lock lock_; |
| 188 // does not match this one, then we do not need to finish this | |
| 189 // notification. | |
| 190 if (it == observer_lists_.end() || it->second.get() != context) | |
| 191 return; | |
| 192 } | |
| 193 | 191 |
| 194 for (auto& observer : context->list) { | 192 // Keys are observers. Values are the SequencedTaskRunners on which they must |
| 195 method.Run(&observer); | 193 // be notified. |
| 196 } | 194 std::unordered_map<ObserverType*, scoped_refptr<SequencedTaskRunner>> |
| 195 observers_; | |
| 197 | 196 |
| 198 // If there are no more observers on the list, we can now delete it. | 197 // Notification being dispatched on the current thread. |
| 199 if (context->list.size() == 0) { | 198 ThreadLocalPointer<const NotificationData> tls_current_notification_; |
| 200 { | |
| 201 AutoLock lock(list_lock_); | |
| 202 // Remove |list| if it's not already removed. | |
| 203 // This can happen if multiple observers got removed in a notification. | |
| 204 // See http://crbug.com/55725. | |
| 205 auto it = observer_lists_.find(PlatformThread::CurrentId()); | |
| 206 if (it != observer_lists_.end() && it->second.get() == context) | |
| 207 observer_lists_.erase(it); | |
| 208 } | |
| 209 } | |
| 210 } | |
| 211 | |
| 212 mutable Lock list_lock_; // Protects the observer_lists_. | |
| 213 | |
| 214 // Key by PlatformThreadId because in tests, clients can attempt to remove | |
| 215 // observers without a SingleThreadTaskRunner. If this were keyed by | |
| 216 // SingleThreadTaskRunner, that operation would be silently ignored, leaving | |
| 217 // garbage in the ObserverList. | |
| 218 std::map<PlatformThreadId, std::unique_ptr<ObserverListContext>> | |
| 219 observer_lists_; | |
| 220 | |
| 221 const NotificationType type_; | |
| 222 | 199 |
| 223 DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe); | 200 DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe); |
| 224 }; | 201 }; |
| 225 | 202 |
| 226 } // namespace base | 203 } // namespace base |
| 227 | 204 |
| 228 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_ | 205 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_ |
| OLD | NEW |