| Index: content/child/shared_memory_data_consumer_handle.cc
|
| diff --git a/content/child/shared_memory_data_consumer_handle.cc b/content/child/shared_memory_data_consumer_handle.cc
|
| index 2dc61df1058643a82c5d3c2c39f6695c0b783fe3..f5d6e69f5654f0ed6bc13c0519ae3f9c830cfe07 100644
|
| --- a/content/child/shared_memory_data_consumer_handle.cc
|
| +++ b/content/child/shared_memory_data_consumer_handle.cc
|
| @@ -8,10 +8,43 @@
|
| #include <deque>
|
| #include <vector>
|
|
|
| +#include "base/bind.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "base/single_thread_task_runner.h"
|
| +#include "base/synchronization/lock.h"
|
| #include "content/public/child/fixed_received_data.h"
|
|
|
| namespace content {
|
|
|
| +namespace {
|
| +
|
| +class DelegateThreadSafeReceivedData final
|
| + : public RequestPeer::ThreadSafeReceivedData {
|
| + public:
|
| + explicit DelegateThreadSafeReceivedData(
|
| + scoped_ptr<RequestPeer::ReceivedData> data)
|
| + : data_(data.Pass()),
|
| + task_runner_(base::MessageLoop::current()->task_runner()) {}
|
| + ~DelegateThreadSafeReceivedData() override {
|
| + if (!task_runner_->BelongsToCurrentThread()) {
|
| + // Delete the data on the original thread.
|
| + task_runner_->DeleteSoon(FROM_HERE, data_.release());
|
| + }
|
| + }
|
| +
|
| + const char* payload() const override { return data_->payload(); }
|
| + int length() const override { return data_->length(); }
|
| + int encoded_length() const override { return data_->encoded_length(); }
|
| +
|
| + private:
|
| + scoped_ptr<RequestPeer::ReceivedData> data_;
|
| + scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData);
|
| +};
|
| +
|
| +} // namespace
|
| +
|
| using Result = blink::WebDataConsumerHandle::Result;
|
|
|
| class SharedMemoryDataConsumerHandle::Context final
|
| @@ -25,33 +58,54 @@ class SharedMemoryDataConsumerHandle::Context final
|
|
|
| bool IsEmpty() const { return queue_.empty(); }
|
| void Clear() {
|
| - for (RequestPeer::ReceivedData* data : queue_) {
|
| + for (auto& data : queue_) {
|
| delete data;
|
| }
|
| queue_.clear();
|
| first_offset_ = 0;
|
| client_ = nullptr;
|
| }
|
| - RequestPeer::ReceivedData* Top() { return queue_.front(); }
|
| - void Push(scoped_ptr<RequestPeer::ReceivedData> data) {
|
| + void Notify() {
|
| + // Note that this function is not protected by |lock_| (actually it
|
| + // shouldn't be) but |notification_task_runner_| is thread-safe.
|
| +
|
| + if (notification_task_runner_->BelongsToCurrentThread()) {
|
| + NotifyImmediately();
|
| + } else {
|
| + notification_task_runner_->PostTask(
|
| + FROM_HERE, base::Bind(&Context::NotifyImmediately, this));
|
| + }
|
| + }
|
| +
|
| + RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); }
|
| + void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) {
|
| queue_.push_back(data.release());
|
| }
|
| size_t first_offset() const { return first_offset_; }
|
| Result result() const { return result_; }
|
| void set_result(Result r) { result_ = r; }
|
| Client* client() { return client_; }
|
| - void set_client(Client* client) { client_ = client; }
|
| + void SetClient(Client* client) {
|
| + if (client) {
|
| + notification_task_runner_ = base::MessageLoop::current()->task_runner();
|
| + client_ = client;
|
| + } else {
|
| + notification_task_runner_ = nullptr;
|
| + client_ = nullptr;
|
| + }
|
| + }
|
| bool is_reader_active() const { return is_reader_active_; }
|
| void set_is_reader_active(bool b) { is_reader_active_ = b; }
|
| void Consume(size_t s) {
|
| first_offset_ += s;
|
| - RequestPeer::ReceivedData* top = Top();
|
| + auto top = Top();
|
| if (static_cast<size_t>(top->length()) <= first_offset_) {
|
| delete top;
|
| queue_.pop_front();
|
| first_offset_ = 0;
|
| }
|
| }
|
| + base::Lock& lock() { return lock_; }
|
|
|
| private:
|
| friend class base::RefCountedThreadSafe<Context>;
|
| @@ -60,13 +114,23 @@ class SharedMemoryDataConsumerHandle::Context final
|
| Clear();
|
| }
|
|
|
| + void NotifyImmediately() {
|
| + // As we can assume that all reader-side methods are called on this
|
| + // thread (see WebDataConsumerHandle comments), we don't need to lock.
|
| + if (client_)
|
| + client_->didGetReadable();
|
| + }
|
| +
|
| + base::Lock lock_;
|
| // |result_| stores the ultimate state of this handle if it has. Otherwise,
|
| // |Ok| is set.
|
| Result result_;
|
| - // TODO(yhirano): Use std::deque<scoped_ptr<ReceivedData>> once it is allowed.
|
| - std::deque<RequestPeer::ReceivedData*> queue_;
|
| + // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
|
| + // once it is allowed.
|
| + std::deque<RequestPeer::ThreadSafeReceivedData*> queue_;
|
| size_t first_offset_;
|
| Client* client_;
|
| + scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
|
| bool is_reader_active_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(Context);
|
| @@ -89,30 +153,41 @@ void SharedMemoryDataConsumerHandle::Writer::AddData(
|
| return;
|
| }
|
|
|
| - if (!context_->is_reader_active()) {
|
| - // No one is interested in the data.
|
| - return;
|
| - }
|
| + bool needs_notification = false;
|
| + {
|
| + base::AutoLock lock(context_->lock());
|
| + if (!context_->is_reader_active()) {
|
| + // No one is interested in the data.
|
| + return;
|
| + }
|
|
|
| - bool needs_notification = context_->client() && context_->IsEmpty();
|
| - scoped_ptr<RequestPeer::ReceivedData> data_to_pass;
|
| - if (mode_ == kApplyBackpressure) {
|
| - data_to_pass = data.Pass();
|
| - } else {
|
| - data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
|
| + needs_notification = context_->client() && context_->IsEmpty();
|
| + scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass;
|
| + if (mode_ == kApplyBackpressure) {
|
| + data_to_pass =
|
| + make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass()));
|
| + } else {
|
| + data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
|
| + }
|
| + context_->Push(data_to_pass.Pass());
|
| }
|
| - context_->Push(data_to_pass.Pass());
|
|
|
| if (needs_notification)
|
| - context_->client()->didGetReadable();
|
| + context_->Notify();
|
| }
|
|
|
| void SharedMemoryDataConsumerHandle::Writer::Close() {
|
| - if (context_->result() == Ok) {
|
| - context_->set_result(Done);
|
| - if (context_->client() && context_->IsEmpty())
|
| - context_->client()->didGetReadable();
|
| + bool needs_notification = false;
|
| +
|
| + {
|
| + base::AutoLock lock(context_->lock());
|
| + if (context_->result() == Ok) {
|
| + context_->set_result(Done);
|
| + needs_notification = context_->client() && context_->IsEmpty();
|
| + }
|
| }
|
| + if (needs_notification)
|
| + context_->Notify();
|
| }
|
|
|
| SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
|
| @@ -123,6 +198,7 @@ SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
|
| }
|
|
|
| SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
|
| + base::AutoLock lock(context_->lock());
|
| context_->set_is_reader_active(false);
|
| context_->Clear();
|
| }
|
| @@ -131,6 +207,8 @@ Result SharedMemoryDataConsumerHandle::read(void* data,
|
| size_t size,
|
| Flags flags,
|
| size_t* read_size_to_return) {
|
| + base::AutoLock lock(context_->lock());
|
| +
|
| size_t total_read_size = 0;
|
| *read_size_to_return = 0;
|
| if (context_->result() != Ok && context_->result() != Done)
|
| @@ -157,6 +235,8 @@ Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
|
| *buffer = nullptr;
|
| *available = 0;
|
|
|
| + base::AutoLock lock(context_->lock());
|
| +
|
| if (context_->result() != Ok && context_->result() != Done)
|
| return context_->result();
|
|
|
| @@ -171,6 +251,8 @@ Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
|
| }
|
|
|
| Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
|
| + base::AutoLock lock(context_->lock());
|
| +
|
| if (context_->IsEmpty())
|
| return UnexpectedError;
|
|
|
| @@ -179,14 +261,21 @@ Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
|
| }
|
|
|
| void SharedMemoryDataConsumerHandle::registerClient(Client* client) {
|
| - context_->set_client(client);
|
| + bool needs_notification = false;
|
| + {
|
| + base::AutoLock lock(context_->lock());
|
|
|
| - if (!context_->IsEmpty())
|
| - client->didGetReadable();
|
| + context_->SetClient(client);
|
| + needs_notification = !context_->IsEmpty();
|
| + }
|
| + if (needs_notification)
|
| + context_->Notify();
|
| }
|
|
|
| void SharedMemoryDataConsumerHandle::unregisterClient() {
|
| - context_->set_client(nullptr);
|
| + base::AutoLock lock(context_->lock());
|
| +
|
| + context_->SetClient(nullptr);
|
| }
|
|
|
| } // namespace content
|
|
|