| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "content/child/shared_memory_data_consumer_handle.h" | 5 #include "content/child/shared_memory_data_consumer_handle.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <deque> | 8 #include <deque> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| 11 #include "base/message_loop/message_loop.h" | 11 #include "base/message_loop/message_loop.h" |
| 12 #include "base/single_thread_task_runner.h" | 12 #include "base/single_thread_task_runner.h" |
| 13 #include "base/synchronization/lock.h" | 13 #include "base/synchronization/lock.h" |
| 14 #include "base/thread_task_runner_handle.h" |
| 14 #include "content/public/child/fixed_received_data.h" | 15 #include "content/public/child/fixed_received_data.h" |
| 15 | 16 |
| 16 namespace content { | 17 namespace content { |
| 17 | 18 |
| 18 namespace { | 19 namespace { |
| 19 | 20 |
| 20 class DelegateThreadSafeReceivedData final | 21 class DelegateThreadSafeReceivedData final |
| 21 : public RequestPeer::ThreadSafeReceivedData { | 22 : public RequestPeer::ThreadSafeReceivedData { |
| 22 public: | 23 public: |
| 23 explicit DelegateThreadSafeReceivedData( | 24 explicit DelegateThreadSafeReceivedData( |
| 24 scoped_ptr<RequestPeer::ReceivedData> data) | 25 scoped_ptr<RequestPeer::ReceivedData> data) |
| 25 : data_(data.Pass()), | 26 : data_(data.Pass()), task_runner_(base::ThreadTaskRunnerHandle::Get()) {} |
| 26 task_runner_(base::MessageLoop::current()->task_runner()) {} | |
| 27 ~DelegateThreadSafeReceivedData() override { | 27 ~DelegateThreadSafeReceivedData() override { |
| 28 if (!task_runner_->BelongsToCurrentThread()) { | 28 if (!task_runner_->BelongsToCurrentThread()) { |
| 29 // Delete the data on the original thread. | 29 // Delete the data on the original thread. |
| 30 task_runner_->DeleteSoon(FROM_HERE, data_.release()); | 30 task_runner_->DeleteSoon(FROM_HERE, data_.release()); |
| 31 } | 31 } |
| 32 } | 32 } |
| 33 | 33 |
| 34 const char* payload() const override { return data_->payload(); } | 34 const char* payload() const override { return data_->payload(); } |
| 35 int length() const override { return data_->length(); } | 35 int length() const override { return data_->length(); } |
| 36 int encoded_length() const override { return data_->encoded_length(); } | 36 int encoded_length() const override { return data_->encoded_length(); } |
| 37 | 37 |
| 38 private: | 38 private: |
| 39 scoped_ptr<RequestPeer::ReceivedData> data_; | 39 scoped_ptr<RequestPeer::ReceivedData> data_; |
| 40 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 40 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| 41 | 41 |
| 42 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData); | 42 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData); |
| 43 }; | 43 }; |
| 44 | 44 |
| 45 } // namespace | 45 } // namespace |
| 46 | 46 |
| 47 using Result = blink::WebDataConsumerHandle::Result; | 47 using Result = blink::WebDataConsumerHandle::Result; |
| 48 | 48 |
| 49 class SharedMemoryDataConsumerHandle::Context final | 49 class SharedMemoryDataConsumerHandle::Context final |
| 50 : public base::RefCountedThreadSafe<Context> { | 50 : public base::RefCountedThreadSafe<Context> { |
| 51 public: | 51 public: |
| 52 Context() | 52 explicit Context(const base::Closure& on_reader_detached) |
| 53 : result_(Ok), | 53 : result_(Ok), |
| 54 first_offset_(0), | 54 first_offset_(0), |
| 55 client_(nullptr), | 55 client_(nullptr), |
| 56 writer_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| 57 on_reader_detached_(on_reader_detached), |
| 58 is_on_reader_detached_valid_(!on_reader_detached_.is_null()), |
| 56 is_handle_active_(true), | 59 is_handle_active_(true), |
| 57 is_two_phase_read_in_progress_(false) {} | 60 is_two_phase_read_in_progress_(false) {} |
| 58 | 61 |
| 59 bool IsEmpty() const { return queue_.empty(); } | 62 bool IsEmpty() const { return queue_.empty(); } |
| 60 void ClearIfNecessary() { | 63 void ClearIfNecessary() { |
| 61 if (!is_handle_locked() && !is_handle_active()) { | 64 if (!is_handle_locked() && !is_handle_active()) { |
| 62 // No one is interested in the contents. | 65 // No one is interested in the contents. |
| 66 if (is_on_reader_detached_valid_) { |
| 67 // We post a task even in the writer thread in order to avoid a |
| 68 // reentrance problem as calling |on_reader_detached_| may manipulate |
| 69 // the context synchronously. |
| 70 writer_task_runner_->PostTask(FROM_HERE, on_reader_detached_); |
| 71 } |
| 63 Clear(); | 72 Clear(); |
| 64 } | 73 } |
| 65 } | 74 } |
| 66 void ClearQueue() { | 75 void ClearQueue() { |
| 67 for (auto& data : queue_) { | 76 for (auto& data : queue_) { |
| 68 delete data; | 77 delete data; |
| 69 } | 78 } |
| 70 queue_.clear(); | 79 queue_.clear(); |
| 71 first_offset_ = 0; | 80 first_offset_ = 0; |
| 72 } | 81 } |
| 73 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); } | 82 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); } |
| 74 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) { | 83 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) { |
| 75 queue_.push_back(data.release()); | 84 queue_.push_back(data.release()); |
| 76 } | 85 } |
| 77 size_t first_offset() const { return first_offset_; } | 86 size_t first_offset() const { return first_offset_; } |
| 78 Result result() const { return result_; } | 87 Result result() const { return result_; } |
| 79 void set_result(Result r) { result_ = r; } | 88 void set_result(Result r) { result_ = r; } |
| 80 void AcquireReaderLock(Client* client) { | 89 void AcquireReaderLock(Client* client) { |
| 81 DCHECK(!notification_task_runner_); | 90 DCHECK(!notification_task_runner_); |
| 82 DCHECK(!client_); | 91 DCHECK(!client_); |
| 83 notification_task_runner_ = base::MessageLoop::current()->task_runner(); | 92 notification_task_runner_ = base::ThreadTaskRunnerHandle::Get(); |
| 84 client_ = client; | 93 client_ = client; |
| 85 if (client && !(IsEmpty() && result() == Ok)) { | 94 if (client && !(IsEmpty() && result() == Ok)) { |
| 86 // We cannot notify synchronously because the user doesn't have the reader | 95 // We cannot notify synchronously because the user doesn't have the reader |
| 87 // yet. | 96 // yet. |
| 88 notification_task_runner_->PostTask( | 97 notification_task_runner_->PostTask( |
| 89 FROM_HERE, base::Bind(&Context::NotifyInternal, this, false)); | 98 FROM_HERE, base::Bind(&Context::NotifyInternal, this, false)); |
| 90 } | 99 } |
| 91 } | 100 } |
| 92 void ReleaseReaderLock() { | 101 void ReleaseReaderLock() { |
| 93 DCHECK(notification_task_runner_); | 102 DCHECK(notification_task_runner_); |
| 94 notification_task_runner_ = nullptr; | 103 notification_task_runner_ = nullptr; |
| 95 client_ = nullptr; | 104 client_ = nullptr; |
| 96 } | 105 } |
| 97 void PostNotify() { | 106 void PostNotify() { |
| 98 auto runner = notification_task_runner_; | 107 auto runner = notification_task_runner_; |
| 99 if (!runner) | 108 if (!runner) |
| 100 return; | 109 return; |
| 101 // We don't re-post the task when the runner changes while waiting for | 110 // We don't re-post the task when the runner changes while waiting for |
| 102 // this task because in this case a new reader is obtained and | 111 // this task because in this case a new reader is obtained and |
| 103 // notification is already done at the reader creation time if necessary. | 112 // notification is already done at the reader creation time if necessary. |
| 104 runner->PostTask(FROM_HERE, | 113 runner->PostTask(FROM_HERE, |
| 105 base::Bind(&Context::NotifyInternal, this, false)); | 114 base::Bind(&Context::NotifyInternal, this, false)); |
| 106 } | 115 } |
| 107 void Notify() { NotifyInternal(true); } | 116 void Notify() { NotifyInternal(true); } |
| 117 // This function doesn't work in the destructor if |on_reader_detached_| is |
| 118 // not null. |
| 119 void ResetOnReaderDetached() { |
| 120 if (on_reader_detached_.is_null()) { |
| 121 DCHECK(!is_on_reader_detached_valid_); |
| 122 return; |
| 123 } |
| 124 is_on_reader_detached_valid_ = false; |
| 125 if (writer_task_runner_->BelongsToCurrentThread()) { |
| 126 // We can reset the closure immediately. |
| 127 on_reader_detached_.Reset(); |
| 128 } else { |
| 129 // We need to reset |on_reader_detached_| on the right thread because it |
| 130 // might lead to the object destruction. |
| 131 writer_task_runner_->PostTask( |
| 132 FROM_HERE, base::Bind(&Context::ResetOnReaderDetachedWithLock, this)); |
| 133 } |
| 134 } |
| 108 bool is_handle_locked() const { return notification_task_runner_; } | 135 bool is_handle_locked() const { return notification_task_runner_; } |
| 109 bool IsReaderBoundToCurrentThread() const { | 136 bool IsReaderBoundToCurrentThread() const { |
| 110 return notification_task_runner_ && | 137 return notification_task_runner_ && |
| 111 notification_task_runner_->BelongsToCurrentThread(); | 138 notification_task_runner_->BelongsToCurrentThread(); |
| 112 } | 139 } |
| 113 bool is_handle_active() const { return is_handle_active_; } | 140 bool is_handle_active() const { return is_handle_active_; } |
| 114 void set_is_handle_active(bool b) { is_handle_active_ = b; } | 141 void set_is_handle_active(bool b) { is_handle_active_ = b; } |
| 115 void Consume(size_t s) { | 142 void Consume(size_t s) { |
| 116 first_offset_ += s; | 143 first_offset_ += s; |
| 117 auto top = Top(); | 144 auto top = Top(); |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 152 base::Bind(&Context::NotifyInternal, this, false)); | 179 base::Bind(&Context::NotifyInternal, this, false)); |
| 153 } | 180 } |
| 154 } | 181 } |
| 155 void Clear() { | 182 void Clear() { |
| 156 for (auto& data : queue_) { | 183 for (auto& data : queue_) { |
| 157 delete data; | 184 delete data; |
| 158 } | 185 } |
| 159 queue_.clear(); | 186 queue_.clear(); |
| 160 first_offset_ = 0; | 187 first_offset_ = 0; |
| 161 client_ = nullptr; | 188 client_ = nullptr; |
| 189 // Note this doesn't work in the destructor if |on_reader_detached_| is not |
| 190 // null. We have an assert in the destructor. |
| 191 ResetOnReaderDetached(); |
| 192 } |
| 193 void ResetOnReaderDetachedWithLock() { |
| 194 base::AutoLock lock(lock_); |
| 195 ResetOnReaderDetached(); |
| 162 } | 196 } |
| 163 | 197 |
| 164 friend class base::RefCountedThreadSafe<Context>; | 198 friend class base::RefCountedThreadSafe<Context>; |
| 165 ~Context() { | 199 ~Context() { |
| 200 DCHECK(on_reader_detached_.is_null()); |
| 201 |
| 166 // This is necessary because the queue stores raw pointers. | 202 // This is necessary because the queue stores raw pointers. |
| 167 Clear(); | 203 Clear(); |
| 168 } | 204 } |
| 169 | 205 |
| 170 base::Lock lock_; | 206 base::Lock lock_; |
| 171 // |result_| stores the ultimate state of this handle if it has. Otherwise, | 207 // |result_| stores the ultimate state of this handle if it has. Otherwise, |
| 172 // |Ok| is set. | 208 // |Ok| is set. |
| 173 Result result_; | 209 Result result_; |
| 174 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>> | 210 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>> |
| 175 // once it is allowed. | 211 // once it is allowed. |
| 176 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_; | 212 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_; |
| 177 size_t first_offset_; | 213 size_t first_offset_; |
| 178 Client* client_; | 214 Client* client_; |
| 179 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_; | 215 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_; |
| 216 scoped_refptr<base::SingleThreadTaskRunner> writer_task_runner_; |
| 217 base::Closure on_reader_detached_; |
| 218 // We need this boolean variable to remember if |on_reader_detached_| is |
| 219 // callable because we need to reset |on_reader_detached_| only on the writer |
| 220 // thread and hence |on_reader_detached_.is_null()| is untrustworthy on |
| 221 // other threads. |
| 222 bool is_on_reader_detached_valid_; |
| 180 bool is_handle_active_; | 223 bool is_handle_active_; |
| 181 bool is_two_phase_read_in_progress_; | 224 bool is_two_phase_read_in_progress_; |
| 182 | 225 |
| 183 DISALLOW_COPY_AND_ASSIGN(Context); | 226 DISALLOW_COPY_AND_ASSIGN(Context); |
| 184 }; | 227 }; |
| 185 | 228 |
| 186 SharedMemoryDataConsumerHandle::Writer::Writer( | 229 SharedMemoryDataConsumerHandle::Writer::Writer( |
| 187 const scoped_refptr<Context>& context, | 230 const scoped_refptr<Context>& context, |
| 188 BackpressureMode mode) | 231 BackpressureMode mode) |
| 189 : context_(context), mode_(mode) { | 232 : context_(context), mode_(mode) { |
| 190 } | 233 } |
| 191 | 234 |
| 192 SharedMemoryDataConsumerHandle::Writer::~Writer() { | 235 SharedMemoryDataConsumerHandle::Writer::~Writer() { |
| 193 Close(); | 236 Close(); |
| 237 base::AutoLock lock(context_->lock()); |
| 238 context_->ResetOnReaderDetached(); |
| 194 } | 239 } |
| 195 | 240 |
| 196 void SharedMemoryDataConsumerHandle::Writer::AddData( | 241 void SharedMemoryDataConsumerHandle::Writer::AddData( |
| 197 scoped_ptr<RequestPeer::ReceivedData> data) { | 242 scoped_ptr<RequestPeer::ReceivedData> data) { |
| 198 if (!data->length()) { | 243 if (!data->length()) { |
| 199 // We omit empty data. | 244 // We omit empty data. |
| 200 return; | 245 return; |
| 201 } | 246 } |
| 202 | 247 |
| 203 bool needs_notification = false; | 248 bool needs_notification = false; |
| (...skipping 23 matching lines...) Expand all Loading... |
| 227 } | 272 } |
| 228 } | 273 } |
| 229 | 274 |
| 230 void SharedMemoryDataConsumerHandle::Writer::Close() { | 275 void SharedMemoryDataConsumerHandle::Writer::Close() { |
| 231 bool needs_notification = false; | 276 bool needs_notification = false; |
| 232 | 277 |
| 233 { | 278 { |
| 234 base::AutoLock lock(context_->lock()); | 279 base::AutoLock lock(context_->lock()); |
| 235 if (context_->result() == Ok) { | 280 if (context_->result() == Ok) { |
| 236 context_->set_result(Done); | 281 context_->set_result(Done); |
| 282 context_->ResetOnReaderDetached(); |
| 237 needs_notification = context_->IsEmpty(); | 283 needs_notification = context_->IsEmpty(); |
| 238 } | 284 } |
| 239 } | 285 } |
| 240 if (needs_notification) { | 286 if (needs_notification) { |
| 241 // We cannot issue the notification synchronously because this function can | 287 // We cannot issue the notification synchronously because this function can |
| 242 // be called in the client's callback. | 288 // be called in the client's callback. |
| 243 context_->PostNotify(); | 289 context_->PostNotify(); |
| 244 } | 290 } |
| 245 } | 291 } |
| 246 | 292 |
| 247 void SharedMemoryDataConsumerHandle::Writer::Fail() { | 293 void SharedMemoryDataConsumerHandle::Writer::Fail() { |
| 248 bool needs_notification = false; | 294 bool needs_notification = false; |
| 249 { | 295 { |
| 250 base::AutoLock lock(context_->lock()); | 296 base::AutoLock lock(context_->lock()); |
| 251 if (context_->result() == Ok) { | 297 if (context_->result() == Ok) { |
| 252 // TODO(yhirano): Use an appropriate error code other than | 298 // TODO(yhirano): Use an appropriate error code other than |
| 253 // UnexpectedError. | 299 // UnexpectedError. |
| 254 context_->set_result(UnexpectedError); | 300 context_->set_result(UnexpectedError); |
| 255 | 301 |
| 256 if (context_->is_two_phase_read_in_progress()) { | 302 if (context_->is_two_phase_read_in_progress()) { |
| 257 // If we are in two-phase read session, we cannot discard the data. We | 303 // If we are in two-phase read session, we cannot discard the data. We |
| 258 // will clear the queue at the end of the session. | 304 // will clear the queue at the end of the session. |
| 259 } else { | 305 } else { |
| 260 context_->ClearQueue(); | 306 context_->ClearQueue(); |
| 261 } | 307 } |
| 262 | 308 |
| 309 context_->ResetOnReaderDetached(); |
| 263 needs_notification = true; | 310 needs_notification = true; |
| 264 } | 311 } |
| 265 } | 312 } |
| 266 if (needs_notification) { | 313 if (needs_notification) { |
| 267 // We cannot issue the notification synchronously because this function can | 314 // We cannot issue the notification synchronously because this function can |
| 268 // be called in the client's callback. | 315 // be called in the client's callback. |
| 269 context_->PostNotify(); | 316 context_->PostNotify(); |
| 270 } | 317 } |
| 271 } | 318 } |
| 272 | 319 |
| (...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 355 } else { | 402 } else { |
| 356 context_->Consume(read_size); | 403 context_->Consume(read_size); |
| 357 } | 404 } |
| 358 | 405 |
| 359 return Ok; | 406 return Ok; |
| 360 } | 407 } |
| 361 | 408 |
| 362 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( | 409 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( |
| 363 BackpressureMode mode, | 410 BackpressureMode mode, |
| 364 scoped_ptr<Writer>* writer) | 411 scoped_ptr<Writer>* writer) |
| 365 : context_(new Context) { | 412 : SharedMemoryDataConsumerHandle(mode, base::Closure(), writer) { |
| 413 } |
| 414 |
| 415 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( |
| 416 BackpressureMode mode, |
| 417 const base::Closure& on_reader_detached, |
| 418 scoped_ptr<Writer>* writer) |
| 419 : context_(new Context(on_reader_detached)) { |
| 366 writer->reset(new Writer(context_, mode)); | 420 writer->reset(new Writer(context_, mode)); |
| 367 } | 421 } |
| 368 | 422 |
| 369 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { | 423 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { |
| 370 base::AutoLock lock(context_->lock()); | 424 base::AutoLock lock(context_->lock()); |
| 371 context_->set_is_handle_active(false); | 425 context_->set_is_handle_active(false); |
| 372 context_->ClearIfNecessary(); | 426 context_->ClearIfNecessary(); |
| 373 } | 427 } |
| 374 | 428 |
| 375 scoped_ptr<blink::WebDataConsumerHandle::Reader> | 429 scoped_ptr<blink::WebDataConsumerHandle::Reader> |
| 376 SharedMemoryDataConsumerHandle::ObtainReader(Client* client) { | 430 SharedMemoryDataConsumerHandle::ObtainReader(Client* client) { |
| 377 return make_scoped_ptr(obtainReaderInternal(client)); | 431 return make_scoped_ptr(obtainReaderInternal(client)); |
| 378 } | 432 } |
| 379 | 433 |
| 380 SharedMemoryDataConsumerHandle::ReaderImpl* | 434 SharedMemoryDataConsumerHandle::ReaderImpl* |
| 381 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) { | 435 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) { |
| 382 return new ReaderImpl(context_, client); | 436 return new ReaderImpl(context_, client); |
| 383 } | 437 } |
| 384 | 438 |
| 385 const char* SharedMemoryDataConsumerHandle::debugName() const { | 439 const char* SharedMemoryDataConsumerHandle::debugName() const { |
| 386 return "SharedMemoryDataConsumerHandle"; | 440 return "SharedMemoryDataConsumerHandle"; |
| 387 } | 441 } |
| 388 | 442 |
| 389 } // namespace content | 443 } // namespace content |
| OLD | NEW |