Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(568)

Unified Diff: content/child/shared_memory_data_consumer_handle.cc

Issue 1118233002: Introduce SharedMemoryDataConsumerHandle. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@data-received-with-ack
Patch Set: Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: content/child/shared_memory_data_consumer_handle.cc
diff --git a/content/child/shared_memory_data_consumer_handle.cc b/content/child/shared_memory_data_consumer_handle.cc
new file mode 100644
index 0000000000000000000000000000000000000000..2dc61df1058643a82c5d3c2c39f6695c0b783fe3
--- /dev/null
+++ b/content/child/shared_memory_data_consumer_handle.cc
@@ -0,0 +1,192 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/child/shared_memory_data_consumer_handle.h"
+
+#include <algorithm>
+#include <deque>
+#include <vector>
+
+#include "content/public/child/fixed_received_data.h"
+
+namespace content {
+
+using Result = blink::WebDataConsumerHandle::Result;
+
+class SharedMemoryDataConsumerHandle::Context final
+ : public base::RefCountedThreadSafe<Context> {
+ public:
+ Context()
+ : result_(Ok),
+ first_offset_(0),
+ client_(nullptr),
+ is_reader_active_(true) {}
+
+ bool IsEmpty() const { return queue_.empty(); }
+ void Clear() {
+ for (RequestPeer::ReceivedData* data : queue_) {
+ delete data;
+ }
+ queue_.clear();
+ first_offset_ = 0;
+ client_ = nullptr;
+ }
+ RequestPeer::ReceivedData* Top() { return queue_.front(); }
+ void Push(scoped_ptr<RequestPeer::ReceivedData> data) {
+ queue_.push_back(data.release());
+ }
+ size_t first_offset() const { return first_offset_; }
+ Result result() const { return result_; }
+ void set_result(Result r) { result_ = r; }
+ Client* client() { return client_; }
+ void set_client(Client* client) { client_ = client; }
+ bool is_reader_active() const { return is_reader_active_; }
+ void set_is_reader_active(bool b) { is_reader_active_ = b; }
+ void Consume(size_t s) {
+ first_offset_ += s;
+ RequestPeer::ReceivedData* top = Top();
+ if (static_cast<size_t>(top->length()) <= first_offset_) {
+ delete top;
+ queue_.pop_front();
+ first_offset_ = 0;
+ }
+ }
+
+ private:
+ friend class base::RefCountedThreadSafe<Context>;
+ ~Context() {
+ // This is necessary because the queue stores raw pointers.
+ Clear();
+ }
+
+ // |result_| stores the ultimate state of this handle if it has. Otherwise,
+ // |Ok| is set.
+ Result result_;
+ // TODO(yhirano): Use std::deque<scoped_ptr<ReceivedData>> once it is allowed.
+ std::deque<RequestPeer::ReceivedData*> queue_;
+ size_t first_offset_;
+ Client* client_;
+ bool is_reader_active_;
+
+ DISALLOW_COPY_AND_ASSIGN(Context);
+};
+
+SharedMemoryDataConsumerHandle::Writer::Writer(
+ const scoped_refptr<Context>& context,
+ BackpressureMode mode)
+ : context_(context), mode_(mode) {
+}
+
+SharedMemoryDataConsumerHandle::Writer::~Writer() {
+ Close();
+}
+
+void SharedMemoryDataConsumerHandle::Writer::AddData(
+ scoped_ptr<RequestPeer::ReceivedData> data) {
+ if (!data->length()) {
+ // We omit empty data.
+ return;
+ }
+
+ if (!context_->is_reader_active()) {
+ // No one is interested in the data.
+ return;
+ }
+
+ bool needs_notification = context_->client() && context_->IsEmpty();
+ scoped_ptr<RequestPeer::ReceivedData> data_to_pass;
+ if (mode_ == kApplyBackpressure) {
+ data_to_pass = data.Pass();
+ } else {
+ data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
+ }
+ context_->Push(data_to_pass.Pass());
+
+ if (needs_notification)
+ context_->client()->didGetReadable();
+}
+
+void SharedMemoryDataConsumerHandle::Writer::Close() {
+ if (context_->result() == Ok) {
+ context_->set_result(Done);
+ if (context_->client() && context_->IsEmpty())
+ context_->client()->didGetReadable();
+ }
+}
+
+SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
+ BackpressureMode mode,
+ scoped_ptr<Writer>* writer)
+ : context_(new Context) {
+ writer->reset(new Writer(context_, mode));
+}
+
+SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
+ context_->set_is_reader_active(false);
+ context_->Clear();
+}
+
+Result SharedMemoryDataConsumerHandle::read(void* data,
+ size_t size,
+ Flags flags,
+ size_t* read_size_to_return) {
+ size_t total_read_size = 0;
+ *read_size_to_return = 0;
+ if (context_->result() != Ok && context_->result() != Done)
+ return context_->result();
+
+ while (!context_->IsEmpty() && total_read_size < size) {
+ const auto& top = context_->Top();
+ size_t readable = top->length() - context_->first_offset();
+ size_t writable = size - total_read_size;
+ size_t read_size = std::min(readable, writable);
+ const char* begin = top->payload() + context_->first_offset();
+ std::copy(begin, begin + read_size,
+ static_cast<char*>(data) + total_read_size);
+ total_read_size += read_size;
+ context_->Consume(read_size);
+ }
+ *read_size_to_return = total_read_size;
+ return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait;
+}
+
+Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
+ Flags flags,
+ size_t* available) {
+ *buffer = nullptr;
+ *available = 0;
+
+ if (context_->result() != Ok && context_->result() != Done)
+ return context_->result();
+
+ if (context_->IsEmpty())
+ return context_->result() == Done ? Done : ShouldWait;
+
+ const auto& top = context_->Top();
+ *buffer = top->payload() + context_->first_offset();
+ *available = top->length() - context_->first_offset();
+
+ return Ok;
+}
+
+Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
+ if (context_->IsEmpty())
+ return UnexpectedError;
+
+ context_->Consume(read_size);
+ return Ok;
+}
+
+void SharedMemoryDataConsumerHandle::registerClient(Client* client) {
+ context_->set_client(client);
+
+ if (!context_->IsEmpty())
+ client->didGetReadable();
+}
+
+void SharedMemoryDataConsumerHandle::unregisterClient() {
+ context_->set_client(nullptr);
+}
+
+} // namespace content
« no previous file with comments | « content/child/shared_memory_data_consumer_handle.h ('k') | content/child/shared_memory_data_consumer_handle_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698