| Index: content/child/shared_memory_data_consumer_handle.cc
|
| diff --git a/content/child/shared_memory_data_consumer_handle.cc b/content/child/shared_memory_data_consumer_handle.cc
|
| index f5d6e69f5654f0ed6bc13c0519ae3f9c830cfe07..24db0590a39ba9ab05ae041478950666de398a2a 100644
|
| --- a/content/child/shared_memory_data_consumer_handle.cc
|
| +++ b/content/child/shared_memory_data_consumer_handle.cc
|
| @@ -6,7 +6,6 @@
|
|
|
| #include <algorithm>
|
| #include <deque>
|
| -#include <vector>
|
|
|
| #include "base/bind.h"
|
| #include "base/message_loop/message_loop.h"
|
| @@ -54,29 +53,15 @@ class SharedMemoryDataConsumerHandle::Context final
|
| : result_(Ok),
|
| first_offset_(0),
|
| client_(nullptr),
|
| - is_reader_active_(true) {}
|
| + is_handle_active_(true) {}
|
|
|
| bool IsEmpty() const { return queue_.empty(); }
|
| - void Clear() {
|
| - for (auto& data : queue_) {
|
| - delete data;
|
| + void ClearIfNecessary() {
|
| + if (!is_handle_locked() && !is_handle_active()) {
|
| + // No one is interested in the contents.
|
| + Clear();
|
| }
|
| - queue_.clear();
|
| - first_offset_ = 0;
|
| - client_ = nullptr;
|
| }
|
| - void Notify() {
|
| - // Note that this function is not protected by |lock_| (actually it
|
| - // shouldn't be) but |notification_task_runner_| is thread-safe.
|
| -
|
| - if (notification_task_runner_->BelongsToCurrentThread()) {
|
| - NotifyImmediately();
|
| - } else {
|
| - notification_task_runner_->PostTask(
|
| - FROM_HERE, base::Bind(&Context::NotifyImmediately, this));
|
| - }
|
| - }
|
| -
|
| RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); }
|
| void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) {
|
| queue_.push_back(data.release());
|
| @@ -84,18 +69,31 @@ class SharedMemoryDataConsumerHandle::Context final
|
| size_t first_offset() const { return first_offset_; }
|
| Result result() const { return result_; }
|
| void set_result(Result r) { result_ = r; }
|
| - Client* client() { return client_; }
|
| - void SetClient(Client* client) {
|
| - if (client) {
|
| - notification_task_runner_ = base::MessageLoop::current()->task_runner();
|
| - client_ = client;
|
| - } else {
|
| - notification_task_runner_ = nullptr;
|
| - client_ = nullptr;
|
| + void AcquireReaderLock(Client* client) {
|
| + DCHECK(!notification_task_runner_);
|
| + DCHECK(!client_);
|
| + notification_task_runner_ = base::MessageLoop::current()->task_runner();
|
| + client_ = client;
|
| + if (client && !(IsEmpty() && result() == Ok)) {
|
| + // We cannot notify synchronously because the user doesn't have the reader
|
| + // yet.
|
| + notification_task_runner_->PostTask(
|
| + FROM_HERE, base::Bind(&Context::NotifyInternal, this, false));
|
| }
|
| }
|
| - bool is_reader_active() const { return is_reader_active_; }
|
| - void set_is_reader_active(bool b) { is_reader_active_ = b; }
|
| + void ReleaseReaderLock() {
|
| + DCHECK(notification_task_runner_);
|
| + notification_task_runner_ = nullptr;
|
| + client_ = nullptr;
|
| + }
|
| + void Notify() { NotifyInternal(true); }
|
| + bool is_handle_locked() const { return notification_task_runner_; }
|
| + bool IsReaderBoundToCurrentThread() const {
|
| + return notification_task_runner_ &&
|
| + notification_task_runner_->BelongsToCurrentThread();
|
| + }
|
| + bool is_handle_active() const { return is_handle_active_; }
|
| + void set_is_handle_active(bool b) { is_handle_active_ = b; }
|
| void Consume(size_t s) {
|
| first_offset_ += s;
|
| auto top = Top();
|
| @@ -108,19 +106,43 @@ class SharedMemoryDataConsumerHandle::Context final
|
| base::Lock& lock() { return lock_; }
|
|
|
| private:
|
| + void NotifyInternal(bool repost) {
|
| + // Note that this function is not protected by |lock_|.
|
| +
|
| + auto runner = notification_task_runner_;
|
| + if (!runner)
|
| + return;
|
| +
|
| + if (runner->BelongsToCurrentThread()) {
|
| + // It is safe to access member variables without lock because |client_|
|
| + // is bound to the current thread.
|
| + if (client_)
|
| + client_->didGetReadable();
|
| + return;
|
| + }
|
| + if (repost) {
|
| + // We don't re-post the task when the runner changes while waiting for
|
| + // this task because in this case a new reader is obtained and
|
| + // notification is already done at the reader creation time if necessary.
|
| + runner->PostTask(FROM_HERE,
|
| + base::Bind(&Context::NotifyInternal, this, false));
|
| + }
|
| + }
|
| + void Clear() {
|
| + for (auto& data : queue_) {
|
| + delete data;
|
| + }
|
| + queue_.clear();
|
| + first_offset_ = 0;
|
| + client_ = nullptr;
|
| + }
|
| +
|
| friend class base::RefCountedThreadSafe<Context>;
|
| ~Context() {
|
| // This is necessary because the queue stores raw pointers.
|
| Clear();
|
| }
|
|
|
| - void NotifyImmediately() {
|
| - // As we can assume that all reader-side methods are called on this
|
| - // thread (see WebDataConsumerHandle comments), we don't need to lock.
|
| - if (client_)
|
| - client_->didGetReadable();
|
| - }
|
| -
|
| base::Lock lock_;
|
| // |result_| stores the ultimate state of this handle if it has. Otherwise,
|
| // |Ok| is set.
|
| @@ -131,7 +153,7 @@ class SharedMemoryDataConsumerHandle::Context final
|
| size_t first_offset_;
|
| Client* client_;
|
| scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
|
| - bool is_reader_active_;
|
| + bool is_handle_active_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(Context);
|
| };
|
| @@ -156,12 +178,12 @@ void SharedMemoryDataConsumerHandle::Writer::AddData(
|
| bool needs_notification = false;
|
| {
|
| base::AutoLock lock(context_->lock());
|
| - if (!context_->is_reader_active()) {
|
| + if (!context_->is_handle_active() && !context_->is_handle_locked()) {
|
| // No one is interested in the data.
|
| return;
|
| }
|
|
|
| - needs_notification = context_->client() && context_->IsEmpty();
|
| + needs_notification = context_->IsEmpty();
|
| scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass;
|
| if (mode_ == kApplyBackpressure) {
|
| data_to_pass =
|
| @@ -183,30 +205,33 @@ void SharedMemoryDataConsumerHandle::Writer::Close() {
|
| base::AutoLock lock(context_->lock());
|
| if (context_->result() == Ok) {
|
| context_->set_result(Done);
|
| - needs_notification = context_->client() && context_->IsEmpty();
|
| + needs_notification = context_->IsEmpty();
|
| }
|
| }
|
| if (needs_notification)
|
| context_->Notify();
|
| }
|
|
|
| -SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
|
| - BackpressureMode mode,
|
| - scoped_ptr<Writer>* writer)
|
| - : context_(new Context) {
|
| - writer->reset(new Writer(context_, mode));
|
| +SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl(
|
| + scoped_refptr<Context> context,
|
| + Client* client)
|
| + : context_(context) {
|
| + base::AutoLock lock(context_->lock());
|
| + DCHECK(!context_->is_handle_locked());
|
| + context_->AcquireReaderLock(client);
|
| }
|
|
|
| -SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
|
| +SharedMemoryDataConsumerHandle::ReaderImpl::~ReaderImpl() {
|
| base::AutoLock lock(context_->lock());
|
| - context_->set_is_reader_active(false);
|
| - context_->Clear();
|
| + context_->ReleaseReaderLock();
|
| + context_->ClearIfNecessary();
|
| }
|
|
|
| -Result SharedMemoryDataConsumerHandle::read(void* data,
|
| - size_t size,
|
| - Flags flags,
|
| - size_t* read_size_to_return) {
|
| +Result SharedMemoryDataConsumerHandle::ReaderImpl::read(
|
| + void* data,
|
| + size_t size,
|
| + Flags flags,
|
| + size_t* read_size_to_return) {
|
| base::AutoLock lock(context_->lock());
|
|
|
| size_t total_read_size = 0;
|
| @@ -229,9 +254,10 @@ Result SharedMemoryDataConsumerHandle::read(void* data,
|
| return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait;
|
| }
|
|
|
| -Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
|
| - Flags flags,
|
| - size_t* available) {
|
| +Result SharedMemoryDataConsumerHandle::ReaderImpl::beginRead(
|
| + const void** buffer,
|
| + Flags flags,
|
| + size_t* available) {
|
| *buffer = nullptr;
|
| *available = 0;
|
|
|
| @@ -250,7 +276,7 @@ Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
|
| return Ok;
|
| }
|
|
|
| -Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
|
| +Result SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size) {
|
| base::AutoLock lock(context_->lock());
|
|
|
| if (context_->IsEmpty())
|
| @@ -260,22 +286,83 @@ Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
|
| return Ok;
|
| }
|
|
|
| +SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
|
| + BackpressureMode mode,
|
| + scoped_ptr<Writer>* writer)
|
| + : context_(new Context) {
|
| + writer->reset(new Writer(context_, mode));
|
| +}
|
| +
|
| +SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
|
| + base::AutoLock lock(context_->lock());
|
| + context_->set_is_handle_active(false);
|
| + context_->ClearIfNecessary();
|
| +}
|
| +
|
| +scoped_ptr<blink::WebDataConsumerHandle::Reader>
|
| +SharedMemoryDataConsumerHandle::ObtainReader(Client* client) {
|
| + return make_scoped_ptr(obtainReaderInternal(client));
|
| +}
|
| +
|
| +SharedMemoryDataConsumerHandle::ReaderImpl*
|
| +SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) {
|
| + return new ReaderImpl(context_, client);
|
| +}
|
| +
|
| +Result SharedMemoryDataConsumerHandle::read(void* data,
|
| + size_t size,
|
| + Flags flags,
|
| + size_t* read_size_to_return) {
|
| + // Note this (and below similar functions) is a bit racy. We don't care about
|
| + // it because this is a deprecated function and will be removed shortly.
|
| + LockImplicitly();
|
| + return reader_->read(data, size, flags, read_size_to_return);
|
| +}
|
| +
|
| +Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
|
| + Flags flags,
|
| + size_t* available) {
|
| + LockImplicitly();
|
| + return reader_->beginRead(buffer, flags, available);
|
| +}
|
| +
|
| +Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
|
| + LockImplicitly();
|
| + return reader_->endRead(read_size);
|
| +}
|
| +
|
| void SharedMemoryDataConsumerHandle::registerClient(Client* client) {
|
| - bool needs_notification = false;
|
| + UnlockImplicitly();
|
| + reader_ = ObtainReader(client);
|
| +}
|
| +
|
| +void SharedMemoryDataConsumerHandle::unregisterClient() {
|
| + reader_.reset();
|
| +}
|
| +
|
| +void SharedMemoryDataConsumerHandle::LockImplicitly() {
|
| {
|
| base::AutoLock lock(context_->lock());
|
| -
|
| - context_->SetClient(client);
|
| - needs_notification = !context_->IsEmpty();
|
| + if (reader_) {
|
| + DCHECK(context_->IsReaderBoundToCurrentThread());
|
| + return;
|
| + }
|
| }
|
| - if (needs_notification)
|
| - context_->Notify();
|
| + reader_ = ObtainReader(nullptr);
|
| }
|
|
|
| -void SharedMemoryDataConsumerHandle::unregisterClient() {
|
| - base::AutoLock lock(context_->lock());
|
| -
|
| - context_->SetClient(nullptr);
|
| +void SharedMemoryDataConsumerHandle::UnlockImplicitly() {
|
| + bool needs_unlock = false;
|
| + {
|
| + base::AutoLock lock(context_->lock());
|
| + if (reader_) {
|
| + DCHECK(context_->IsReaderBoundToCurrentThread());
|
| + needs_unlock = true;
|
| + }
|
| + }
|
| + if (needs_unlock) {
|
| + reader_.reset();
|
| + }
|
| }
|
|
|
| } // namespace content
|
|
|