Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(27)

Side by Side Diff: base/observer_list_threadsafe.h

Issue 2592143003: Allow ObserverListThreadSafe to be used from sequenced tasks. (Closed)
Patch Set: self-review Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | base/observer_list_unittest.cc » ('j') | base/observer_list_unittest.cc » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ 5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
6 #define BASE_OBSERVER_LIST_THREADSAFE_H_ 6 #define BASE_OBSERVER_LIST_THREADSAFE_H_
7 7
8 #include <algorithm> 8 #include <unordered_map>
9 #include <map>
10 #include <memory>
11 #include <tuple>
12 9
13 #include "base/bind.h" 10 #include "base/bind.h"
14 #include "base/location.h" 11 #include "base/location.h"
15 #include "base/logging.h" 12 #include "base/logging.h"
16 #include "base/macros.h" 13 #include "base/macros.h"
17 #include "base/memory/ptr_util.h"
18 #include "base/memory/ref_counted.h" 14 #include "base/memory/ref_counted.h"
19 #include "base/observer_list.h" 15 #include "base/observer_list.h"
16 #include "base/sequenced_task_runner.h"
17 #include "base/stl_util.h"
18 #include "base/synchronization/lock.h"
19 #include "base/threading/sequenced_task_runner_handle.h"
20 #include "base/threading/thread_local.h"
21
22 // TODO(fdoray): Removing this include causes IWYU failures in other headers,
23 // remove it in a follow- up CL.
20 #include "base/single_thread_task_runner.h" 24 #include "base/single_thread_task_runner.h"
21 #include "base/threading/platform_thread.h"
22 #include "base/threading/thread_task_runner_handle.h"
23 25
24 /////////////////////////////////////////////////////////////////////////////// 26 ///////////////////////////////////////////////////////////////////////////////
25 // 27 //
26 // OVERVIEW: 28 // OVERVIEW:
27 // 29 //
28 // A thread-safe container for a list of observers. 30 // A thread-safe container for a list of observers. This is similar to the
29 // This is similar to the observer_list (see observer_list.h), but it 31 // observer_list (see observer_list.h), but it is more robust for multi-
30 // is more robust for multi-threaded situations. 32 // threaded situations.
31 // 33 //
32 // The following use cases are supported: 34 // The following use cases are supported:
33 // * Observers can register for notifications from any thread. 35 // * Observers can register for notifications from any sequence. They are
34 // Callbacks to the observer will occur on the same thread where 36 // always notified on the sequence from which they were registered.
35 // the observer initially called AddObserver() from. 37 // * Any sequence may trigger a notification via Notify().
36 // * Any thread may trigger a notification via Notify(). 38 // * Observers can remove themselves from the observer list inside of a
37 // * Observers can remove themselves from the observer list inside 39 // callback.
38 // of a callback. 40 // * If one sequence is notifying observers concurrently with an observer
39 // * If one thread is notifying observers concurrently with an observer 41 // removing itself from the observer list, the notifications will be
40 // removing itself from the observer list, the notifications will 42 // silently dropped.
41 // be silently dropped.
42 // 43 //
43 // The drawback of the threadsafe observer list is that notifications 44 // The drawback of the threadsafe observer list is that notifications are not
44 // are not as real-time as the non-threadsafe version of this class. 45 // as real-time as the non-threadsafe version of this class. Notifications
45 // Notifications will always be done via PostTask() to another thread, 46 // will always be done via PostTask() to another sequence, whereas with the
46 // whereas with the non-thread-safe observer_list, notifications happen 47 // non-thread-safe observer_list, notifications happen synchronously and
47 // synchronously and immediately. 48 // immediately.
gab 2017/01/12 16:24:05 - "and immediately" ? (synchronously implies immed
fdoray 2017/01/12 17:17:57 Done.
48 //
49 // IMPLEMENTATION NOTES
50 // The ObserverListThreadSafe maintains an ObserverList for each thread
51 // which uses the ThreadSafeObserver. When Notifying the observers,
52 // we simply call PostTask to each registered thread, and then each thread
53 // will notify its regular ObserverList.
54 // 49 //
55 /////////////////////////////////////////////////////////////////////////////// 50 ///////////////////////////////////////////////////////////////////////////////
56 51
57 namespace base { 52 namespace base {
58 namespace internal { 53 namespace internal {
59 54
60 template <typename ObserverType, typename Method> 55 template <typename ObserverType, typename Method>
61 struct Dispatcher; 56 struct Dispatcher;
62 57
63 template <typename ObserverType, typename ReceiverType, typename... Params> 58 template <typename ObserverType, typename ReceiverType, typename... Params>
64 struct Dispatcher<ObserverType, void(ReceiverType::*)(Params...)> { 59 struct Dispatcher<ObserverType, void(ReceiverType::*)(Params...)> {
65 static void Run(void(ReceiverType::* m)(Params...), 60 static void Run(void(ReceiverType::* m)(Params...),
66 Params... params, ObserverType* obj) { 61 Params... params, ObserverType* obj) {
67 (obj->*m)(std::forward<Params>(params)...); 62 (obj->*m)(std::forward<Params>(params)...);
68 } 63 }
69 }; 64 };
70 65
71 } // namespace internal 66 } // namespace internal
72 67
73 template <class ObserverType> 68 template <class ObserverType>
74 class ObserverListThreadSafe 69 class ObserverListThreadSafe
75 : public RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>> { 70 : public RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>> {
76 public: 71 public:
77 using NotificationType = 72 using NotificationType =
78 typename ObserverList<ObserverType>::NotificationType; 73 typename ObserverList<ObserverType>::NotificationType;
79 74
80 ObserverListThreadSafe() 75 ObserverListThreadSafe() = default;
81 : type_(ObserverListBase<ObserverType>::NOTIFY_ALL) {}
82 explicit ObserverListThreadSafe(NotificationType type) : type_(type) {} 76 explicit ObserverListThreadSafe(NotificationType type) : type_(type) {}
83 77
84 // Add an observer to the list. An observer should not be added to 78 // Adds |observer| to the list. |observer| must not already be in the list.
85 // the same list more than once. 79 void AddObserver(ObserverType* observer) {
86 void AddObserver(ObserverType* obs) { 80 if (!SequencedTaskRunnerHandle::IsSet())
gab 2017/01/12 16:24:06 Seems this should be a DCHECK instead of handling
fdoray 2017/01/12 17:17:57 Will do this in a separate CL since it breaks test
87 // If there is no ThreadTaskRunnerHandle, it is impossible to notify on it,
88 // so do not add the observer.
89 if (!ThreadTaskRunnerHandle::IsSet())
90 return; 81 return;
91 82
92 ObserverList<ObserverType>* list = nullptr; 83 AutoLock auto_lock(lock_);
93 PlatformThreadId thread_id = PlatformThread::CurrentId(); 84
94 { 85 // Add |observer| to the list of observers.
95 AutoLock lock(list_lock_); 86 DCHECK(!ContainsKey(observers_, observer));
96 if (observer_lists_.find(thread_id) == observer_lists_.end()) { 87 const scoped_refptr<SequencedTaskRunner> task_runner =
97 observer_lists_[thread_id] = 88 SequencedTaskRunnerHandle::Get();
98 base::MakeUnique<ObserverListContext>(type_); 89 observers_[observer] = task_runner;
90
91 // If this is called while a notification is being dispatched and |type_| is
gab 2017/01/12 16:24:06 s/being dispatched/being dispatched on this thread
fdoray 2017/01/12 17:17:57 Done.
92 // NOTIFY_ALL, |observer| must be notified.
gab 2017/01/12 16:24:06 + "(if a notification is being dispatched on anoth
fdoray 2017/01/12 17:17:57 Not sure this is true. Thread / Action 1 / C
gab 2017/01/12 20:12:14 But you also have: 1 / Calls Notify() 2
fdoray 2017/01/12 20:57:44 Done.
93 if (type_ == NotificationType::NOTIFY_ALL) {
94 const NotificationData* current_notification =
95 tls_current_notification_.Get();
96 if (current_notification) {
97 task_runner->PostTask(
98 current_notification->from_here,
99 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
100 observer, *current_notification));
99 } 101 }
100 list = &(observer_lists_[thread_id]->list);
101 } 102 }
102 list->AddObserver(obs);
103 } 103 }
104 104
105 // Remove an observer from the list if it is in the list. 105 // Remove an observer from the list if it is in the list.
106 // If there are pending notifications in-transit to the observer, they will 106 // If there are pending notifications in-transit to the observer, they will
107 // be aborted. 107 // be aborted.
108 // If the observer to be removed is in the list, RemoveObserver MUST 108 // If the observer to be removed is in the list, RemoveObserver MUST
109 // be called from the same thread which called AddObserver. 109 // be called from the same sequence which called AddObserver.
110 void RemoveObserver(ObserverType* obs) { 110 void RemoveObserver(ObserverType* observer) {
111 PlatformThreadId thread_id = PlatformThread::CurrentId(); 111 AutoLock auto_lock(lock_);
112 { 112 auto it = observers_.find(observer);
113 AutoLock lock(list_lock_); 113 if (it == observers_.end())
114 auto it = observer_lists_.find(thread_id); 114 return;
115 if (it == observer_lists_.end()) { 115 DCHECK(it->second->RunsTasksOnCurrentThread());
116 // This will happen if we try to remove an observer on a thread 116 observers_.erase(it);
117 // we never added an observer for.
118 return;
119 }
120 ObserverList<ObserverType>& list = it->second->list;
121
122 list.RemoveObserver(obs);
123
124 // If that was the last observer in the list, remove the ObserverList
125 // entirely.
126 if (list.size() == 0)
127 observer_lists_.erase(it);
128 }
129 } 117 }
130 118
131 // Verifies that the list is currently empty (i.e. there are no observers). 119 // Verifies that the list is currently empty (i.e. there are no observers).
132 void AssertEmpty() const { 120 void AssertEmpty() const {
133 AutoLock lock(list_lock_); 121 #if DCHECK_IS_ON()
134 DCHECK(observer_lists_.empty()); 122 AutoLock auto_lock(lock_);
123 DCHECK(observers_.empty());
124 #endif
135 } 125 }
136 126
137 // Notify methods. 127 // Asynchronously invokes a callback on all observers, on their registration
138 // Make a thread-safe callback to each Observer in the list. 128 // sequence. You cannot assume that at the completion of the Notify call that
139 // Note, these calls are effectively asynchronous. You cannot assume 129 // all Observers have been Notified. The notification may still be pending
140 // that at the completion of the Notify call that all Observers have 130 // delivery.
141 // been Notified. The notification may still be pending delivery.
142 template <typename Method, typename... Params> 131 template <typename Method, typename... Params>
143 void Notify(const tracked_objects::Location& from_here, 132 void Notify(const tracked_objects::Location& from_here,
144 Method m, Params&&... params) { 133 Method m, Params&&... params) {
145 Callback<void(ObserverType*)> method = 134 Callback<void(ObserverType*)> method =
146 Bind(&internal::Dispatcher<ObserverType, Method>::Run, 135 Bind(&internal::Dispatcher<ObserverType, Method>::Run,
147 m, std::forward<Params>(params)...); 136 m, std::forward<Params>(params)...);
148 137
149 AutoLock lock(list_lock_); 138 AutoLock lock(lock_);
150 for (const auto& entry : observer_lists_) { 139 for (const auto& observer : observers_) {
151 ObserverListContext* context = entry.second.get(); 140 observer.second->PostTask(
152 context->task_runner->PostTask(
153 from_here, 141 from_here,
154 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, 142 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
155 this, context, method)); 143 observer.first, NotificationData(from_here, method)));
156 } 144 }
157 } 145 }
158 146
159 private: 147 private:
160 friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>; 148 friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>;
161 149
162 struct ObserverListContext { 150 struct NotificationData {
163 explicit ObserverListContext(NotificationType type) 151 NotificationData(const tracked_objects::Location& from_here_in,
164 : task_runner(ThreadTaskRunnerHandle::Get()), list(type) {} 152 const Callback<void(ObserverType*)>& method_in)
153 : from_here(from_here_in), method(method_in) {}
165 154
166 scoped_refptr<SingleThreadTaskRunner> task_runner; 155 tracked_objects::Location from_here;
167 ObserverList<ObserverType> list; 156 Callback<void(ObserverType*)> method;
168
169 private:
170 DISALLOW_COPY_AND_ASSIGN(ObserverListContext);
171 }; 157 };
172 158
173 ~ObserverListThreadSafe() { 159 ~ObserverListThreadSafe() = default;
160
161 void NotifyWrapper(ObserverType* observer,
162 const NotificationData& notification) {
163 {
164 AutoLock auto_lock(lock_);
165
166 // Check whether the observer still needs a notification.
167 auto it = observers_.find(observer);
168 if (it == observers_.end())
169 return;
170 DCHECK(it->second->RunsTasksOnCurrentThread());
171 }
172
173 // Keep track of the notification being dispatched on the current thread.
174 // This will be used if the callback below calls AddObserver().
175 const NotificationData* const previous_notification =
176 tls_current_notification_.Get();
gab 2017/01/12 16:24:06 As mentioned in another comment, I don't think nes
fdoray 2017/01/12 17:17:57 See previous comment in this file (a notification
177 tls_current_notification_.Set(&notification);
178
179 // Invoke the callback.
180 notification.method.Run(observer);
181
182 // Reset the notification being dispatched on the current thread to its
183 // previous value.
184 tls_current_notification_.Set(previous_notification);
174 } 185 }
175 186
176 // Wrapper which is called to fire the notifications for each thread's 187 const NotificationType type_ = NotificationType::NOTIFY_ALL;
177 // ObserverList. This function MUST be called on the thread which owns
178 // the unsafe ObserverList.
179 void NotifyWrapper(ObserverListContext* context,
180 const Callback<void(ObserverType*)>& method) {
181 // Check that this list still needs notifications.
182 {
183 AutoLock lock(list_lock_);
184 auto it = observer_lists_.find(PlatformThread::CurrentId());
185 188
186 // The ObserverList could have been removed already. In fact, it could 189 // Synchronizes access to |observers_|.
187 // have been removed and then re-added! If the master list's loop 190 mutable Lock lock_;
188 // does not match this one, then we do not need to finish this
189 // notification.
190 if (it == observer_lists_.end() || it->second.get() != context)
191 return;
192 }
193 191
194 for (auto& observer : context->list) { 192 // Keys are observers. Values are the SequencedTaskRunners on which they must
195 method.Run(&observer); 193 // be notified.
196 } 194 std::unordered_map<ObserverType*, scoped_refptr<SequencedTaskRunner>>
195 observers_;
197 196
198 // If there are no more observers on the list, we can now delete it. 197 // Notification being dispatched on the current thread.
199 if (context->list.size() == 0) { 198 ThreadLocalPointer<const NotificationData> tls_current_notification_;
200 {
201 AutoLock lock(list_lock_);
202 // Remove |list| if it's not already removed.
203 // This can happen if multiple observers got removed in a notification.
204 // See http://crbug.com/55725.
205 auto it = observer_lists_.find(PlatformThread::CurrentId());
206 if (it != observer_lists_.end() && it->second.get() == context)
207 observer_lists_.erase(it);
208 }
209 }
210 }
211
212 mutable Lock list_lock_; // Protects the observer_lists_.
213
214 // Key by PlatformThreadId because in tests, clients can attempt to remove
215 // observers without a SingleThreadTaskRunner. If this were keyed by
216 // SingleThreadTaskRunner, that operation would be silently ignored, leaving
217 // garbage in the ObserverList.
218 std::map<PlatformThreadId, std::unique_ptr<ObserverListContext>>
219 observer_lists_;
220
221 const NotificationType type_;
222 199
223 DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe); 200 DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe);
224 }; 201 };
225 202
226 } // namespace base 203 } // namespace base
227 204
228 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_ 205 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_
OLDNEW
« no previous file with comments | « no previous file | base/observer_list_unittest.cc » ('j') | base/observer_list_unittest.cc » ('J')

Powered by Google App Engine
This is Rietveld 408576698