OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ | 5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ |
6 #define BASE_OBSERVER_LIST_THREADSAFE_H_ | 6 #define BASE_OBSERVER_LIST_THREADSAFE_H_ |
7 | 7 |
8 #include <algorithm> | 8 #include <unordered_map> |
9 #include <map> | |
10 #include <memory> | |
11 #include <tuple> | |
12 | 9 |
13 #include "base/bind.h" | 10 #include "base/bind.h" |
14 #include "base/location.h" | 11 #include "base/location.h" |
15 #include "base/logging.h" | 12 #include "base/logging.h" |
16 #include "base/macros.h" | 13 #include "base/macros.h" |
17 #include "base/memory/ptr_util.h" | |
18 #include "base/memory/ref_counted.h" | 14 #include "base/memory/ref_counted.h" |
19 #include "base/observer_list.h" | 15 #include "base/observer_list.h" |
16 #include "base/sequenced_task_runner.h" | |
17 #include "base/stl_util.h" | |
18 #include "base/synchronization/lock.h" | |
19 #include "base/threading/sequenced_task_runner_handle.h" | |
20 #include "base/threading/thread_local.h" | |
21 | |
22 // TODO(fdoray): Removing this include causes IWYU failures in other headers, | |
23 // remove it in a follow- up CL. | |
20 #include "base/single_thread_task_runner.h" | 24 #include "base/single_thread_task_runner.h" |
21 #include "base/threading/platform_thread.h" | |
22 #include "base/threading/thread_task_runner_handle.h" | |
23 | 25 |
24 /////////////////////////////////////////////////////////////////////////////// | 26 /////////////////////////////////////////////////////////////////////////////// |
25 // | 27 // |
26 // OVERVIEW: | 28 // OVERVIEW: |
27 // | 29 // |
28 // A thread-safe container for a list of observers. | 30 // A thread-safe container for a list of observers. This is similar to the |
29 // This is similar to the observer_list (see observer_list.h), but it | 31 // observer_list (see observer_list.h), but it is more robust for multi- |
30 // is more robust for multi-threaded situations. | 32 // threaded situations. |
31 // | 33 // |
32 // The following use cases are supported: | 34 // The following use cases are supported: |
33 // * Observers can register for notifications from any thread. | 35 // * Observers can register for notifications from any sequence. They are |
34 // Callbacks to the observer will occur on the same thread where | 36 // always notified on the sequence from which they were registered. |
35 // the observer initially called AddObserver() from. | 37 // * Any sequence may trigger a notification via Notify(). |
36 // * Any thread may trigger a notification via Notify(). | 38 // * Observers can remove themselves from the observer list inside of a |
37 // * Observers can remove themselves from the observer list inside | 39 // callback. |
38 // of a callback. | 40 // * If one sequence is notifying observers concurrently with an observer |
39 // * If one thread is notifying observers concurrently with an observer | 41 // removing itself from the observer list, the notifications will be |
40 // removing itself from the observer list, the notifications will | 42 // silently dropped. |
41 // be silently dropped. | |
42 // | 43 // |
43 // The drawback of the threadsafe observer list is that notifications | 44 // The drawback of the threadsafe observer list is that notifications are not |
44 // are not as real-time as the non-threadsafe version of this class. | 45 // as real-time as the non-threadsafe version of this class. Notifications |
45 // Notifications will always be done via PostTask() to another thread, | 46 // will always be done via PostTask() to another sequence, whereas with the |
46 // whereas with the non-thread-safe observer_list, notifications happen | 47 // non-thread-safe observer_list, notifications happen synchronously. |
47 // synchronously and immediately. | |
48 // | |
49 // IMPLEMENTATION NOTES | |
50 // The ObserverListThreadSafe maintains an ObserverList for each thread | |
51 // which uses the ThreadSafeObserver. When Notifying the observers, | |
52 // we simply call PostTask to each registered thread, and then each thread | |
53 // will notify its regular ObserverList. | |
54 // | 48 // |
55 /////////////////////////////////////////////////////////////////////////////// | 49 /////////////////////////////////////////////////////////////////////////////// |
56 | 50 |
57 namespace base { | 51 namespace base { |
58 namespace internal { | 52 namespace internal { |
59 | 53 |
60 template <typename ObserverType, typename Method> | 54 template <typename ObserverType, typename Method> |
61 struct Dispatcher; | 55 struct Dispatcher; |
62 | 56 |
63 template <typename ObserverType, typename ReceiverType, typename... Params> | 57 template <typename ObserverType, typename ReceiverType, typename... Params> |
64 struct Dispatcher<ObserverType, void(ReceiverType::*)(Params...)> { | 58 struct Dispatcher<ObserverType, void(ReceiverType::*)(Params...)> { |
65 static void Run(void(ReceiverType::* m)(Params...), | 59 static void Run(void(ReceiverType::* m)(Params...), |
66 Params... params, ObserverType* obj) { | 60 Params... params, ObserverType* obj) { |
67 (obj->*m)(std::forward<Params>(params)...); | 61 (obj->*m)(std::forward<Params>(params)...); |
68 } | 62 } |
69 }; | 63 }; |
70 | 64 |
71 } // namespace internal | 65 } // namespace internal |
72 | 66 |
73 template <class ObserverType> | 67 template <class ObserverType> |
74 class ObserverListThreadSafe | 68 class ObserverListThreadSafe |
75 : public RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>> { | 69 : public RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>> { |
76 public: | 70 public: |
77 using NotificationType = | 71 using NotificationType = |
78 typename ObserverList<ObserverType>::NotificationType; | 72 typename ObserverList<ObserverType>::NotificationType; |
79 | 73 |
80 ObserverListThreadSafe() | 74 ObserverListThreadSafe() = default; |
81 : type_(ObserverListBase<ObserverType>::NOTIFY_ALL) {} | |
82 explicit ObserverListThreadSafe(NotificationType type) : type_(type) {} | 75 explicit ObserverListThreadSafe(NotificationType type) : type_(type) {} |
83 | 76 |
84 // Add an observer to the list. An observer should not be added to | 77 // Adds |observer| to the list. |observer| must not already be in the list. |
85 // the same list more than once. | 78 void AddObserver(ObserverType* observer) { |
86 void AddObserver(ObserverType* obs) { | 79 // TODO(fdoray): Change this to a DCHECK once all call sites have a |
87 // If there is no ThreadTaskRunnerHandle, it is impossible to notify on it, | 80 // SequencedTaskRunnerHandle. |
88 // so do not add the observer. | 81 if (!SequencedTaskRunnerHandle::IsSet()) |
89 if (!ThreadTaskRunnerHandle::IsSet()) | |
90 return; | 82 return; |
91 | 83 |
92 ObserverList<ObserverType>* list = nullptr; | 84 AutoLock auto_lock(lock_); |
93 PlatformThreadId thread_id = PlatformThread::CurrentId(); | 85 |
94 { | 86 // Add |observer| to the list of observers. |
95 AutoLock lock(list_lock_); | 87 DCHECK(!ContainsKey(observers_, observer)); |
96 if (observer_lists_.find(thread_id) == observer_lists_.end()) { | 88 const scoped_refptr<SequencedTaskRunner> task_runner = |
97 observer_lists_[thread_id] = | 89 SequencedTaskRunnerHandle::Get(); |
98 base::MakeUnique<ObserverListContext>(type_); | 90 observers_[observer] = task_runner; |
91 | |
92 // If this is called while a notification is being dispatched on this thread | |
93 // and |type_| is NOTIFY_ALL, |observer| must be notified. | |
94 if (type_ == NotificationType::NOTIFY_ALL) { | |
95 const NotificationData* current_notification = | |
96 tls_current_notification_.Get(); | |
97 if (current_notification) { | |
98 task_runner->PostTask( | |
99 current_notification->from_here, | |
100 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this, | |
101 observer, *current_notification)); | |
99 } | 102 } |
100 list = &(observer_lists_[thread_id]->list); | |
101 } | 103 } |
102 list->AddObserver(obs); | |
103 } | 104 } |
104 | 105 |
105 // Remove an observer from the list if it is in the list. | 106 // Remove an observer from the list if it is in the list. |
106 // If there are pending notifications in-transit to the observer, they will | 107 // If there are pending notifications in-transit to the observer, they will |
107 // be aborted. | 108 // be aborted. |
108 // If the observer to be removed is in the list, RemoveObserver MUST | 109 // If the observer to be removed is in the list, RemoveObserver MUST |
109 // be called from the same thread which called AddObserver. | 110 // be called from the same sequence which called AddObserver. |
110 void RemoveObserver(ObserverType* obs) { | 111 void RemoveObserver(ObserverType* observer) { |
111 PlatformThreadId thread_id = PlatformThread::CurrentId(); | 112 AutoLock auto_lock(lock_); |
112 { | 113 auto it = observers_.find(observer); |
113 AutoLock lock(list_lock_); | 114 if (it == observers_.end()) |
114 auto it = observer_lists_.find(thread_id); | 115 return; |
115 if (it == observer_lists_.end()) { | 116 DCHECK(it->second->RunsTasksOnCurrentThread()); |
116 // This will happen if we try to remove an observer on a thread | 117 observers_.erase(it); |
117 // we never added an observer for. | |
118 return; | |
119 } | |
120 ObserverList<ObserverType>& list = it->second->list; | |
121 | |
122 list.RemoveObserver(obs); | |
123 | |
124 // If that was the last observer in the list, remove the ObserverList | |
125 // entirely. | |
126 if (list.size() == 0) | |
127 observer_lists_.erase(it); | |
128 } | |
129 } | 118 } |
130 | 119 |
131 // Verifies that the list is currently empty (i.e. there are no observers). | 120 // Verifies that the list is currently empty (i.e. there are no observers). |
132 void AssertEmpty() const { | 121 void AssertEmpty() const { |
133 AutoLock lock(list_lock_); | 122 #if DCHECK_IS_ON() |
134 DCHECK(observer_lists_.empty()); | 123 AutoLock auto_lock(lock_); |
124 DCHECK(observers_.empty()); | |
125 #endif | |
135 } | 126 } |
136 | 127 |
137 // Notify methods. | 128 // Asynchronously invokes a callback on all observers, on their registration |
138 // Make a thread-safe callback to each Observer in the list. | 129 // sequence. You cannot assume that at the completion of the Notify call that |
139 // Note, these calls are effectively asynchronous. You cannot assume | 130 // all Observers have been Notified. The notification may still be pending |
140 // that at the completion of the Notify call that all Observers have | 131 // delivery. |
141 // been Notified. The notification may still be pending delivery. | |
142 template <typename Method, typename... Params> | 132 template <typename Method, typename... Params> |
143 void Notify(const tracked_objects::Location& from_here, | 133 void Notify(const tracked_objects::Location& from_here, |
144 Method m, Params&&... params) { | 134 Method m, Params&&... params) { |
145 Callback<void(ObserverType*)> method = | 135 Callback<void(ObserverType*)> method = |
146 Bind(&internal::Dispatcher<ObserverType, Method>::Run, | 136 Bind(&internal::Dispatcher<ObserverType, Method>::Run, |
147 m, std::forward<Params>(params)...); | 137 m, std::forward<Params>(params)...); |
148 | 138 |
149 AutoLock lock(list_lock_); | 139 AutoLock lock(lock_); |
150 for (const auto& entry : observer_lists_) { | 140 for (const auto& observer : observers_) { |
151 ObserverListContext* context = entry.second.get(); | 141 observer.second->PostTask( |
152 context->task_runner->PostTask( | |
153 from_here, | 142 from_here, |
154 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, | 143 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this, |
155 this, context, method)); | 144 observer.first, NotificationData(from_here, method))); |
156 } | 145 } |
157 } | 146 } |
158 | 147 |
159 private: | 148 private: |
160 friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>; | 149 friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>; |
161 | 150 |
162 struct ObserverListContext { | 151 struct NotificationData { |
163 explicit ObserverListContext(NotificationType type) | 152 NotificationData(const tracked_objects::Location& from_here_in, |
164 : task_runner(ThreadTaskRunnerHandle::Get()), list(type) {} | 153 const Callback<void(ObserverType*)>& method_in) |
154 : from_here(from_here_in), method(method_in) {} | |
165 | 155 |
166 scoped_refptr<SingleThreadTaskRunner> task_runner; | 156 tracked_objects::Location from_here; |
167 ObserverList<ObserverType> list; | 157 Callback<void(ObserverType*)> method; |
168 | |
169 private: | |
170 DISALLOW_COPY_AND_ASSIGN(ObserverListContext); | |
171 }; | 158 }; |
172 | 159 |
173 ~ObserverListThreadSafe() { | 160 ~ObserverListThreadSafe() = default; |
161 | |
162 void NotifyWrapper(ObserverType* observer, | |
163 const NotificationData& notification) { | |
164 { | |
165 AutoLock auto_lock(lock_); | |
166 | |
167 // Check whether the observer still needs a notification. | |
168 auto it = observers_.find(observer); | |
169 if (it == observers_.end()) | |
170 return; | |
171 DCHECK(it->second->RunsTasksOnCurrentThread()); | |
172 } | |
173 | |
174 // Keep track of the notification being dispatched on the current thread. | |
175 // This will be used if the callback below calls AddObserver(). | |
gab
2017/01/12 20:12:14
Expand comment to explain the specific use case (n
fdoray
2017/01/12 20:57:44
Done.
| |
176 const NotificationData* const previous_notification = | |
177 tls_current_notification_.Get(); | |
178 tls_current_notification_.Set(¬ification); | |
179 | |
180 // Invoke the callback. | |
181 notification.method.Run(observer); | |
182 | |
183 // Reset the notification being dispatched on the current thread to its | |
184 // previous value. | |
185 tls_current_notification_.Set(previous_notification); | |
174 } | 186 } |
175 | 187 |
176 // Wrapper which is called to fire the notifications for each thread's | 188 const NotificationType type_ = NotificationType::NOTIFY_ALL; |
177 // ObserverList. This function MUST be called on the thread which owns | |
178 // the unsafe ObserverList. | |
179 void NotifyWrapper(ObserverListContext* context, | |
180 const Callback<void(ObserverType*)>& method) { | |
181 // Check that this list still needs notifications. | |
182 { | |
183 AutoLock lock(list_lock_); | |
184 auto it = observer_lists_.find(PlatformThread::CurrentId()); | |
185 | 189 |
186 // The ObserverList could have been removed already. In fact, it could | 190 // Synchronizes access to |observers_|. |
187 // have been removed and then re-added! If the master list's loop | 191 mutable Lock lock_; |
188 // does not match this one, then we do not need to finish this | |
189 // notification. | |
190 if (it == observer_lists_.end() || it->second.get() != context) | |
191 return; | |
192 } | |
193 | 192 |
194 for (auto& observer : context->list) { | 193 // Keys are observers. Values are the SequencedTaskRunners on which they must |
195 method.Run(&observer); | 194 // be notified. |
196 } | 195 std::unordered_map<ObserverType*, scoped_refptr<SequencedTaskRunner>> |
196 observers_; | |
197 | 197 |
198 // If there are no more observers on the list, we can now delete it. | 198 // Notification being dispatched on the current thread. |
199 if (context->list.size() == 0) { | 199 ThreadLocalPointer<const NotificationData> tls_current_notification_; |
200 { | |
201 AutoLock lock(list_lock_); | |
202 // Remove |list| if it's not already removed. | |
203 // This can happen if multiple observers got removed in a notification. | |
204 // See http://crbug.com/55725. | |
205 auto it = observer_lists_.find(PlatformThread::CurrentId()); | |
206 if (it != observer_lists_.end() && it->second.get() == context) | |
207 observer_lists_.erase(it); | |
208 } | |
209 } | |
210 } | |
211 | |
212 mutable Lock list_lock_; // Protects the observer_lists_. | |
213 | |
214 // Key by PlatformThreadId because in tests, clients can attempt to remove | |
215 // observers without a SingleThreadTaskRunner. If this were keyed by | |
216 // SingleThreadTaskRunner, that operation would be silently ignored, leaving | |
217 // garbage in the ObserverList. | |
218 std::map<PlatformThreadId, std::unique_ptr<ObserverListContext>> | |
219 observer_lists_; | |
220 | |
221 const NotificationType type_; | |
222 | 200 |
223 DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe); | 201 DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe); |
224 }; | 202 }; |
225 | 203 |
226 } // namespace base | 204 } // namespace base |
227 | 205 |
228 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_ | 206 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_ |
OLD | NEW |