Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(410)

Side by Side Diff: base/observer_list_threadsafe.h

Issue 2669673002: Revert of Allow ObserverListThreadSafe to be used from sequenced tasks. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « base/android/application_status_listener_unittest.cc ('k') | base/observer_list_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ 5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
6 #define BASE_OBSERVER_LIST_THREADSAFE_H_ 6 #define BASE_OBSERVER_LIST_THREADSAFE_H_
7 7
8 #include <unordered_map> 8 #include <algorithm>
9 #include <map>
10 #include <memory>
11 #include <tuple>
9 12
10 #include "base/bind.h" 13 #include "base/bind.h"
11 #include "base/location.h" 14 #include "base/location.h"
12 #include "base/logging.h" 15 #include "base/logging.h"
13 #include "base/macros.h" 16 #include "base/macros.h"
17 #include "base/memory/ptr_util.h"
14 #include "base/memory/ref_counted.h" 18 #include "base/memory/ref_counted.h"
15 #include "base/observer_list.h" 19 #include "base/observer_list.h"
16 #include "base/sequenced_task_runner.h"
17 #include "base/stl_util.h"
18 #include "base/synchronization/lock.h"
19 #include "base/threading/sequenced_task_runner_handle.h"
20 #include "base/threading/thread_local.h"
21 #include "build/build_config.h"
22
23 // TODO(fdoray): Removing these includes causes IWYU failures in other headers,
24 // remove them in a follow- up CL.
25 #include "base/memory/ptr_util.h"
26 #include "base/single_thread_task_runner.h" 20 #include "base/single_thread_task_runner.h"
21 #include "base/threading/platform_thread.h"
22 #include "base/threading/thread_task_runner_handle.h"
27 23
28 /////////////////////////////////////////////////////////////////////////////// 24 ///////////////////////////////////////////////////////////////////////////////
29 // 25 //
30 // OVERVIEW: 26 // OVERVIEW:
31 // 27 //
32 // A thread-safe container for a list of observers. This is similar to the 28 // A thread-safe container for a list of observers.
33 // observer_list (see observer_list.h), but it is more robust for multi- 29 // This is similar to the observer_list (see observer_list.h), but it
34 // threaded situations. 30 // is more robust for multi-threaded situations.
35 // 31 //
36 // The following use cases are supported: 32 // The following use cases are supported:
37 // * Observers can register for notifications from any sequence. They are 33 // * Observers can register for notifications from any thread.
38 // always notified on the sequence from which they were registered. 34 // Callbacks to the observer will occur on the same thread where
39 // * Any sequence may trigger a notification via Notify(). 35 // the observer initially called AddObserver() from.
40 // * Observers can remove themselves from the observer list inside of a 36 // * Any thread may trigger a notification via Notify().
41 // callback. 37 // * Observers can remove themselves from the observer list inside
42 // * If one sequence is notifying observers concurrently with an observer 38 // of a callback.
43 // removing itself from the observer list, the notifications will be 39 // * If one thread is notifying observers concurrently with an observer
44 // silently dropped. 40 // removing itself from the observer list, the notifications will
41 // be silently dropped.
45 // 42 //
46 // The drawback of the threadsafe observer list is that notifications are not 43 // The drawback of the threadsafe observer list is that notifications
47 // as real-time as the non-threadsafe version of this class. Notifications 44 // are not as real-time as the non-threadsafe version of this class.
48 // will always be done via PostTask() to another sequence, whereas with the 45 // Notifications will always be done via PostTask() to another thread,
49 // non-thread-safe observer_list, notifications happen synchronously. 46 // whereas with the non-thread-safe observer_list, notifications happen
47 // synchronously and immediately.
48 //
49 // IMPLEMENTATION NOTES
50 // The ObserverListThreadSafe maintains an ObserverList for each thread
51 // which uses the ThreadSafeObserver. When Notifying the observers,
52 // we simply call PostTask to each registered thread, and then each thread
53 // will notify its regular ObserverList.
50 // 54 //
51 /////////////////////////////////////////////////////////////////////////////// 55 ///////////////////////////////////////////////////////////////////////////////
52 56
53 namespace base { 57 namespace base {
54 namespace internal { 58 namespace internal {
55 59
56 template <typename ObserverType, typename Method> 60 template <typename ObserverType, typename Method>
57 struct Dispatcher; 61 struct Dispatcher;
58 62
59 template <typename ObserverType, typename ReceiverType, typename... Params> 63 template <typename ObserverType, typename ReceiverType, typename... Params>
60 struct Dispatcher<ObserverType, void(ReceiverType::*)(Params...)> { 64 struct Dispatcher<ObserverType, void(ReceiverType::*)(Params...)> {
61 static void Run(void(ReceiverType::* m)(Params...), 65 static void Run(void(ReceiverType::* m)(Params...),
62 Params... params, ObserverType* obj) { 66 Params... params, ObserverType* obj) {
63 (obj->*m)(std::forward<Params>(params)...); 67 (obj->*m)(std::forward<Params>(params)...);
64 } 68 }
65 }; 69 };
66 70
67 } // namespace internal 71 } // namespace internal
68 72
69 template <class ObserverType> 73 template <class ObserverType>
70 class ObserverListThreadSafe 74 class ObserverListThreadSafe
71 : public RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>> { 75 : public RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>> {
72 public: 76 public:
73 using NotificationType = 77 using NotificationType =
74 typename ObserverList<ObserverType>::NotificationType; 78 typename ObserverList<ObserverType>::NotificationType;
75 79
76 ObserverListThreadSafe() = default; 80 ObserverListThreadSafe()
81 : type_(ObserverListBase<ObserverType>::NOTIFY_ALL) {}
77 explicit ObserverListThreadSafe(NotificationType type) : type_(type) {} 82 explicit ObserverListThreadSafe(NotificationType type) : type_(type) {}
78 83
79 // Adds |observer| to the list. |observer| must not already be in the list. 84 // Add an observer to the list. An observer should not be added to
80 void AddObserver(ObserverType* observer) { 85 // the same list more than once.
81 // TODO(fdoray): Change this to a DCHECK once all call sites have a 86 void AddObserver(ObserverType* obs) {
82 // SequencedTaskRunnerHandle. 87 // If there is no ThreadTaskRunnerHandle, it is impossible to notify on it,
83 if (!SequencedTaskRunnerHandle::IsSet()) 88 // so do not add the observer.
89 if (!ThreadTaskRunnerHandle::IsSet())
84 return; 90 return;
85 91
86 AutoLock auto_lock(lock_); 92 ObserverList<ObserverType>* list = nullptr;
87 93 PlatformThreadId thread_id = PlatformThread::CurrentId();
88 // Add |observer| to the list of observers. 94 {
89 DCHECK(!ContainsKey(observers_, observer)); 95 AutoLock lock(list_lock_);
90 const scoped_refptr<SequencedTaskRunner> task_runner = 96 if (observer_lists_.find(thread_id) == observer_lists_.end()) {
91 SequencedTaskRunnerHandle::Get(); 97 observer_lists_[thread_id] =
92 observers_[observer] = task_runner; 98 base::MakeUnique<ObserverListContext>(type_);
93
94 // If this is called while a notification is being dispatched on this thread
95 // and |type_| is NOTIFY_ALL, |observer| must be notified (if a notification
96 // is being dispatched on another thread in parallel, the notification may
97 // or may not make it to |observer| depending on the outcome of the race to
98 // |lock_|).
99 if (type_ == NotificationType::NOTIFY_ALL) {
100 const NotificationData* current_notification =
101 tls_current_notification_.Get();
102 if (current_notification) {
103 task_runner->PostTask(
104 current_notification->from_here,
105 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
106 observer, *current_notification));
107 } 99 }
100 list = &(observer_lists_[thread_id]->list);
108 } 101 }
102 list->AddObserver(obs);
109 } 103 }
110 104
111 // Remove an observer from the list if it is in the list. 105 // Remove an observer from the list if it is in the list.
112 // If there are pending notifications in-transit to the observer, they will 106 // If there are pending notifications in-transit to the observer, they will
113 // be aborted. 107 // be aborted.
114 // If the observer to be removed is in the list, RemoveObserver MUST 108 // If the observer to be removed is in the list, RemoveObserver MUST
115 // be called from the same sequence which called AddObserver. 109 // be called from the same thread which called AddObserver.
116 void RemoveObserver(ObserverType* observer) { 110 void RemoveObserver(ObserverType* obs) {
117 AutoLock auto_lock(lock_); 111 PlatformThreadId thread_id = PlatformThread::CurrentId();
118 auto it = observers_.find(observer); 112 {
119 if (it == observers_.end()) 113 AutoLock lock(list_lock_);
120 return; 114 auto it = observer_lists_.find(thread_id);
115 if (it == observer_lists_.end()) {
116 // This will happen if we try to remove an observer on a thread
117 // we never added an observer for.
118 return;
119 }
120 ObserverList<ObserverType>& list = it->second->list;
121 121
122 // TODO(fdoray): Enable this on Android once all tests pass. 122 list.RemoveObserver(obs);
123 #if !defined(OS_ANDROID)
124 DCHECK(it->second->RunsTasksOnCurrentThread());
125 #endif
126 123
127 observers_.erase(it); 124 // If that was the last observer in the list, remove the ObserverList
125 // entirely.
126 if (list.size() == 0)
127 observer_lists_.erase(it);
128 }
128 } 129 }
129 130
130 // Verifies that the list is currently empty (i.e. there are no observers). 131 // Verifies that the list is currently empty (i.e. there are no observers).
131 void AssertEmpty() const { 132 void AssertEmpty() const {
132 #if DCHECK_IS_ON() 133 AutoLock lock(list_lock_);
133 AutoLock auto_lock(lock_); 134 DCHECK(observer_lists_.empty());
134 DCHECK(observers_.empty());
135 #endif
136 } 135 }
137 136
138 // Asynchronously invokes a callback on all observers, on their registration 137 // Notify methods.
139 // sequence. You cannot assume that at the completion of the Notify call that 138 // Make a thread-safe callback to each Observer in the list.
140 // all Observers have been Notified. The notification may still be pending 139 // Note, these calls are effectively asynchronous. You cannot assume
141 // delivery. 140 // that at the completion of the Notify call that all Observers have
141 // been Notified. The notification may still be pending delivery.
142 template <typename Method, typename... Params> 142 template <typename Method, typename... Params>
143 void Notify(const tracked_objects::Location& from_here, 143 void Notify(const tracked_objects::Location& from_here,
144 Method m, Params&&... params) { 144 Method m, Params&&... params) {
145 Callback<void(ObserverType*)> method = 145 Callback<void(ObserverType*)> method =
146 Bind(&internal::Dispatcher<ObserverType, Method>::Run, 146 Bind(&internal::Dispatcher<ObserverType, Method>::Run,
147 m, std::forward<Params>(params)...); 147 m, std::forward<Params>(params)...);
148 148
149 AutoLock lock(lock_); 149 AutoLock lock(list_lock_);
150 for (const auto& observer : observers_) { 150 for (const auto& entry : observer_lists_) {
151 observer.second->PostTask( 151 ObserverListContext* context = entry.second.get();
152 context->task_runner->PostTask(
152 from_here, 153 from_here,
153 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this, 154 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper,
154 observer.first, NotificationData(from_here, method))); 155 this, context, method));
155 } 156 }
156 } 157 }
157 158
158 private: 159 private:
159 friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>; 160 friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>;
160 161
161 struct NotificationData { 162 struct ObserverListContext {
162 NotificationData(const tracked_objects::Location& from_here_in, 163 explicit ObserverListContext(NotificationType type)
163 const Callback<void(ObserverType*)>& method_in) 164 : task_runner(ThreadTaskRunnerHandle::Get()), list(type) {}
164 : from_here(from_here_in), method(method_in) {}
165 165
166 tracked_objects::Location from_here; 166 scoped_refptr<SingleThreadTaskRunner> task_runner;
167 Callback<void(ObserverType*)> method; 167 ObserverList<ObserverType> list;
168
169 private:
170 DISALLOW_COPY_AND_ASSIGN(ObserverListContext);
168 }; 171 };
169 172
170 ~ObserverListThreadSafe() = default; 173 ~ObserverListThreadSafe() {
174 }
171 175
172 void NotifyWrapper(ObserverType* observer, 176 // Wrapper which is called to fire the notifications for each thread's
173 const NotificationData& notification) { 177 // ObserverList. This function MUST be called on the thread which owns
178 // the unsafe ObserverList.
179 void NotifyWrapper(ObserverListContext* context,
180 const Callback<void(ObserverType*)>& method) {
181 // Check that this list still needs notifications.
174 { 182 {
175 AutoLock auto_lock(lock_); 183 AutoLock lock(list_lock_);
184 auto it = observer_lists_.find(PlatformThread::CurrentId());
176 185
177 // Check whether the observer still needs a notification. 186 // The ObserverList could have been removed already. In fact, it could
178 auto it = observers_.find(observer); 187 // have been removed and then re-added! If the master list's loop
179 if (it == observers_.end()) 188 // does not match this one, then we do not need to finish this
189 // notification.
190 if (it == observer_lists_.end() || it->second.get() != context)
180 return; 191 return;
181 DCHECK(it->second->RunsTasksOnCurrentThread());
182 } 192 }
183 193
184 // Keep track of the notification being dispatched on the current thread. 194 for (auto& observer : context->list) {
185 // This will be used if the callback below calls AddObserver(). 195 method.Run(&observer);
186 // 196 }
187 // Note: |tls_current_notification_| may not be nullptr if this runs in a
188 // nested loop started by a notification callback. In that case, it is
189 // important to save the previous value to restore it later.
190 const NotificationData* const previous_notification =
191 tls_current_notification_.Get();
192 tls_current_notification_.Set(&notification);
193 197
194 // Invoke the callback. 198 // If there are no more observers on the list, we can now delete it.
195 notification.method.Run(observer); 199 if (context->list.size() == 0) {
196 200 {
197 // Reset the notification being dispatched on the current thread to its 201 AutoLock lock(list_lock_);
198 // previous value. 202 // Remove |list| if it's not already removed.
199 tls_current_notification_.Set(previous_notification); 203 // This can happen if multiple observers got removed in a notification.
204 // See http://crbug.com/55725.
205 auto it = observer_lists_.find(PlatformThread::CurrentId());
206 if (it != observer_lists_.end() && it->second.get() == context)
207 observer_lists_.erase(it);
208 }
209 }
200 } 210 }
201 211
202 const NotificationType type_ = NotificationType::NOTIFY_ALL; 212 mutable Lock list_lock_; // Protects the observer_lists_.
203 213
204 // Synchronizes access to |observers_|. 214 // Key by PlatformThreadId because in tests, clients can attempt to remove
205 mutable Lock lock_; 215 // observers without a SingleThreadTaskRunner. If this were keyed by
216 // SingleThreadTaskRunner, that operation would be silently ignored, leaving
217 // garbage in the ObserverList.
218 std::map<PlatformThreadId, std::unique_ptr<ObserverListContext>>
219 observer_lists_;
206 220
207 // Keys are observers. Values are the SequencedTaskRunners on which they must 221 const NotificationType type_;
208 // be notified.
209 std::unordered_map<ObserverType*, scoped_refptr<SequencedTaskRunner>>
210 observers_;
211
212 // Notification being dispatched on the current thread.
213 ThreadLocalPointer<const NotificationData> tls_current_notification_;
214 222
215 DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe); 223 DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe);
216 }; 224 };
217 225
218 } // namespace base 226 } // namespace base
219 227
220 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_ 228 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_
OLDNEW
« no previous file with comments | « base/android/application_status_listener_unittest.cc ('k') | base/observer_list_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698