OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ | 5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ |
6 #define BASE_OBSERVER_LIST_THREADSAFE_H_ | 6 #define BASE_OBSERVER_LIST_THREADSAFE_H_ |
7 | 7 |
8 #include <unordered_map> | 8 #include <algorithm> |
| 9 #include <map> |
| 10 #include <memory> |
| 11 #include <tuple> |
9 | 12 |
10 #include "base/bind.h" | 13 #include "base/bind.h" |
11 #include "base/location.h" | 14 #include "base/location.h" |
12 #include "base/logging.h" | 15 #include "base/logging.h" |
13 #include "base/macros.h" | 16 #include "base/macros.h" |
| 17 #include "base/memory/ptr_util.h" |
14 #include "base/memory/ref_counted.h" | 18 #include "base/memory/ref_counted.h" |
15 #include "base/observer_list.h" | 19 #include "base/observer_list.h" |
16 #include "base/sequenced_task_runner.h" | |
17 #include "base/stl_util.h" | |
18 #include "base/synchronization/lock.h" | |
19 #include "base/threading/sequenced_task_runner_handle.h" | |
20 #include "base/threading/thread_local.h" | |
21 #include "build/build_config.h" | |
22 | |
23 // TODO(fdoray): Removing these includes causes IWYU failures in other headers, | |
24 // remove them in a follow- up CL. | |
25 #include "base/memory/ptr_util.h" | |
26 #include "base/single_thread_task_runner.h" | 20 #include "base/single_thread_task_runner.h" |
| 21 #include "base/threading/platform_thread.h" |
| 22 #include "base/threading/thread_task_runner_handle.h" |
27 | 23 |
28 /////////////////////////////////////////////////////////////////////////////// | 24 /////////////////////////////////////////////////////////////////////////////// |
29 // | 25 // |
30 // OVERVIEW: | 26 // OVERVIEW: |
31 // | 27 // |
32 // A thread-safe container for a list of observers. This is similar to the | 28 // A thread-safe container for a list of observers. |
33 // observer_list (see observer_list.h), but it is more robust for multi- | 29 // This is similar to the observer_list (see observer_list.h), but it |
34 // threaded situations. | 30 // is more robust for multi-threaded situations. |
35 // | 31 // |
36 // The following use cases are supported: | 32 // The following use cases are supported: |
37 // * Observers can register for notifications from any sequence. They are | 33 // * Observers can register for notifications from any thread. |
38 // always notified on the sequence from which they were registered. | 34 // Callbacks to the observer will occur on the same thread where |
39 // * Any sequence may trigger a notification via Notify(). | 35 // the observer initially called AddObserver() from. |
40 // * Observers can remove themselves from the observer list inside of a | 36 // * Any thread may trigger a notification via Notify(). |
41 // callback. | 37 // * Observers can remove themselves from the observer list inside |
42 // * If one sequence is notifying observers concurrently with an observer | 38 // of a callback. |
43 // removing itself from the observer list, the notifications will be | 39 // * If one thread is notifying observers concurrently with an observer |
44 // silently dropped. | 40 // removing itself from the observer list, the notifications will |
| 41 // be silently dropped. |
45 // | 42 // |
46 // The drawback of the threadsafe observer list is that notifications are not | 43 // The drawback of the threadsafe observer list is that notifications |
47 // as real-time as the non-threadsafe version of this class. Notifications | 44 // are not as real-time as the non-threadsafe version of this class. |
48 // will always be done via PostTask() to another sequence, whereas with the | 45 // Notifications will always be done via PostTask() to another thread, |
49 // non-thread-safe observer_list, notifications happen synchronously. | 46 // whereas with the non-thread-safe observer_list, notifications happen |
| 47 // synchronously and immediately. |
| 48 // |
| 49 // IMPLEMENTATION NOTES |
| 50 // The ObserverListThreadSafe maintains an ObserverList for each thread |
| 51 // which uses the ThreadSafeObserver. When Notifying the observers, |
| 52 // we simply call PostTask to each registered thread, and then each thread |
| 53 // will notify its regular ObserverList. |
50 // | 54 // |
51 /////////////////////////////////////////////////////////////////////////////// | 55 /////////////////////////////////////////////////////////////////////////////// |
52 | 56 |
53 namespace base { | 57 namespace base { |
54 namespace internal { | 58 namespace internal { |
55 | 59 |
56 template <typename ObserverType, typename Method> | 60 template <typename ObserverType, typename Method> |
57 struct Dispatcher; | 61 struct Dispatcher; |
58 | 62 |
59 template <typename ObserverType, typename ReceiverType, typename... Params> | 63 template <typename ObserverType, typename ReceiverType, typename... Params> |
60 struct Dispatcher<ObserverType, void(ReceiverType::*)(Params...)> { | 64 struct Dispatcher<ObserverType, void(ReceiverType::*)(Params...)> { |
61 static void Run(void(ReceiverType::* m)(Params...), | 65 static void Run(void(ReceiverType::* m)(Params...), |
62 Params... params, ObserverType* obj) { | 66 Params... params, ObserverType* obj) { |
63 (obj->*m)(std::forward<Params>(params)...); | 67 (obj->*m)(std::forward<Params>(params)...); |
64 } | 68 } |
65 }; | 69 }; |
66 | 70 |
67 } // namespace internal | 71 } // namespace internal |
68 | 72 |
69 template <class ObserverType> | 73 template <class ObserverType> |
70 class ObserverListThreadSafe | 74 class ObserverListThreadSafe |
71 : public RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>> { | 75 : public RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>> { |
72 public: | 76 public: |
73 using NotificationType = | 77 using NotificationType = |
74 typename ObserverList<ObserverType>::NotificationType; | 78 typename ObserverList<ObserverType>::NotificationType; |
75 | 79 |
76 ObserverListThreadSafe() = default; | 80 ObserverListThreadSafe() |
| 81 : type_(ObserverListBase<ObserverType>::NOTIFY_ALL) {} |
77 explicit ObserverListThreadSafe(NotificationType type) : type_(type) {} | 82 explicit ObserverListThreadSafe(NotificationType type) : type_(type) {} |
78 | 83 |
79 // Adds |observer| to the list. |observer| must not already be in the list. | 84 // Add an observer to the list. An observer should not be added to |
80 void AddObserver(ObserverType* observer) { | 85 // the same list more than once. |
81 // TODO(fdoray): Change this to a DCHECK once all call sites have a | 86 void AddObserver(ObserverType* obs) { |
82 // SequencedTaskRunnerHandle. | 87 // If there is no ThreadTaskRunnerHandle, it is impossible to notify on it, |
83 if (!SequencedTaskRunnerHandle::IsSet()) | 88 // so do not add the observer. |
| 89 if (!ThreadTaskRunnerHandle::IsSet()) |
84 return; | 90 return; |
85 | 91 |
86 AutoLock auto_lock(lock_); | 92 ObserverList<ObserverType>* list = nullptr; |
87 | 93 PlatformThreadId thread_id = PlatformThread::CurrentId(); |
88 // Add |observer| to the list of observers. | 94 { |
89 DCHECK(!ContainsKey(observers_, observer)); | 95 AutoLock lock(list_lock_); |
90 const scoped_refptr<SequencedTaskRunner> task_runner = | 96 if (observer_lists_.find(thread_id) == observer_lists_.end()) { |
91 SequencedTaskRunnerHandle::Get(); | 97 observer_lists_[thread_id] = |
92 observers_[observer] = task_runner; | 98 base::MakeUnique<ObserverListContext>(type_); |
93 | |
94 // If this is called while a notification is being dispatched on this thread | |
95 // and |type_| is NOTIFY_ALL, |observer| must be notified (if a notification | |
96 // is being dispatched on another thread in parallel, the notification may | |
97 // or may not make it to |observer| depending on the outcome of the race to | |
98 // |lock_|). | |
99 if (type_ == NotificationType::NOTIFY_ALL) { | |
100 const NotificationData* current_notification = | |
101 tls_current_notification_.Get(); | |
102 if (current_notification) { | |
103 task_runner->PostTask( | |
104 current_notification->from_here, | |
105 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this, | |
106 observer, *current_notification)); | |
107 } | 99 } |
| 100 list = &(observer_lists_[thread_id]->list); |
108 } | 101 } |
| 102 list->AddObserver(obs); |
109 } | 103 } |
110 | 104 |
111 // Remove an observer from the list if it is in the list. | 105 // Remove an observer from the list if it is in the list. |
112 // If there are pending notifications in-transit to the observer, they will | 106 // If there are pending notifications in-transit to the observer, they will |
113 // be aborted. | 107 // be aborted. |
114 // If the observer to be removed is in the list, RemoveObserver MUST | 108 // If the observer to be removed is in the list, RemoveObserver MUST |
115 // be called from the same sequence which called AddObserver. | 109 // be called from the same thread which called AddObserver. |
116 void RemoveObserver(ObserverType* observer) { | 110 void RemoveObserver(ObserverType* obs) { |
117 AutoLock auto_lock(lock_); | 111 PlatformThreadId thread_id = PlatformThread::CurrentId(); |
118 auto it = observers_.find(observer); | 112 { |
119 if (it == observers_.end()) | 113 AutoLock lock(list_lock_); |
120 return; | 114 auto it = observer_lists_.find(thread_id); |
| 115 if (it == observer_lists_.end()) { |
| 116 // This will happen if we try to remove an observer on a thread |
| 117 // we never added an observer for. |
| 118 return; |
| 119 } |
| 120 ObserverList<ObserverType>& list = it->second->list; |
121 | 121 |
122 // TODO(fdoray): Enable this on Android once all tests pass. | 122 list.RemoveObserver(obs); |
123 #if !defined(OS_ANDROID) | |
124 DCHECK(it->second->RunsTasksOnCurrentThread()); | |
125 #endif | |
126 | 123 |
127 observers_.erase(it); | 124 // If that was the last observer in the list, remove the ObserverList |
| 125 // entirely. |
| 126 if (list.size() == 0) |
| 127 observer_lists_.erase(it); |
| 128 } |
128 } | 129 } |
129 | 130 |
130 // Verifies that the list is currently empty (i.e. there are no observers). | 131 // Verifies that the list is currently empty (i.e. there are no observers). |
131 void AssertEmpty() const { | 132 void AssertEmpty() const { |
132 #if DCHECK_IS_ON() | 133 AutoLock lock(list_lock_); |
133 AutoLock auto_lock(lock_); | 134 DCHECK(observer_lists_.empty()); |
134 DCHECK(observers_.empty()); | |
135 #endif | |
136 } | 135 } |
137 | 136 |
138 // Asynchronously invokes a callback on all observers, on their registration | 137 // Notify methods. |
139 // sequence. You cannot assume that at the completion of the Notify call that | 138 // Make a thread-safe callback to each Observer in the list. |
140 // all Observers have been Notified. The notification may still be pending | 139 // Note, these calls are effectively asynchronous. You cannot assume |
141 // delivery. | 140 // that at the completion of the Notify call that all Observers have |
| 141 // been Notified. The notification may still be pending delivery. |
142 template <typename Method, typename... Params> | 142 template <typename Method, typename... Params> |
143 void Notify(const tracked_objects::Location& from_here, | 143 void Notify(const tracked_objects::Location& from_here, |
144 Method m, Params&&... params) { | 144 Method m, Params&&... params) { |
145 Callback<void(ObserverType*)> method = | 145 Callback<void(ObserverType*)> method = |
146 Bind(&internal::Dispatcher<ObserverType, Method>::Run, | 146 Bind(&internal::Dispatcher<ObserverType, Method>::Run, |
147 m, std::forward<Params>(params)...); | 147 m, std::forward<Params>(params)...); |
148 | 148 |
149 AutoLock lock(lock_); | 149 AutoLock lock(list_lock_); |
150 for (const auto& observer : observers_) { | 150 for (const auto& entry : observer_lists_) { |
151 observer.second->PostTask( | 151 ObserverListContext* context = entry.second.get(); |
| 152 context->task_runner->PostTask( |
152 from_here, | 153 from_here, |
153 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this, | 154 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, |
154 observer.first, NotificationData(from_here, method))); | 155 this, context, method)); |
155 } | 156 } |
156 } | 157 } |
157 | 158 |
158 private: | 159 private: |
159 friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>; | 160 friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>; |
160 | 161 |
161 struct NotificationData { | 162 struct ObserverListContext { |
162 NotificationData(const tracked_objects::Location& from_here_in, | 163 explicit ObserverListContext(NotificationType type) |
163 const Callback<void(ObserverType*)>& method_in) | 164 : task_runner(ThreadTaskRunnerHandle::Get()), list(type) {} |
164 : from_here(from_here_in), method(method_in) {} | |
165 | 165 |
166 tracked_objects::Location from_here; | 166 scoped_refptr<SingleThreadTaskRunner> task_runner; |
167 Callback<void(ObserverType*)> method; | 167 ObserverList<ObserverType> list; |
| 168 |
| 169 private: |
| 170 DISALLOW_COPY_AND_ASSIGN(ObserverListContext); |
168 }; | 171 }; |
169 | 172 |
170 ~ObserverListThreadSafe() = default; | 173 ~ObserverListThreadSafe() { |
| 174 } |
171 | 175 |
172 void NotifyWrapper(ObserverType* observer, | 176 // Wrapper which is called to fire the notifications for each thread's |
173 const NotificationData& notification) { | 177 // ObserverList. This function MUST be called on the thread which owns |
| 178 // the unsafe ObserverList. |
| 179 void NotifyWrapper(ObserverListContext* context, |
| 180 const Callback<void(ObserverType*)>& method) { |
| 181 // Check that this list still needs notifications. |
174 { | 182 { |
175 AutoLock auto_lock(lock_); | 183 AutoLock lock(list_lock_); |
| 184 auto it = observer_lists_.find(PlatformThread::CurrentId()); |
176 | 185 |
177 // Check whether the observer still needs a notification. | 186 // The ObserverList could have been removed already. In fact, it could |
178 auto it = observers_.find(observer); | 187 // have been removed and then re-added! If the master list's loop |
179 if (it == observers_.end()) | 188 // does not match this one, then we do not need to finish this |
| 189 // notification. |
| 190 if (it == observer_lists_.end() || it->second.get() != context) |
180 return; | 191 return; |
181 DCHECK(it->second->RunsTasksOnCurrentThread()); | |
182 } | 192 } |
183 | 193 |
184 // Keep track of the notification being dispatched on the current thread. | 194 for (auto& observer : context->list) { |
185 // This will be used if the callback below calls AddObserver(). | 195 method.Run(&observer); |
186 // | 196 } |
187 // Note: |tls_current_notification_| may not be nullptr if this runs in a | |
188 // nested loop started by a notification callback. In that case, it is | |
189 // important to save the previous value to restore it later. | |
190 const NotificationData* const previous_notification = | |
191 tls_current_notification_.Get(); | |
192 tls_current_notification_.Set(¬ification); | |
193 | 197 |
194 // Invoke the callback. | 198 // If there are no more observers on the list, we can now delete it. |
195 notification.method.Run(observer); | 199 if (context->list.size() == 0) { |
196 | 200 { |
197 // Reset the notification being dispatched on the current thread to its | 201 AutoLock lock(list_lock_); |
198 // previous value. | 202 // Remove |list| if it's not already removed. |
199 tls_current_notification_.Set(previous_notification); | 203 // This can happen if multiple observers got removed in a notification. |
| 204 // See http://crbug.com/55725. |
| 205 auto it = observer_lists_.find(PlatformThread::CurrentId()); |
| 206 if (it != observer_lists_.end() && it->second.get() == context) |
| 207 observer_lists_.erase(it); |
| 208 } |
| 209 } |
200 } | 210 } |
201 | 211 |
202 const NotificationType type_ = NotificationType::NOTIFY_ALL; | 212 mutable Lock list_lock_; // Protects the observer_lists_. |
203 | 213 |
204 // Synchronizes access to |observers_|. | 214 // Key by PlatformThreadId because in tests, clients can attempt to remove |
205 mutable Lock lock_; | 215 // observers without a SingleThreadTaskRunner. If this were keyed by |
| 216 // SingleThreadTaskRunner, that operation would be silently ignored, leaving |
| 217 // garbage in the ObserverList. |
| 218 std::map<PlatformThreadId, std::unique_ptr<ObserverListContext>> |
| 219 observer_lists_; |
206 | 220 |
207 // Keys are observers. Values are the SequencedTaskRunners on which they must | 221 const NotificationType type_; |
208 // be notified. | |
209 std::unordered_map<ObserverType*, scoped_refptr<SequencedTaskRunner>> | |
210 observers_; | |
211 | |
212 // Notification being dispatched on the current thread. | |
213 ThreadLocalPointer<const NotificationData> tls_current_notification_; | |
214 | 222 |
215 DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe); | 223 DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe); |
216 }; | 224 }; |
217 | 225 |
218 } // namespace base | 226 } // namespace base |
219 | 227 |
220 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_ | 228 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_ |
OLD | NEW |