Chromium Code Reviews| Index: content/child/shared_memory_data_consumer_handle.cc |
| diff --git a/content/child/shared_memory_data_consumer_handle.cc b/content/child/shared_memory_data_consumer_handle.cc |
| index f5d6e69f5654f0ed6bc13c0519ae3f9c830cfe07..c6ea3e60fb3724a1d4371f78a929625909cf55fd 100644 |
| --- a/content/child/shared_memory_data_consumer_handle.cc |
| +++ b/content/child/shared_memory_data_consumer_handle.cc |
| @@ -6,7 +6,6 @@ |
| #include <algorithm> |
| #include <deque> |
| -#include <vector> |
| #include "base/bind.h" |
| #include "base/message_loop/message_loop.h" |
| @@ -54,29 +53,15 @@ class SharedMemoryDataConsumerHandle::Context final |
| : result_(Ok), |
| first_offset_(0), |
| client_(nullptr), |
| - is_reader_active_(true) {} |
| + is_handle_active_(true) {} |
| bool IsEmpty() const { return queue_.empty(); } |
| - void Clear() { |
| - for (auto& data : queue_) { |
| - delete data; |
| - } |
| - queue_.clear(); |
| - first_offset_ = 0; |
| - client_ = nullptr; |
| - } |
| - void Notify() { |
| - // Note that this function is not protected by |lock_| (actually it |
| - // shouldn't be) but |notification_task_runner_| is thread-safe. |
| - |
| - if (notification_task_runner_->BelongsToCurrentThread()) { |
| - NotifyImmediately(); |
| - } else { |
| - notification_task_runner_->PostTask( |
| - FROM_HERE, base::Bind(&Context::NotifyImmediately, this)); |
| + void ClearIfNecessary() { |
| + if (!is_handle_locked() && !is_handle_active()) { |
| + // No one is interested in the contents. |
| + Clear(); |
| } |
| } |
| - |
| RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); } |
| void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) { |
| queue_.push_back(data.release()); |
| @@ -84,18 +69,31 @@ class SharedMemoryDataConsumerHandle::Context final |
| size_t first_offset() const { return first_offset_; } |
| Result result() const { return result_; } |
| void set_result(Result r) { result_ = r; } |
| - Client* client() { return client_; } |
| - void SetClient(Client* client) { |
| - if (client) { |
| - notification_task_runner_ = base::MessageLoop::current()->task_runner(); |
| - client_ = client; |
| - } else { |
| - notification_task_runner_ = nullptr; |
| - client_ = nullptr; |
| + void AcquireReaderLock(Client* client) { |
| + DCHECK(!notification_task_runner_); |
| + DCHECK(!client_); |
| + notification_task_runner_ = base::MessageLoop::current()->task_runner(); |
| + client_ = client; |
| + if (client && !(IsEmpty() && result() == Ok)) { |
| + // We cannot notify synchronously because the user doesn't have the reader |
| + // yet. |
| + notification_task_runner_->PostTask( |
| + FROM_HERE, base::Bind(&Context::NotifyInternal, this, false)); |
| } |
| } |
| - bool is_reader_active() const { return is_reader_active_; } |
| - void set_is_reader_active(bool b) { is_reader_active_ = b; } |
| + void ReleaseReaderLock() { |
| + DCHECK(notification_task_runner_); |
| + notification_task_runner_ = nullptr; |
| + client_ = nullptr; |
| + } |
| + void Notify() { NotifyInternal(true); } |
| + bool is_handle_locked() const { return notification_task_runner_; } |
| + bool IsReaderBoundToCurrentThread() const { |
| + return notification_task_runner_ && |
| + notification_task_runner_->BelongsToCurrentThread(); |
| + } |
| + bool is_handle_active() const { return is_handle_active_; } |
| + void set_is_handle_active(bool b) { is_handle_active_ = b; } |
| void Consume(size_t s) { |
| first_offset_ += s; |
| auto top = Top(); |
| @@ -108,19 +106,39 @@ class SharedMemoryDataConsumerHandle::Context final |
| base::Lock& lock() { return lock_; } |
| private: |
| + void NotifyInternal(bool repost) { |
| + // Note that this function is not protected by |lock_|. |
| + |
| + auto runner = notification_task_runner_; |
| + if (!runner) { |
| + // Do nothing. |
| + } else if (runner->BelongsToCurrentThread()) { |
| + // It is safe to access member variables without lock because |client_| |
| + // is bound to the current thread. |
| + if (client_) |
| + client_->didGetReadable(); |
| + } else if (repost) { |
| + // We don't re-post the task when the runner changes while waiting for |
| + // this task. |
|
hiroshige
2015/06/12 10:12:09
Please describe why we don't repost if |repost| is
yhirano
2015/06/15 10:23:22
Done.
|
| + runner->PostTask(FROM_HERE, |
| + base::Bind(&Context::NotifyInternal, this, false)); |
| + } |
| + } |
| + void Clear() { |
| + for (auto& data : queue_) { |
| + delete data; |
| + } |
| + queue_.clear(); |
| + first_offset_ = 0; |
| + client_ = nullptr; |
| + } |
| + |
| friend class base::RefCountedThreadSafe<Context>; |
| ~Context() { |
| // This is necessary because the queue stores raw pointers. |
| Clear(); |
| } |
| - void NotifyImmediately() { |
| - // As we can assume that all reader-side methods are called on this |
| - // thread (see WebDataConsumerHandle comments), we don't need to lock. |
| - if (client_) |
| - client_->didGetReadable(); |
| - } |
| - |
| base::Lock lock_; |
| // |result_| stores the ultimate state of this handle if it has. Otherwise, |
| // |Ok| is set. |
| @@ -131,7 +149,7 @@ class SharedMemoryDataConsumerHandle::Context final |
| size_t first_offset_; |
| Client* client_; |
| scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_; |
| - bool is_reader_active_; |
| + bool is_handle_active_; |
| DISALLOW_COPY_AND_ASSIGN(Context); |
| }; |
| @@ -156,12 +174,12 @@ void SharedMemoryDataConsumerHandle::Writer::AddData( |
| bool needs_notification = false; |
| { |
| base::AutoLock lock(context_->lock()); |
| - if (!context_->is_reader_active()) { |
| + if (!context_->is_handle_active() && !context_->is_handle_locked()) { |
| // No one is interested in the data. |
| return; |
| } |
| - needs_notification = context_->client() && context_->IsEmpty(); |
| + needs_notification = context_->IsEmpty(); |
| scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass; |
| if (mode_ == kApplyBackpressure) { |
| data_to_pass = |
| @@ -183,30 +201,33 @@ void SharedMemoryDataConsumerHandle::Writer::Close() { |
| base::AutoLock lock(context_->lock()); |
| if (context_->result() == Ok) { |
| context_->set_result(Done); |
| - needs_notification = context_->client() && context_->IsEmpty(); |
| + needs_notification = context_->IsEmpty(); |
| } |
| } |
| if (needs_notification) |
| context_->Notify(); |
| } |
| -SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( |
| - BackpressureMode mode, |
| - scoped_ptr<Writer>* writer) |
| - : context_(new Context) { |
| - writer->reset(new Writer(context_, mode)); |
| +SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl( |
| + scoped_refptr<Context> context, |
| + Client* client) |
| + : context_(context) { |
| + base::AutoLock lock(context_->lock()); |
| + DCHECK(!context_->is_handle_locked()); |
| + context_->AcquireReaderLock(client); |
| } |
| -SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { |
| +SharedMemoryDataConsumerHandle::ReaderImpl::~ReaderImpl() { |
| base::AutoLock lock(context_->lock()); |
| - context_->set_is_reader_active(false); |
| - context_->Clear(); |
| + context_->ReleaseReaderLock(); |
| + context_->ClearIfNecessary(); |
| } |
| -Result SharedMemoryDataConsumerHandle::read(void* data, |
| - size_t size, |
| - Flags flags, |
| - size_t* read_size_to_return) { |
| +Result SharedMemoryDataConsumerHandle::ReaderImpl::read( |
| + void* data, |
| + size_t size, |
| + Flags flags, |
| + size_t* read_size_to_return) { |
| base::AutoLock lock(context_->lock()); |
| size_t total_read_size = 0; |
| @@ -229,9 +250,10 @@ Result SharedMemoryDataConsumerHandle::read(void* data, |
| return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait; |
| } |
| -Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer, |
| - Flags flags, |
| - size_t* available) { |
| +Result SharedMemoryDataConsumerHandle::ReaderImpl::beginRead( |
| + const void** buffer, |
| + Flags flags, |
| + size_t* available) { |
| *buffer = nullptr; |
| *available = 0; |
| @@ -250,7 +272,7 @@ Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer, |
| return Ok; |
| } |
| -Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) { |
| +Result SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size) { |
| base::AutoLock lock(context_->lock()); |
| if (context_->IsEmpty()) |
| @@ -260,22 +282,83 @@ Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) { |
| return Ok; |
| } |
| +SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( |
| + BackpressureMode mode, |
| + scoped_ptr<Writer>* writer) |
| + : context_(new Context) { |
| + writer->reset(new Writer(context_, mode)); |
| +} |
| + |
| +SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { |
| + base::AutoLock lock(context_->lock()); |
| + context_->set_is_handle_active(false); |
| + context_->ClearIfNecessary(); |
| +} |
| + |
| +scoped_ptr<SharedMemoryDataConsumerHandle::ReaderImpl> |
| +SharedMemoryDataConsumerHandle::obtainReaderImpl(Client* client) { |
| + return make_scoped_ptr(obtainReaderInternal(client)); |
| +} |
| + |
| +SharedMemoryDataConsumerHandle::ReaderImpl* |
| +SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) { |
| + return new ReaderImpl(context_, client); |
| +} |
| + |
| +Result SharedMemoryDataConsumerHandle::read(void* data, |
| + size_t size, |
| + Flags flags, |
| + size_t* read_size_to_return) { |
| + // Note this (and below similar functions) is a bit racy. We don't care about |
| + // it because this is a deprecated function and will be removed shortly. |
| + LockImplicitly(); |
| + return reader_->read(data, size, flags, read_size_to_return); |
| +} |
| + |
| +Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer, |
| + Flags flags, |
| + size_t* available) { |
| + LockImplicitly(); |
| + return reader_->beginRead(buffer, flags, available); |
| +} |
| + |
| +Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) { |
| + LockImplicitly(); |
| + return reader_->endRead(read_size); |
| +} |
| + |
| void SharedMemoryDataConsumerHandle::registerClient(Client* client) { |
| - bool needs_notification = false; |
| + UnlockImplicitly(); |
| + reader_ = obtainReaderImpl(client); |
| +} |
| + |
| +void SharedMemoryDataConsumerHandle::unregisterClient() { |
| + reader_.reset(); |
| +} |
| + |
| +void SharedMemoryDataConsumerHandle::LockImplicitly() { |
| { |
| base::AutoLock lock(context_->lock()); |
| - |
| - context_->SetClient(client); |
| - needs_notification = !context_->IsEmpty(); |
| + if (reader_) { |
| + DCHECK(context_->IsReaderBoundToCurrentThread()); |
| + return; |
| + } |
| } |
| - if (needs_notification) |
| - context_->Notify(); |
| + reader_ = obtainReaderImpl(nullptr); |
| } |
| -void SharedMemoryDataConsumerHandle::unregisterClient() { |
| - base::AutoLock lock(context_->lock()); |
| - |
| - context_->SetClient(nullptr); |
| +void SharedMemoryDataConsumerHandle::UnlockImplicitly() { |
| + bool needs_unlock = false; |
| + { |
| + base::AutoLock lock(context_->lock()); |
| + if (reader_) { |
| + DCHECK(context_->IsReaderBoundToCurrentThread()); |
| + needs_unlock = true; |
| + } |
| + } |
| + if (needs_unlock) { |
| + reader_.reset(); |
| + } |
| } |
| } // namespace content |