Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(192)

Unified Diff: content/child/shared_memory_data_consumer_handle.cc

Issue 1144033002: Make SharedMemoryDataConsumerHandle thread-safe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@ipc-data-consumer
Patch Set: rebase Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: content/child/shared_memory_data_consumer_handle.cc
diff --git a/content/child/shared_memory_data_consumer_handle.cc b/content/child/shared_memory_data_consumer_handle.cc
index 2dc61df1058643a82c5d3c2c39f6695c0b783fe3..f5d6e69f5654f0ed6bc13c0519ae3f9c830cfe07 100644
--- a/content/child/shared_memory_data_consumer_handle.cc
+++ b/content/child/shared_memory_data_consumer_handle.cc
@@ -8,10 +8,43 @@
#include <deque>
#include <vector>
+#include "base/bind.h"
+#include "base/message_loop/message_loop.h"
+#include "base/single_thread_task_runner.h"
+#include "base/synchronization/lock.h"
#include "content/public/child/fixed_received_data.h"
namespace content {
+namespace {
+
+class DelegateThreadSafeReceivedData final
+ : public RequestPeer::ThreadSafeReceivedData {
+ public:
+ explicit DelegateThreadSafeReceivedData(
+ scoped_ptr<RequestPeer::ReceivedData> data)
+ : data_(data.Pass()),
+ task_runner_(base::MessageLoop::current()->task_runner()) {}
+ ~DelegateThreadSafeReceivedData() override {
+ if (!task_runner_->BelongsToCurrentThread()) {
+ // Delete the data on the original thread.
+ task_runner_->DeleteSoon(FROM_HERE, data_.release());
+ }
+ }
+
+ const char* payload() const override { return data_->payload(); }
+ int length() const override { return data_->length(); }
+ int encoded_length() const override { return data_->encoded_length(); }
+
+ private:
+ scoped_ptr<RequestPeer::ReceivedData> data_;
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+
+ DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData);
+};
+
+} // namespace
+
using Result = blink::WebDataConsumerHandle::Result;
class SharedMemoryDataConsumerHandle::Context final
@@ -25,33 +58,54 @@ class SharedMemoryDataConsumerHandle::Context final
bool IsEmpty() const { return queue_.empty(); }
void Clear() {
- for (RequestPeer::ReceivedData* data : queue_) {
+ for (auto& data : queue_) {
delete data;
}
queue_.clear();
first_offset_ = 0;
client_ = nullptr;
}
- RequestPeer::ReceivedData* Top() { return queue_.front(); }
- void Push(scoped_ptr<RequestPeer::ReceivedData> data) {
+ void Notify() {
+ // Note that this function is not protected by |lock_| (actually it
+ // shouldn't be) but |notification_task_runner_| is thread-safe.
+
+ if (notification_task_runner_->BelongsToCurrentThread()) {
+ NotifyImmediately();
+ } else {
+ notification_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&Context::NotifyImmediately, this));
+ }
+ }
+
+ RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); }
+ void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) {
queue_.push_back(data.release());
}
size_t first_offset() const { return first_offset_; }
Result result() const { return result_; }
void set_result(Result r) { result_ = r; }
Client* client() { return client_; }
- void set_client(Client* client) { client_ = client; }
+ void SetClient(Client* client) {
+ if (client) {
+ notification_task_runner_ = base::MessageLoop::current()->task_runner();
+ client_ = client;
+ } else {
+ notification_task_runner_ = nullptr;
+ client_ = nullptr;
+ }
+ }
bool is_reader_active() const { return is_reader_active_; }
void set_is_reader_active(bool b) { is_reader_active_ = b; }
void Consume(size_t s) {
first_offset_ += s;
- RequestPeer::ReceivedData* top = Top();
+ auto top = Top();
if (static_cast<size_t>(top->length()) <= first_offset_) {
delete top;
queue_.pop_front();
first_offset_ = 0;
}
}
+ base::Lock& lock() { return lock_; }
private:
friend class base::RefCountedThreadSafe<Context>;
@@ -60,13 +114,23 @@ class SharedMemoryDataConsumerHandle::Context final
Clear();
}
+ void NotifyImmediately() {
+ // As we can assume that all reader-side methods are called on this
+ // thread (see WebDataConsumerHandle comments), we don't need to lock.
+ if (client_)
+ client_->didGetReadable();
+ }
+
+ base::Lock lock_;
// |result_| stores the ultimate state of this handle if it has. Otherwise,
// |Ok| is set.
Result result_;
- // TODO(yhirano): Use std::deque<scoped_ptr<ReceivedData>> once it is allowed.
- std::deque<RequestPeer::ReceivedData*> queue_;
+ // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
+ // once it is allowed.
+ std::deque<RequestPeer::ThreadSafeReceivedData*> queue_;
size_t first_offset_;
Client* client_;
+ scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
bool is_reader_active_;
DISALLOW_COPY_AND_ASSIGN(Context);
@@ -89,30 +153,41 @@ void SharedMemoryDataConsumerHandle::Writer::AddData(
return;
}
- if (!context_->is_reader_active()) {
- // No one is interested in the data.
- return;
- }
+ bool needs_notification = false;
+ {
+ base::AutoLock lock(context_->lock());
+ if (!context_->is_reader_active()) {
+ // No one is interested in the data.
+ return;
+ }
- bool needs_notification = context_->client() && context_->IsEmpty();
- scoped_ptr<RequestPeer::ReceivedData> data_to_pass;
- if (mode_ == kApplyBackpressure) {
- data_to_pass = data.Pass();
- } else {
- data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
+ needs_notification = context_->client() && context_->IsEmpty();
+ scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass;
+ if (mode_ == kApplyBackpressure) {
+ data_to_pass =
+ make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass()));
+ } else {
+ data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
+ }
+ context_->Push(data_to_pass.Pass());
}
- context_->Push(data_to_pass.Pass());
if (needs_notification)
- context_->client()->didGetReadable();
+ context_->Notify();
}
void SharedMemoryDataConsumerHandle::Writer::Close() {
- if (context_->result() == Ok) {
- context_->set_result(Done);
- if (context_->client() && context_->IsEmpty())
- context_->client()->didGetReadable();
+ bool needs_notification = false;
+
+ {
+ base::AutoLock lock(context_->lock());
+ if (context_->result() == Ok) {
+ context_->set_result(Done);
+ needs_notification = context_->client() && context_->IsEmpty();
+ }
}
+ if (needs_notification)
+ context_->Notify();
}
SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
@@ -123,6 +198,7 @@ SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
}
SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
+ base::AutoLock lock(context_->lock());
context_->set_is_reader_active(false);
context_->Clear();
}
@@ -131,6 +207,8 @@ Result SharedMemoryDataConsumerHandle::read(void* data,
size_t size,
Flags flags,
size_t* read_size_to_return) {
+ base::AutoLock lock(context_->lock());
+
size_t total_read_size = 0;
*read_size_to_return = 0;
if (context_->result() != Ok && context_->result() != Done)
@@ -157,6 +235,8 @@ Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
*buffer = nullptr;
*available = 0;
+ base::AutoLock lock(context_->lock());
+
if (context_->result() != Ok && context_->result() != Done)
return context_->result();
@@ -171,6 +251,8 @@ Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
}
Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
+ base::AutoLock lock(context_->lock());
+
if (context_->IsEmpty())
return UnexpectedError;
@@ -179,14 +261,21 @@ Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
}
void SharedMemoryDataConsumerHandle::registerClient(Client* client) {
- context_->set_client(client);
+ bool needs_notification = false;
+ {
+ base::AutoLock lock(context_->lock());
- if (!context_->IsEmpty())
- client->didGetReadable();
+ context_->SetClient(client);
+ needs_notification = !context_->IsEmpty();
+ }
+ if (needs_notification)
+ context_->Notify();
}
void SharedMemoryDataConsumerHandle::unregisterClient() {
- context_->set_client(nullptr);
+ base::AutoLock lock(context_->lock());
+
+ context_->SetClient(nullptr);
}
} // namespace content
« no previous file with comments | « content/child/shared_memory_data_consumer_handle.h ('k') | content/child/shared_memory_data_consumer_handle_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698