Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(797)

Side by Side Diff: content/child/shared_memory_data_consumer_handle.cc

Issue 1118233002: Introduce SharedMemoryDataConsumerHandle. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@data-received-with-ack
Patch Set: Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/child/shared_memory_data_consumer_handle.h"
6
7 #include <algorithm>
8 #include <deque>
9 #include <vector>
10
11 namespace content {
12
13 namespace {
14
15 class FixedReceivedData final : public RequestPeer::ReceivedData {
16 public:
17 explicit FixedReceivedData(RequestPeer::ReceivedData* data)
18 : data_(data->payload(), data->payload() + data->length()),
19 encoded_length_(data->encoded_length()) {}
20
21 const char* payload() const override {
22 return data_.empty() ? nullptr : &data_[0];
23 }
24 int length() const override { return static_cast<int>(data_.size()); }
25 int encoded_length() const override { return encoded_length_; }
26
27 private:
28 const std::vector<char> data_;
29 const int encoded_length_;
30
31 DISALLOW_COPY_AND_ASSIGN(FixedReceivedData);
32 };
33
34 } // namespace
35
36 using Result = blink::WebDataConsumerHandle::Result;
37 using Data = RequestPeer::ReceivedData;
38
39 class SharedMemoryDataConsumerHandle::Context final
40 : public base::RefCountedThreadSafe<Context> {
41 public:
42 Context()
43 : result_(Ok),
44 first_offset_(0),
45 client_(nullptr),
46 is_reader_active_(true) {}
47
48 bool IsEmpty() const { return queue_.empty(); }
49 void Clear() {
50 for (Data* data : queue_) {
51 delete data;
52 }
53 queue_.clear();
54 first_offset_ = 0;
55 client_ = nullptr;
56 }
57 Data* Top() { return queue_.front(); }
58 void Push(scoped_ptr<Data> data) { queue_.push_back(data.release()); }
59 size_t first_offset() const { return first_offset_; }
60 Result result() const { return result_; }
61 void set_result(Result r) { result_ = r; }
62 Client* client() { return client_; }
63 void set_client(Client* client) { client_ = client; }
64 bool is_reader_active() const { return is_reader_active_; }
65 void set_is_reader_active(bool b) { is_reader_active_ = b; }
66 void Consume(size_t s) {
67 first_offset_ += s;
68 Data* top = Top();
69 if (static_cast<size_t>(top->length()) <= first_offset_) {
70 delete top;
71 queue_.pop_front();
72 first_offset_ = 0;
73 }
74 }
75
76
tyoshino (SeeGerritForStatus) 2015/05/28 07:59:52 one blank line
yhirano 2015/05/28 08:10:58 Done.
77 private:
78 friend class base::RefCountedThreadSafe<Context>;
79 ~Context() {
80 // This is necessary because the queue stores raw pointers.
81 Clear();
82 }
83
84 // |result_| stores the ultimate state of this handle if it has. Otherwise,
85 // |Ok| is set.
86 Result result_;
87 // TODO(yhirano): Use std::deque<scoped_ptr<ReceivedData>> once it is allowed.
88 std::deque<RequestPeer::ReceivedData*> queue_;
89 size_t first_offset_;
90 Client* client_;
91 bool is_reader_active_;
92
93 DISALLOW_COPY_AND_ASSIGN(Context);
94 };
95
96 SharedMemoryDataConsumerHandle::Writer::Writer(
97 const scoped_refptr<Context>& context,
98 BackpressureMode mode)
99 : context_(context), mode_(mode) {
100 }
101
102 SharedMemoryDataConsumerHandle::Writer::~Writer() {
103 Close();
104 }
105
106 void SharedMemoryDataConsumerHandle::Writer::AddData(scoped_ptr<Data> data) {
107 if (!data->length()) {
108 // We omit empty data.
109 return;
110 }
111
112 if (!context_->is_reader_active()) {
113 // No one is interested in the data.
114 return;
115 }
116
117 bool needs_notification = context_->client() && context_->IsEmpty();
118 scoped_ptr<RequestPeer::ReceivedData> data_to_pass;
119 if (mode_ == kApplyBackpressure) {
120 data_to_pass = data.Pass();
121 } else {
122 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
123 }
124 context_->Push(data_to_pass.Pass());
125
126 if (needs_notification)
127 context_->client()->didGetReadable();
128 }
129
130 void SharedMemoryDataConsumerHandle::Writer::Close() {
131 if (context_->result() == Ok) {
132 context_->set_result(Done);
133 if (context_->client() && context_->IsEmpty())
134 context_->client()->didGetReadable();
135 }
136 }
137
138 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
139 BackpressureMode mode,
140 scoped_ptr<Writer>* writer)
141 : context_(new Context) {
142 writer->reset(new Writer(context_, mode));
143 }
144
145 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
146 context_->set_is_reader_active(false);
147 context_->Clear();
148 }
149
150 Result SharedMemoryDataConsumerHandle::read(void* data,
151 size_t size,
152 Flags flags,
153 size_t* read_size_to_return) {
154 size_t total_read_size = 0;
155 *read_size_to_return = 0;
156 if (context_->result() != Ok && context_->result() != Done)
157 return context_->result();
158
159 while (!context_->IsEmpty() && total_read_size < size) {
160 const auto& top = context_->Top();
161 size_t readable = top->length() - context_->first_offset();
162 size_t writable = size - total_read_size;
163 size_t read_size = std::min(readable, writable);
164 const char* begin = top->payload() + context_->first_offset();
165 std::copy(begin, begin + read_size,
166 static_cast<char*>(data) + total_read_size);
167 total_read_size += read_size;
168 context_->Consume(read_size);
169 }
170 *read_size_to_return = total_read_size;
171 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait;
172 }
173
174 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
175 Flags flags,
176 size_t* available) {
177 *buffer = nullptr;
178 *available = 0;
179
180 if (context_->result() != Ok && context_->result() != Done)
181 return context_->result();
182
183 if (context_->IsEmpty())
184 return context_->result() == Done ? Done : ShouldWait;
185
186 const auto& top = context_->Top();
187 *buffer = top->payload() + context_->first_offset();
188 *available = top->length() - context_->first_offset();
189
190 return Ok;
191 }
192
193 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
194 if (context_->IsEmpty())
195 return UnexpectedError;
196
197 context_->Consume(read_size);
198 return Ok;
199 }
200
201 void SharedMemoryDataConsumerHandle::registerClient(Client* client) {
202 context_->set_client(client);
203
204 if (!context_->IsEmpty())
205 client->didGetReadable();
206 }
207
208 void SharedMemoryDataConsumerHandle::unregisterClient() {
209 context_->set_client(nullptr);
210 }
211
212 } // namespace content
OLDNEW
« no previous file with comments | « content/child/shared_memory_data_consumer_handle.h ('k') | content/child/shared_memory_data_consumer_handle_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698