Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(92)

Side by Side Diff: content/child/shared_memory_data_consumer_handle.cc

Issue 1186053004: Cancel loading when body stream reader is detached. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@error-ipc-data-consumer
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/child/shared_memory_data_consumer_handle.h" 5 #include "content/child/shared_memory_data_consumer_handle.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <deque> 8 #include <deque>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/message_loop/message_loop.h" 11 #include "base/message_loop/message_loop.h"
12 #include "base/single_thread_task_runner.h" 12 #include "base/single_thread_task_runner.h"
13 #include "base/synchronization/lock.h" 13 #include "base/synchronization/lock.h"
14 #include "base/thread_task_runner_handle.h"
14 #include "content/public/child/fixed_received_data.h" 15 #include "content/public/child/fixed_received_data.h"
15 16
16 namespace content { 17 namespace content {
17 18
18 namespace { 19 namespace {
19 20
20 class DelegateThreadSafeReceivedData final 21 class DelegateThreadSafeReceivedData final
21 : public RequestPeer::ThreadSafeReceivedData { 22 : public RequestPeer::ThreadSafeReceivedData {
22 public: 23 public:
23 explicit DelegateThreadSafeReceivedData( 24 explicit DelegateThreadSafeReceivedData(
24 scoped_ptr<RequestPeer::ReceivedData> data) 25 scoped_ptr<RequestPeer::ReceivedData> data)
25 : data_(data.Pass()), 26 : data_(data.Pass()), task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
26 task_runner_(base::MessageLoop::current()->task_runner()) {}
27 ~DelegateThreadSafeReceivedData() override { 27 ~DelegateThreadSafeReceivedData() override {
28 if (!task_runner_->BelongsToCurrentThread()) { 28 if (!task_runner_->BelongsToCurrentThread()) {
29 // Delete the data on the original thread. 29 // Delete the data on the original thread.
30 task_runner_->DeleteSoon(FROM_HERE, data_.release()); 30 task_runner_->DeleteSoon(FROM_HERE, data_.release());
31 } 31 }
32 } 32 }
33 33
34 const char* payload() const override { return data_->payload(); } 34 const char* payload() const override { return data_->payload(); }
35 int length() const override { return data_->length(); } 35 int length() const override { return data_->length(); }
36 int encoded_length() const override { return data_->encoded_length(); } 36 int encoded_length() const override { return data_->encoded_length(); }
37 37
38 private: 38 private:
39 scoped_ptr<RequestPeer::ReceivedData> data_; 39 scoped_ptr<RequestPeer::ReceivedData> data_;
40 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 40 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
41 41
42 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData); 42 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData);
43 }; 43 };
44 44
45 } // namespace 45 } // namespace
46 46
47 using Result = blink::WebDataConsumerHandle::Result; 47 using Result = blink::WebDataConsumerHandle::Result;
48 48
49 class SharedMemoryDataConsumerHandle::Context final 49 class SharedMemoryDataConsumerHandle::Context final
50 : public base::RefCountedThreadSafe<Context> { 50 : public base::RefCountedThreadSafe<Context> {
51 public: 51 public:
52 Context() 52 explicit Context(const base::Closure& on_reader_detached)
53 : result_(Ok), 53 : result_(Ok),
54 first_offset_(0), 54 first_offset_(0),
55 client_(nullptr), 55 client_(nullptr),
56 writer_task_runner_(base::ThreadTaskRunnerHandle::Get()),
57 on_reader_detached_(on_reader_detached),
58 is_on_reader_detached_valid_(!on_reader_detached_.is_null()),
56 is_handle_active_(true), 59 is_handle_active_(true),
57 is_two_phase_read_in_progress_(false) {} 60 is_two_phase_read_in_progress_(false) {}
58 61
59 bool IsEmpty() const { return queue_.empty(); } 62 bool IsEmpty() const { return queue_.empty(); }
60 void ClearIfNecessary() { 63 void ClearIfNecessary() {
61 if (!is_handle_locked() && !is_handle_active()) { 64 if (!is_handle_locked() && !is_handle_active()) {
62 // No one is interested in the contents. 65 // No one is interested in the contents.
66 if (is_on_reader_detached_valid_) {
67 // We post a task even in the writer thread in order to avoid a
68 // reentrance problem as calling |on_reader_detached_| may manipulate
69 // the context synchronously.
70 writer_task_runner_->PostTask(FROM_HERE, on_reader_detached_);
71 }
63 Clear(); 72 Clear();
64 } 73 }
65 } 74 }
66 void ClearQueue() { 75 void ClearQueue() {
67 for (auto& data : queue_) { 76 for (auto& data : queue_) {
68 delete data; 77 delete data;
69 } 78 }
70 queue_.clear(); 79 queue_.clear();
71 first_offset_ = 0; 80 first_offset_ = 0;
72 } 81 }
73 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); } 82 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); }
74 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) { 83 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) {
75 queue_.push_back(data.release()); 84 queue_.push_back(data.release());
76 } 85 }
77 size_t first_offset() const { return first_offset_; } 86 size_t first_offset() const { return first_offset_; }
78 Result result() const { return result_; } 87 Result result() const { return result_; }
79 void set_result(Result r) { result_ = r; } 88 void set_result(Result r) { result_ = r; }
80 void AcquireReaderLock(Client* client) { 89 void AcquireReaderLock(Client* client) {
81 DCHECK(!notification_task_runner_); 90 DCHECK(!notification_task_runner_);
82 DCHECK(!client_); 91 DCHECK(!client_);
83 notification_task_runner_ = base::MessageLoop::current()->task_runner(); 92 notification_task_runner_ = base::ThreadTaskRunnerHandle::Get();
84 client_ = client; 93 client_ = client;
85 if (client && !(IsEmpty() && result() == Ok)) { 94 if (client && !(IsEmpty() && result() == Ok)) {
86 // We cannot notify synchronously because the user doesn't have the reader 95 // We cannot notify synchronously because the user doesn't have the reader
87 // yet. 96 // yet.
88 notification_task_runner_->PostTask( 97 notification_task_runner_->PostTask(
89 FROM_HERE, base::Bind(&Context::NotifyInternal, this, false)); 98 FROM_HERE, base::Bind(&Context::NotifyInternal, this, false));
90 } 99 }
91 } 100 }
92 void ReleaseReaderLock() { 101 void ReleaseReaderLock() {
93 DCHECK(notification_task_runner_); 102 DCHECK(notification_task_runner_);
94 notification_task_runner_ = nullptr; 103 notification_task_runner_ = nullptr;
95 client_ = nullptr; 104 client_ = nullptr;
96 } 105 }
97 void PostNotify() { 106 void PostNotify() {
98 auto runner = notification_task_runner_; 107 auto runner = notification_task_runner_;
99 if (!runner) 108 if (!runner)
100 return; 109 return;
101 // We don't re-post the task when the runner changes while waiting for 110 // We don't re-post the task when the runner changes while waiting for
102 // this task because in this case a new reader is obtained and 111 // this task because in this case a new reader is obtained and
103 // notification is already done at the reader creation time if necessary. 112 // notification is already done at the reader creation time if necessary.
104 runner->PostTask(FROM_HERE, 113 runner->PostTask(FROM_HERE,
105 base::Bind(&Context::NotifyInternal, this, false)); 114 base::Bind(&Context::NotifyInternal, this, false));
106 } 115 }
107 void Notify() { NotifyInternal(true); } 116 void Notify() { NotifyInternal(true); }
117 // This function doesn't work in the destructor if |on_reader_detached_| is
118 // not null.
119 void ResetOnReaderDetached() {
120 if (on_reader_detached_.is_null()) {
121 DCHECK(!is_on_reader_detached_valid_);
122 return;
123 }
124 is_on_reader_detached_valid_ = false;
125 if (writer_task_runner_->BelongsToCurrentThread()) {
126 // We can reset the closure immediately.
127 on_reader_detached_.Reset();
128 } else {
129 // We need to reset |on_reader_detached_| on the right thread because it
130 // might lead to the object destruction.
131 writer_task_runner_->PostTask(
132 FROM_HERE, base::Bind(&Context::ResetOnReaderDetachedWithLock, this));
133 }
134 }
108 bool is_handle_locked() const { return notification_task_runner_; } 135 bool is_handle_locked() const { return notification_task_runner_; }
109 bool IsReaderBoundToCurrentThread() const { 136 bool IsReaderBoundToCurrentThread() const {
110 return notification_task_runner_ && 137 return notification_task_runner_ &&
111 notification_task_runner_->BelongsToCurrentThread(); 138 notification_task_runner_->BelongsToCurrentThread();
112 } 139 }
113 bool is_handle_active() const { return is_handle_active_; } 140 bool is_handle_active() const { return is_handle_active_; }
114 void set_is_handle_active(bool b) { is_handle_active_ = b; } 141 void set_is_handle_active(bool b) { is_handle_active_ = b; }
115 void Consume(size_t s) { 142 void Consume(size_t s) {
116 first_offset_ += s; 143 first_offset_ += s;
117 auto top = Top(); 144 auto top = Top();
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
152 base::Bind(&Context::NotifyInternal, this, false)); 179 base::Bind(&Context::NotifyInternal, this, false));
153 } 180 }
154 } 181 }
155 void Clear() { 182 void Clear() {
156 for (auto& data : queue_) { 183 for (auto& data : queue_) {
157 delete data; 184 delete data;
158 } 185 }
159 queue_.clear(); 186 queue_.clear();
160 first_offset_ = 0; 187 first_offset_ = 0;
161 client_ = nullptr; 188 client_ = nullptr;
189 // Note this doesn't work in the destructor if |on_reader_detached_| is not
190 // null. We have an assert in the destructor.
191 ResetOnReaderDetached();
192 }
193 void ResetOnReaderDetachedWithLock() {
194 base::AutoLock lock(lock_);
195 ResetOnReaderDetached();
162 } 196 }
163 197
164 friend class base::RefCountedThreadSafe<Context>; 198 friend class base::RefCountedThreadSafe<Context>;
165 ~Context() { 199 ~Context() {
200 DCHECK(on_reader_detached_.is_null());
201
166 // This is necessary because the queue stores raw pointers. 202 // This is necessary because the queue stores raw pointers.
167 Clear(); 203 Clear();
168 } 204 }
169 205
170 base::Lock lock_; 206 base::Lock lock_;
171 // |result_| stores the ultimate state of this handle if it has. Otherwise, 207 // |result_| stores the ultimate state of this handle if it has. Otherwise,
172 // |Ok| is set. 208 // |Ok| is set.
173 Result result_; 209 Result result_;
174 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>> 210 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
175 // once it is allowed. 211 // once it is allowed.
176 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_; 212 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_;
177 size_t first_offset_; 213 size_t first_offset_;
178 Client* client_; 214 Client* client_;
179 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_; 215 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
216 scoped_refptr<base::SingleThreadTaskRunner> writer_task_runner_;
217 base::Closure on_reader_detached_;
218 // We need this boolean variable to remember if |on_reader_detached_| is
219 // callable because we need to reset |on_reader_detached_| only on the writer
220 // thread and hence |on_reader_detached_.is_null()| is untrustworthy on
221 // other threads.
222 bool is_on_reader_detached_valid_;
180 bool is_handle_active_; 223 bool is_handle_active_;
181 bool is_two_phase_read_in_progress_; 224 bool is_two_phase_read_in_progress_;
182 225
183 DISALLOW_COPY_AND_ASSIGN(Context); 226 DISALLOW_COPY_AND_ASSIGN(Context);
184 }; 227 };
185 228
186 SharedMemoryDataConsumerHandle::Writer::Writer( 229 SharedMemoryDataConsumerHandle::Writer::Writer(
187 const scoped_refptr<Context>& context, 230 const scoped_refptr<Context>& context,
188 BackpressureMode mode) 231 BackpressureMode mode)
189 : context_(context), mode_(mode) { 232 : context_(context), mode_(mode) {
190 } 233 }
191 234
192 SharedMemoryDataConsumerHandle::Writer::~Writer() { 235 SharedMemoryDataConsumerHandle::Writer::~Writer() {
193 Close(); 236 Close();
237 base::AutoLock lock(context_->lock());
238 context_->ResetOnReaderDetached();
194 } 239 }
195 240
196 void SharedMemoryDataConsumerHandle::Writer::AddData( 241 void SharedMemoryDataConsumerHandle::Writer::AddData(
197 scoped_ptr<RequestPeer::ReceivedData> data) { 242 scoped_ptr<RequestPeer::ReceivedData> data) {
198 if (!data->length()) { 243 if (!data->length()) {
199 // We omit empty data. 244 // We omit empty data.
200 return; 245 return;
201 } 246 }
202 247
203 bool needs_notification = false; 248 bool needs_notification = false;
(...skipping 23 matching lines...) Expand all
227 } 272 }
228 } 273 }
229 274
230 void SharedMemoryDataConsumerHandle::Writer::Close() { 275 void SharedMemoryDataConsumerHandle::Writer::Close() {
231 bool needs_notification = false; 276 bool needs_notification = false;
232 277
233 { 278 {
234 base::AutoLock lock(context_->lock()); 279 base::AutoLock lock(context_->lock());
235 if (context_->result() == Ok) { 280 if (context_->result() == Ok) {
236 context_->set_result(Done); 281 context_->set_result(Done);
282 context_->ResetOnReaderDetached();
237 needs_notification = context_->IsEmpty(); 283 needs_notification = context_->IsEmpty();
238 } 284 }
239 } 285 }
240 if (needs_notification) { 286 if (needs_notification) {
241 // We cannot issue the notification synchronously because this function can 287 // We cannot issue the notification synchronously because this function can
242 // be called in the client's callback. 288 // be called in the client's callback.
243 context_->PostNotify(); 289 context_->PostNotify();
244 } 290 }
245 } 291 }
246 292
247 void SharedMemoryDataConsumerHandle::Writer::Fail() { 293 void SharedMemoryDataConsumerHandle::Writer::Fail() {
248 bool needs_notification = false; 294 bool needs_notification = false;
249 { 295 {
250 base::AutoLock lock(context_->lock()); 296 base::AutoLock lock(context_->lock());
251 if (context_->result() == Ok) { 297 if (context_->result() == Ok) {
252 // TODO(yhirano): Use an appropriate error code other than 298 // TODO(yhirano): Use an appropriate error code other than
253 // UnexpectedError. 299 // UnexpectedError.
254 context_->set_result(UnexpectedError); 300 context_->set_result(UnexpectedError);
255 301
256 if (context_->is_two_phase_read_in_progress()) { 302 if (context_->is_two_phase_read_in_progress()) {
257 // If we are in two-phase read session, we cannot discard the data. We 303 // If we are in two-phase read session, we cannot discard the data. We
258 // will clear the queue at the end of the session. 304 // will clear the queue at the end of the session.
259 } else { 305 } else {
260 context_->ClearQueue(); 306 context_->ClearQueue();
261 } 307 }
262 308
309 context_->ResetOnReaderDetached();
263 needs_notification = true; 310 needs_notification = true;
264 } 311 }
265 } 312 }
266 if (needs_notification) { 313 if (needs_notification) {
267 // We cannot issue the notification synchronously because this function can 314 // We cannot issue the notification synchronously because this function can
268 // be called in the client's callback. 315 // be called in the client's callback.
269 context_->PostNotify(); 316 context_->PostNotify();
270 } 317 }
271 } 318 }
272 319
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
355 } else { 402 } else {
356 context_->Consume(read_size); 403 context_->Consume(read_size);
357 } 404 }
358 405
359 return Ok; 406 return Ok;
360 } 407 }
361 408
362 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( 409 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
363 BackpressureMode mode, 410 BackpressureMode mode,
364 scoped_ptr<Writer>* writer) 411 scoped_ptr<Writer>* writer)
365 : context_(new Context) { 412 : SharedMemoryDataConsumerHandle(mode, base::Closure(), writer) {
413 }
414
415 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
416 BackpressureMode mode,
417 const base::Closure& on_reader_detached,
418 scoped_ptr<Writer>* writer)
419 : context_(new Context(on_reader_detached)) {
366 writer->reset(new Writer(context_, mode)); 420 writer->reset(new Writer(context_, mode));
367 } 421 }
368 422
369 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { 423 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
370 base::AutoLock lock(context_->lock()); 424 base::AutoLock lock(context_->lock());
371 context_->set_is_handle_active(false); 425 context_->set_is_handle_active(false);
372 context_->ClearIfNecessary(); 426 context_->ClearIfNecessary();
373 } 427 }
374 428
375 scoped_ptr<blink::WebDataConsumerHandle::Reader> 429 scoped_ptr<blink::WebDataConsumerHandle::Reader>
376 SharedMemoryDataConsumerHandle::ObtainReader(Client* client) { 430 SharedMemoryDataConsumerHandle::ObtainReader(Client* client) {
377 return make_scoped_ptr(obtainReaderInternal(client)); 431 return make_scoped_ptr(obtainReaderInternal(client));
378 } 432 }
379 433
380 SharedMemoryDataConsumerHandle::ReaderImpl* 434 SharedMemoryDataConsumerHandle::ReaderImpl*
381 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) { 435 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) {
382 return new ReaderImpl(context_, client); 436 return new ReaderImpl(context_, client);
383 } 437 }
384 438
385 const char* SharedMemoryDataConsumerHandle::debugName() const { 439 const char* SharedMemoryDataConsumerHandle::debugName() const {
386 return "SharedMemoryDataConsumerHandle"; 440 return "SharedMemoryDataConsumerHandle";
387 } 441 }
388 442
389 } // namespace content 443 } // namespace content
OLDNEW
« no previous file with comments | « content/child/shared_memory_data_consumer_handle.h ('k') | content/child/shared_memory_data_consumer_handle_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698