Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1129)

Side by Side Diff: content/child/shared_memory_data_consumer_handle.cc

Issue 1234213002: Fix data races on |SharedMemoryDataConsumerHandle::Context::notification_task_runner_| (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Add AssertAquired(). Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/child/shared_memory_data_consumer_handle.h" 5 #include "content/child/shared_memory_data_consumer_handle.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <deque> 8 #include <deque>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 28 matching lines...) Expand all
39 scoped_ptr<RequestPeer::ReceivedData> data_; 39 scoped_ptr<RequestPeer::ReceivedData> data_;
40 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 40 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
41 41
42 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData); 42 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData);
43 }; 43 };
44 44
45 } // namespace 45 } // namespace
46 46
47 using Result = blink::WebDataConsumerHandle::Result; 47 using Result = blink::WebDataConsumerHandle::Result;
48 48
49 // All methods (except for ctor/dtor) must be called with |lock_| aquired
50 // unless otherwise stated.
49 class SharedMemoryDataConsumerHandle::Context final 51 class SharedMemoryDataConsumerHandle::Context final
50 : public base::RefCountedThreadSafe<Context> { 52 : public base::RefCountedThreadSafe<Context> {
51 public: 53 public:
52 explicit Context(const base::Closure& on_reader_detached) 54 explicit Context(const base::Closure& on_reader_detached)
53 : result_(Ok), 55 : result_(Ok),
54 first_offset_(0), 56 first_offset_(0),
55 client_(nullptr), 57 client_(nullptr),
56 writer_task_runner_(base::ThreadTaskRunnerHandle::Get()), 58 writer_task_runner_(base::ThreadTaskRunnerHandle::Get()),
57 on_reader_detached_(on_reader_detached), 59 on_reader_detached_(on_reader_detached),
58 is_on_reader_detached_valid_(!on_reader_detached_.is_null()), 60 is_on_reader_detached_valid_(!on_reader_detached_.is_null()),
59 is_handle_active_(true), 61 is_handle_active_(true),
60 is_two_phase_read_in_progress_(false) {} 62 is_two_phase_read_in_progress_(false) {}
61 63
62 bool IsEmpty() const { return queue_.empty(); } 64 bool IsEmpty() const {
65 lock_.AssertAcquired();
66 return queue_.empty();
67 }
63 void ClearIfNecessary() { 68 void ClearIfNecessary() {
69 lock_.AssertAcquired();
64 if (!is_handle_locked() && !is_handle_active()) { 70 if (!is_handle_locked() && !is_handle_active()) {
65 // No one is interested in the contents. 71 // No one is interested in the contents.
66 if (is_on_reader_detached_valid_) { 72 if (is_on_reader_detached_valid_) {
67 // We post a task even in the writer thread in order to avoid a 73 // We post a task even in the writer thread in order to avoid a
68 // reentrance problem as calling |on_reader_detached_| may manipulate 74 // reentrance problem as calling |on_reader_detached_| may manipulate
69 // the context synchronously. 75 // the context synchronously.
70 writer_task_runner_->PostTask(FROM_HERE, on_reader_detached_); 76 writer_task_runner_->PostTask(FROM_HERE, on_reader_detached_);
71 } 77 }
72 Clear(); 78 Clear();
73 } 79 }
74 } 80 }
75 void ClearQueue() { 81 void ClearQueue() {
82 lock_.AssertAcquired();
76 for (auto& data : queue_) { 83 for (auto& data : queue_) {
77 delete data; 84 delete data;
78 } 85 }
79 queue_.clear(); 86 queue_.clear();
80 first_offset_ = 0; 87 first_offset_ = 0;
81 } 88 }
82 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); } 89 RequestPeer::ThreadSafeReceivedData* Top() {
90 lock_.AssertAcquired();
91 return queue_.front();
92 }
83 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) { 93 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) {
94 lock_.AssertAcquired();
84 queue_.push_back(data.release()); 95 queue_.push_back(data.release());
85 } 96 }
86 size_t first_offset() const { return first_offset_; } 97 size_t first_offset() const {
87 Result result() const { return result_; } 98 lock_.AssertAcquired();
88 void set_result(Result r) { result_ = r; } 99 return first_offset_;
100 }
101 Result result() const {
102 lock_.AssertAcquired();
103 return result_;
104 }
105 void set_result(Result r) {
106 lock_.AssertAcquired();
107 result_ = r;
108 }
89 void AcquireReaderLock(Client* client) { 109 void AcquireReaderLock(Client* client) {
110 lock_.AssertAcquired();
90 DCHECK(!notification_task_runner_); 111 DCHECK(!notification_task_runner_);
91 DCHECK(!client_); 112 DCHECK(!client_);
92 notification_task_runner_ = base::ThreadTaskRunnerHandle::Get(); 113 notification_task_runner_ = base::ThreadTaskRunnerHandle::Get();
93 client_ = client; 114 client_ = client;
94 if (client && !(IsEmpty() && result() == Ok)) { 115 if (client && !(IsEmpty() && result() == Ok)) {
95 // We cannot notify synchronously because the user doesn't have the reader 116 // We cannot notify synchronously because the user doesn't have the reader
96 // yet. 117 // yet.
97 notification_task_runner_->PostTask( 118 notification_task_runner_->PostTask(
98 FROM_HERE, base::Bind(&Context::NotifyInternal, this, false)); 119 FROM_HERE, base::Bind(&Context::NotifyInternal, this, false));
99 } 120 }
100 } 121 }
101 void ReleaseReaderLock() { 122 void ReleaseReaderLock() {
123 lock_.AssertAcquired();
102 DCHECK(notification_task_runner_); 124 DCHECK(notification_task_runner_);
103 notification_task_runner_ = nullptr; 125 notification_task_runner_ = nullptr;
104 client_ = nullptr; 126 client_ = nullptr;
105 } 127 }
106 void PostNotify() { 128 void PostNotify() {
129 lock_.AssertAcquired();
107 auto runner = notification_task_runner_; 130 auto runner = notification_task_runner_;
108 if (!runner) 131 if (!runner)
109 return; 132 return;
110 // We don't re-post the task when the runner changes while waiting for 133 // We don't re-post the task when the runner changes while waiting for
111 // this task because in this case a new reader is obtained and 134 // this task because in this case a new reader is obtained and
112 // notification is already done at the reader creation time if necessary. 135 // notification is already done at the reader creation time if necessary.
113 runner->PostTask(FROM_HERE, 136 runner->PostTask(FROM_HERE,
114 base::Bind(&Context::NotifyInternal, this, false)); 137 base::Bind(&Context::NotifyInternal, this, false));
115 } 138 }
139 // Must be called with |lock_| not aquired.
116 void Notify() { NotifyInternal(true); } 140 void Notify() { NotifyInternal(true); }
117 // This function doesn't work in the destructor if |on_reader_detached_| is 141 // This function doesn't work in the destructor if |on_reader_detached_| is
118 // not null. 142 // not null.
119 void ResetOnReaderDetached() { 143 void ResetOnReaderDetached() {
144 lock_.AssertAcquired();
120 if (on_reader_detached_.is_null()) { 145 if (on_reader_detached_.is_null()) {
121 DCHECK(!is_on_reader_detached_valid_); 146 DCHECK(!is_on_reader_detached_valid_);
122 return; 147 return;
123 } 148 }
124 is_on_reader_detached_valid_ = false; 149 is_on_reader_detached_valid_ = false;
125 if (writer_task_runner_->BelongsToCurrentThread()) { 150 if (writer_task_runner_->BelongsToCurrentThread()) {
126 // We can reset the closure immediately. 151 // We can reset the closure immediately.
127 on_reader_detached_.Reset(); 152 on_reader_detached_.Reset();
128 } else { 153 } else {
129 // We need to reset |on_reader_detached_| on the right thread because it 154 // We need to reset |on_reader_detached_| on the right thread because it
130 // might lead to the object destruction. 155 // might lead to the object destruction.
131 writer_task_runner_->PostTask( 156 writer_task_runner_->PostTask(
132 FROM_HERE, base::Bind(&Context::ResetOnReaderDetachedWithLock, this)); 157 FROM_HERE, base::Bind(&Context::ResetOnReaderDetachedWithLock, this));
133 } 158 }
134 } 159 }
135 bool is_handle_locked() const { return notification_task_runner_; } 160 bool is_handle_locked() const {
161 lock_.AssertAcquired();
162 return notification_task_runner_;
163 }
136 bool IsReaderBoundToCurrentThread() const { 164 bool IsReaderBoundToCurrentThread() const {
165 lock_.AssertAcquired();
137 return notification_task_runner_ && 166 return notification_task_runner_ &&
138 notification_task_runner_->BelongsToCurrentThread(); 167 notification_task_runner_->BelongsToCurrentThread();
139 } 168 }
140 bool is_handle_active() const { return is_handle_active_; } 169 bool is_handle_active() const {
141 void set_is_handle_active(bool b) { is_handle_active_ = b; } 170 lock_.AssertAcquired();
171 return is_handle_active_;
172 }
173 void set_is_handle_active(bool b) {
174 lock_.AssertAcquired();
175 is_handle_active_ = b;
176 }
142 void Consume(size_t s) { 177 void Consume(size_t s) {
178 lock_.AssertAcquired();
143 first_offset_ += s; 179 first_offset_ += s;
144 auto top = Top(); 180 auto top = Top();
145 if (static_cast<size_t>(top->length()) <= first_offset_) { 181 if (static_cast<size_t>(top->length()) <= first_offset_) {
146 delete top; 182 delete top;
147 queue_.pop_front(); 183 queue_.pop_front();
148 first_offset_ = 0; 184 first_offset_ = 0;
149 } 185 }
150 } 186 }
151 bool is_two_phase_read_in_progress() const { 187 bool is_two_phase_read_in_progress() const {
188 lock_.AssertAcquired();
152 return is_two_phase_read_in_progress_; 189 return is_two_phase_read_in_progress_;
153 } 190 }
154 void set_is_two_phase_read_in_progress(bool b) { 191 void set_is_two_phase_read_in_progress(bool b) {
192 lock_.AssertAcquired();
155 is_two_phase_read_in_progress_ = b; 193 is_two_phase_read_in_progress_ = b;
156 } 194 }
195 // Can be called with |lock_| not aquired.
157 base::Lock& lock() { return lock_; } 196 base::Lock& lock() { return lock_; }
158 197
159 private: 198 private:
199 // Must be called with |lock_| not aquired.
160 void NotifyInternal(bool repost) { 200 void NotifyInternal(bool repost) {
161 // Note that this function is not protected by |lock_|. 201 scoped_refptr<base::SingleThreadTaskRunner> runner;
162 202 {
163 auto runner = notification_task_runner_; 203 base::AutoLock lock(lock_);
204 runner = notification_task_runner_;
205 }
164 if (!runner) 206 if (!runner)
165 return; 207 return;
166 208
167 if (runner->BelongsToCurrentThread()) { 209 if (runner->BelongsToCurrentThread()) {
168 // It is safe to access member variables without lock because |client_| 210 // It is safe to access member variables without lock because |client_|
169 // is bound to the current thread. 211 // is bound to the current thread.
170 if (client_) 212 if (client_)
171 client_->didGetReadable(); 213 client_->didGetReadable();
172 return; 214 return;
173 } 215 }
174 if (repost) { 216 if (repost) {
175 // We don't re-post the task when the runner changes while waiting for 217 // We don't re-post the task when the runner changes while waiting for
176 // this task because in this case a new reader is obtained and 218 // this task because in this case a new reader is obtained and
177 // notification is already done at the reader creation time if necessary. 219 // notification is already done at the reader creation time if necessary.
178 runner->PostTask(FROM_HERE, 220 runner->PostTask(FROM_HERE,
179 base::Bind(&Context::NotifyInternal, this, false)); 221 base::Bind(&Context::NotifyInternal, this, false));
180 } 222 }
181 } 223 }
182 void Clear() { 224 void Clear() {
225 lock_.AssertAcquired();
183 for (auto& data : queue_) { 226 for (auto& data : queue_) {
184 delete data; 227 delete data;
185 } 228 }
186 queue_.clear(); 229 queue_.clear();
187 first_offset_ = 0; 230 first_offset_ = 0;
188 client_ = nullptr; 231 client_ = nullptr;
189 // Note this doesn't work in the destructor if |on_reader_detached_| is not 232 // Note this doesn't work in the destructor if |on_reader_detached_| is not
190 // null. We have an assert in the destructor. 233 // null. We have an assert in the destructor.
191 ResetOnReaderDetached(); 234 ResetOnReaderDetached();
192 } 235 }
236 // Must be called with |lock_| not aquired.
193 void ResetOnReaderDetachedWithLock() { 237 void ResetOnReaderDetachedWithLock() {
194 base::AutoLock lock(lock_); 238 base::AutoLock lock(lock_);
195 ResetOnReaderDetached(); 239 ResetOnReaderDetached();
196 } 240 }
197 241
198 friend class base::RefCountedThreadSafe<Context>; 242 friend class base::RefCountedThreadSafe<Context>;
199 ~Context() { 243 ~Context() {
244 base::AutoLock lock(lock_);
200 DCHECK(on_reader_detached_.is_null()); 245 DCHECK(on_reader_detached_.is_null());
201 246
202 // This is necessary because the queue stores raw pointers. 247 // This is necessary because the queue stores raw pointers.
203 Clear(); 248 Clear();
204 } 249 }
205 250
206 base::Lock lock_; 251 base::Lock lock_;
207 // |result_| stores the ultimate state of this handle if it has. Otherwise, 252 // |result_| stores the ultimate state of this handle if it has. Otherwise,
208 // |Ok| is set. 253 // |Ok| is set.
209 Result result_; 254 Result result_;
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
266 311
267 if (needs_notification) { 312 if (needs_notification) {
268 // We CAN issue the notification synchronously if the associated reader 313 // We CAN issue the notification synchronously if the associated reader
269 // lives in this thread, because this function cannot be called in the 314 // lives in this thread, because this function cannot be called in the
270 // client's callback. 315 // client's callback.
271 context_->Notify(); 316 context_->Notify();
272 } 317 }
273 } 318 }
274 319
275 void SharedMemoryDataConsumerHandle::Writer::Close() { 320 void SharedMemoryDataConsumerHandle::Writer::Close() {
276 bool needs_notification = false; 321 base::AutoLock lock(context_->lock());
277 322 if (context_->result() == Ok) {
278 { 323 context_->set_result(Done);
279 base::AutoLock lock(context_->lock()); 324 context_->ResetOnReaderDetached();
280 if (context_->result() == Ok) { 325 if (context_->IsEmpty()) {
281 context_->set_result(Done); 326 // We cannot issue the notification synchronously because this function
282 context_->ResetOnReaderDetached(); 327 // can be called in the client's callback.
283 needs_notification = context_->IsEmpty(); 328 context_->PostNotify();
284 } 329 }
285 } 330 }
286 if (needs_notification) {
287 // We cannot issue the notification synchronously because this function can
288 // be called in the client's callback.
289 context_->PostNotify();
290 }
291 } 331 }
292 332
293 void SharedMemoryDataConsumerHandle::Writer::Fail() { 333 void SharedMemoryDataConsumerHandle::Writer::Fail() {
294 bool needs_notification = false; 334 base::AutoLock lock(context_->lock());
295 { 335 if (context_->result() == Ok) {
296 base::AutoLock lock(context_->lock()); 336 // TODO(yhirano): Use an appropriate error code other than
297 if (context_->result() == Ok) { 337 // UnexpectedError.
298 // TODO(yhirano): Use an appropriate error code other than 338 context_->set_result(UnexpectedError);
299 // UnexpectedError.
300 context_->set_result(UnexpectedError);
301 339
302 if (context_->is_two_phase_read_in_progress()) { 340 if (context_->is_two_phase_read_in_progress()) {
303 // If we are in two-phase read session, we cannot discard the data. We 341 // If we are in two-phase read session, we cannot discard the data. We
304 // will clear the queue at the end of the session. 342 // will clear the queue at the end of the session.
305 } else { 343 } else {
306 context_->ClearQueue(); 344 context_->ClearQueue();
307 } 345 }
308 346
309 context_->ResetOnReaderDetached(); 347 context_->ResetOnReaderDetached();
310 needs_notification = true;
311 }
312 }
313 if (needs_notification) {
314 // We cannot issue the notification synchronously because this function can 348 // We cannot issue the notification synchronously because this function can
315 // be called in the client's callback. 349 // be called in the client's callback.
316 context_->PostNotify(); 350 context_->PostNotify();
317 } 351 }
318 } 352 }
319 353
320 SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl( 354 SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl(
321 scoped_refptr<Context> context, 355 scoped_refptr<Context> context,
322 Client* client) 356 Client* client)
323 : context_(context) { 357 : context_(context) {
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after
438 SharedMemoryDataConsumerHandle::ReaderImpl* 472 SharedMemoryDataConsumerHandle::ReaderImpl*
439 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) { 473 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) {
440 return new ReaderImpl(context_, client); 474 return new ReaderImpl(context_, client);
441 } 475 }
442 476
443 const char* SharedMemoryDataConsumerHandle::debugName() const { 477 const char* SharedMemoryDataConsumerHandle::debugName() const {
444 return "SharedMemoryDataConsumerHandle"; 478 return "SharedMemoryDataConsumerHandle";
445 } 479 }
446 480
447 } // namespace content 481 } // namespace content
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698