Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1205)

Unified Diff: content/child/shared_memory_data_consumer_handle.cc

Issue 1164493008: Implement WebDataConsumerHandle::Reader. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: content/child/shared_memory_data_consumer_handle.cc
diff --git a/content/child/shared_memory_data_consumer_handle.cc b/content/child/shared_memory_data_consumer_handle.cc
index f5d6e69f5654f0ed6bc13c0519ae3f9c830cfe07..24db0590a39ba9ab05ae041478950666de398a2a 100644
--- a/content/child/shared_memory_data_consumer_handle.cc
+++ b/content/child/shared_memory_data_consumer_handle.cc
@@ -6,7 +6,6 @@
#include <algorithm>
#include <deque>
-#include <vector>
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
@@ -54,29 +53,15 @@ class SharedMemoryDataConsumerHandle::Context final
: result_(Ok),
first_offset_(0),
client_(nullptr),
- is_reader_active_(true) {}
+ is_handle_active_(true) {}
bool IsEmpty() const { return queue_.empty(); }
- void Clear() {
- for (auto& data : queue_) {
- delete data;
+ void ClearIfNecessary() {
+ if (!is_handle_locked() && !is_handle_active()) {
+ // No one is interested in the contents.
+ Clear();
}
- queue_.clear();
- first_offset_ = 0;
- client_ = nullptr;
}
- void Notify() {
- // Note that this function is not protected by |lock_| (actually it
- // shouldn't be) but |notification_task_runner_| is thread-safe.
-
- if (notification_task_runner_->BelongsToCurrentThread()) {
- NotifyImmediately();
- } else {
- notification_task_runner_->PostTask(
- FROM_HERE, base::Bind(&Context::NotifyImmediately, this));
- }
- }
-
RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); }
void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) {
queue_.push_back(data.release());
@@ -84,18 +69,31 @@ class SharedMemoryDataConsumerHandle::Context final
size_t first_offset() const { return first_offset_; }
Result result() const { return result_; }
void set_result(Result r) { result_ = r; }
- Client* client() { return client_; }
- void SetClient(Client* client) {
- if (client) {
- notification_task_runner_ = base::MessageLoop::current()->task_runner();
- client_ = client;
- } else {
- notification_task_runner_ = nullptr;
- client_ = nullptr;
+ void AcquireReaderLock(Client* client) {
+ DCHECK(!notification_task_runner_);
+ DCHECK(!client_);
+ notification_task_runner_ = base::MessageLoop::current()->task_runner();
+ client_ = client;
+ if (client && !(IsEmpty() && result() == Ok)) {
+ // We cannot notify synchronously because the user doesn't have the reader
+ // yet.
+ notification_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&Context::NotifyInternal, this, false));
}
}
- bool is_reader_active() const { return is_reader_active_; }
- void set_is_reader_active(bool b) { is_reader_active_ = b; }
+ void ReleaseReaderLock() {
+ DCHECK(notification_task_runner_);
+ notification_task_runner_ = nullptr;
+ client_ = nullptr;
+ }
+ void Notify() { NotifyInternal(true); }
+ bool is_handle_locked() const { return notification_task_runner_; }
+ bool IsReaderBoundToCurrentThread() const {
+ return notification_task_runner_ &&
+ notification_task_runner_->BelongsToCurrentThread();
+ }
+ bool is_handle_active() const { return is_handle_active_; }
+ void set_is_handle_active(bool b) { is_handle_active_ = b; }
void Consume(size_t s) {
first_offset_ += s;
auto top = Top();
@@ -108,19 +106,43 @@ class SharedMemoryDataConsumerHandle::Context final
base::Lock& lock() { return lock_; }
private:
+ void NotifyInternal(bool repost) {
+ // Note that this function is not protected by |lock_|.
+
+ auto runner = notification_task_runner_;
+ if (!runner)
+ return;
+
+ if (runner->BelongsToCurrentThread()) {
+ // It is safe to access member variables without lock because |client_|
+ // is bound to the current thread.
+ if (client_)
+ client_->didGetReadable();
+ return;
+ }
+ if (repost) {
+ // We don't re-post the task when the runner changes while waiting for
+ // this task because in this case a new reader is obtained and
+ // notification is already done at the reader creation time if necessary.
+ runner->PostTask(FROM_HERE,
+ base::Bind(&Context::NotifyInternal, this, false));
+ }
+ }
+ void Clear() {
+ for (auto& data : queue_) {
+ delete data;
+ }
+ queue_.clear();
+ first_offset_ = 0;
+ client_ = nullptr;
+ }
+
friend class base::RefCountedThreadSafe<Context>;
~Context() {
// This is necessary because the queue stores raw pointers.
Clear();
}
- void NotifyImmediately() {
- // As we can assume that all reader-side methods are called on this
- // thread (see WebDataConsumerHandle comments), we don't need to lock.
- if (client_)
- client_->didGetReadable();
- }
-
base::Lock lock_;
// |result_| stores the ultimate state of this handle if it has. Otherwise,
// |Ok| is set.
@@ -131,7 +153,7 @@ class SharedMemoryDataConsumerHandle::Context final
size_t first_offset_;
Client* client_;
scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
- bool is_reader_active_;
+ bool is_handle_active_;
DISALLOW_COPY_AND_ASSIGN(Context);
};
@@ -156,12 +178,12 @@ void SharedMemoryDataConsumerHandle::Writer::AddData(
bool needs_notification = false;
{
base::AutoLock lock(context_->lock());
- if (!context_->is_reader_active()) {
+ if (!context_->is_handle_active() && !context_->is_handle_locked()) {
// No one is interested in the data.
return;
}
- needs_notification = context_->client() && context_->IsEmpty();
+ needs_notification = context_->IsEmpty();
scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass;
if (mode_ == kApplyBackpressure) {
data_to_pass =
@@ -183,30 +205,33 @@ void SharedMemoryDataConsumerHandle::Writer::Close() {
base::AutoLock lock(context_->lock());
if (context_->result() == Ok) {
context_->set_result(Done);
- needs_notification = context_->client() && context_->IsEmpty();
+ needs_notification = context_->IsEmpty();
}
}
if (needs_notification)
context_->Notify();
}
-SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
- BackpressureMode mode,
- scoped_ptr<Writer>* writer)
- : context_(new Context) {
- writer->reset(new Writer(context_, mode));
+SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl(
+ scoped_refptr<Context> context,
+ Client* client)
+ : context_(context) {
+ base::AutoLock lock(context_->lock());
+ DCHECK(!context_->is_handle_locked());
+ context_->AcquireReaderLock(client);
}
-SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
+SharedMemoryDataConsumerHandle::ReaderImpl::~ReaderImpl() {
base::AutoLock lock(context_->lock());
- context_->set_is_reader_active(false);
- context_->Clear();
+ context_->ReleaseReaderLock();
+ context_->ClearIfNecessary();
}
-Result SharedMemoryDataConsumerHandle::read(void* data,
- size_t size,
- Flags flags,
- size_t* read_size_to_return) {
+Result SharedMemoryDataConsumerHandle::ReaderImpl::read(
+ void* data,
+ size_t size,
+ Flags flags,
+ size_t* read_size_to_return) {
base::AutoLock lock(context_->lock());
size_t total_read_size = 0;
@@ -229,9 +254,10 @@ Result SharedMemoryDataConsumerHandle::read(void* data,
return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait;
}
-Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
- Flags flags,
- size_t* available) {
+Result SharedMemoryDataConsumerHandle::ReaderImpl::beginRead(
+ const void** buffer,
+ Flags flags,
+ size_t* available) {
*buffer = nullptr;
*available = 0;
@@ -250,7 +276,7 @@ Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
return Ok;
}
-Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
+Result SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size) {
base::AutoLock lock(context_->lock());
if (context_->IsEmpty())
@@ -260,22 +286,83 @@ Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
return Ok;
}
+SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
+ BackpressureMode mode,
+ scoped_ptr<Writer>* writer)
+ : context_(new Context) {
+ writer->reset(new Writer(context_, mode));
+}
+
+SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
+ base::AutoLock lock(context_->lock());
+ context_->set_is_handle_active(false);
+ context_->ClearIfNecessary();
+}
+
+scoped_ptr<blink::WebDataConsumerHandle::Reader>
+SharedMemoryDataConsumerHandle::ObtainReader(Client* client) {
+ return make_scoped_ptr(obtainReaderInternal(client));
+}
+
+SharedMemoryDataConsumerHandle::ReaderImpl*
+SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) {
+ return new ReaderImpl(context_, client);
+}
+
+Result SharedMemoryDataConsumerHandle::read(void* data,
+ size_t size,
+ Flags flags,
+ size_t* read_size_to_return) {
+ // Note this (and below similar functions) is a bit racy. We don't care about
+ // it because this is a deprecated function and will be removed shortly.
+ LockImplicitly();
+ return reader_->read(data, size, flags, read_size_to_return);
+}
+
+Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
+ Flags flags,
+ size_t* available) {
+ LockImplicitly();
+ return reader_->beginRead(buffer, flags, available);
+}
+
+Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
+ LockImplicitly();
+ return reader_->endRead(read_size);
+}
+
void SharedMemoryDataConsumerHandle::registerClient(Client* client) {
- bool needs_notification = false;
+ UnlockImplicitly();
+ reader_ = ObtainReader(client);
+}
+
+void SharedMemoryDataConsumerHandle::unregisterClient() {
+ reader_.reset();
+}
+
+void SharedMemoryDataConsumerHandle::LockImplicitly() {
{
base::AutoLock lock(context_->lock());
-
- context_->SetClient(client);
- needs_notification = !context_->IsEmpty();
+ if (reader_) {
+ DCHECK(context_->IsReaderBoundToCurrentThread());
+ return;
+ }
}
- if (needs_notification)
- context_->Notify();
+ reader_ = ObtainReader(nullptr);
}
-void SharedMemoryDataConsumerHandle::unregisterClient() {
- base::AutoLock lock(context_->lock());
-
- context_->SetClient(nullptr);
+void SharedMemoryDataConsumerHandle::UnlockImplicitly() {
+ bool needs_unlock = false;
+ {
+ base::AutoLock lock(context_->lock());
+ if (reader_) {
+ DCHECK(context_->IsReaderBoundToCurrentThread());
+ needs_unlock = true;
+ }
+ }
+ if (needs_unlock) {
+ reader_.reset();
+ }
}
} // namespace content
« no previous file with comments | « content/child/shared_memory_data_consumer_handle.h ('k') | content/child/shared_memory_data_consumer_handle_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698