Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(840)

Side by Side Diff: content/child/shared_memory_data_consumer_handle.cc

Issue 1118233002: Introduce SharedMemoryDataConsumerHandle. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@data-received-with-ack
Patch Set: Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/child/shared_memory_data_consumer_handle.h"
6
7 #include <algorithm>
8 #include <deque>
9 #include <vector>
10
11 namespace content {
12
13 namespace {
14
15 class FixedReceivedData final : public RequestPeer::ReceivedData {
16 public:
17 explicit FixedReceivedData(RequestPeer::ReceivedData* data)
18 : data_(data->payload(), data->payload() + data->length()),
19 encoded_length_(data->encoded_length()) {}
20
21 const char* payload() const override {
22 return data_.empty() ? nullptr : &data_[0];
23 }
24 int length() const override { return static_cast<int>(data_.size()); }
25 int encoded_length() const override { return encoded_length_; }
26
27 private:
28 const std::vector<char> data_;
29 const int encoded_length_;
30
31 DISALLOW_COPY_AND_ASSIGN(FixedReceivedData);
32 };
33
34 } // namespace
35
36 using Result = blink::WebDataConsumerHandle::Result;
37 using Data = RequestPeer::ReceivedData;
38
39 class SharedMemoryDataConsumerHandle::Context final
40 : public base::RefCountedThreadSafe<Context> {
41 public:
42 Context()
43 : result_(Ok),
44 first_offset_(0),
45 client_(nullptr),
46 is_reader_active_(true) {}
47
48 bool IsEmpty() const { return queue_.empty(); }
49 void Clear() {
50 for (Data* data : queue_) {
51 delete data;
52 }
53 queue_.clear();
54 first_offset_ = 0;
55 client_ = nullptr;
56 }
57 void PopReadData() {
58 Data* top = Top();
59 if (static_cast<size_t>(top->length()) <= first_offset_) {
60 delete top;
61 queue_.pop_front();
62 first_offset_ = 0;
63 }
64 }
65 Data* Top() { return queue_.front(); }
66 void Push(scoped_ptr<Data> data) { queue_.push_back(data.release()); }
67 size_t first_offset() const { return first_offset_; }
68 void AdvanceFirstOffset(size_t s) { first_offset_ += s; }
69 Result result() const { return result_; }
70 void set_result(Result r) { result_ = r; }
71 Client* client() { return client_; }
72 void set_client(Client* client) { client_ = client; }
73 bool is_reader_active() const { return is_reader_active_; }
74 void set_is_reader_active(bool b) { is_reader_active_ = b; }
75
76 private:
77 friend class base::RefCountedThreadSafe<Context>;
78 ~Context() {
79 // This is necessary because the queue stores raw pointers.
80 Clear();
81 }
82
83 // |result_| stores the ultimate state of this handle if it has. Otherwise,
84 // |Ok| is set.
85 Result result_;
86 // TODO(yhirano): Use std::deque<scoped_ptr<ReceivedData>> once it is allowed.
87 std::deque<RequestPeer::ReceivedData*> queue_;
88 size_t first_offset_;
89 Client* client_;
90 bool is_reader_active_;
91
92 DISALLOW_COPY_AND_ASSIGN(Context);
93 };
94
95 SharedMemoryDataConsumerHandle::Writer::Writer(
96 const scoped_refptr<Context>& context,
97 BackpressureMode mode)
98 : context_(context), mode_(mode) {
99 }
100
101 SharedMemoryDataConsumerHandle::Writer::~Writer() {
102 Close();
103 }
104
105 void SharedMemoryDataConsumerHandle::Writer::AddData(scoped_ptr<Data> data) {
106 if (!data->length()) {
107 // We omit empty data.
108 return;
109 }
110
111 if (!context_->is_reader_active()) {
112 // No one is interested in the data.
113 return;
114 }
115
116 bool needs_notification = context_->client() && context_->IsEmpty();
117 scoped_ptr<RequestPeer::ReceivedData> data_to_pass;
118 if (mode_ == kApplyBackpressure) {
119 data_to_pass = data.Pass();
120 } else {
121 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
122 }
123 context_->Push(data_to_pass.Pass());
124
125 if (needs_notification)
126 context_->client()->didGetReadable();
127 }
128
129 void SharedMemoryDataConsumerHandle::Writer::Close() {
130 if (context_->result() == Ok) {
131 context_->set_result(Done);
132 if (context_->client() && context_->IsEmpty())
133 context_->client()->didGetReadable();
134 }
135 }
136
137 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
138 BackpressureMode mode,
139 scoped_ptr<Writer>* writer)
140 : context_(new Context) {
141 writer->reset(new Writer(context_, mode));
142 }
143
144 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
145 context_->set_is_reader_active(false);
146 context_->Clear();
147 }
148
149 Result SharedMemoryDataConsumerHandle::read(void* data,
150 size_t size,
151 Flags flags,
152 size_t* read_size_to_return) {
153 size_t total_read_size = 0;
154 *read_size_to_return = 0;
155 if (context_->result() != Ok && context_->result() != Done)
156 return context_->result();
157
158 while (!context_->IsEmpty() && total_read_size < size) {
159 const auto& top = context_->Top();
160 size_t readable = top->length() - context_->first_offset();
161 size_t writable = size - total_read_size;
162 size_t read_size = std::min(readable, writable);
163 const char* begin = top->payload() + context_->first_offset();
164 std::copy(begin, begin + read_size,
165 static_cast<char*>(data) + total_read_size);
166 total_read_size += read_size;
167 context_->AdvanceFirstOffset(read_size);
tyoshino (SeeGerritForStatus) 2015/05/26 08:05:13 how about exposing only a single method named Cons
yhirano 2015/05/26 08:19:42 Done.
168 context_->PopReadData();
169 }
170 *read_size_to_return = total_read_size;
171 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait;
172 }
173
174 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
175 Flags flags,
176 size_t* available) {
177 *buffer = nullptr;
178 *available = 0;
179
180 if (context_->result() != Ok && context_->result() != Done)
181 return context_->result();
182
183 if (context_->IsEmpty())
184 return context_->result() == Done ? Done : ShouldWait;
185
186 const auto& top = context_->Top();
187 *buffer = top->payload() + context_->first_offset();
188 *available = top->length() - context_->first_offset();
189
190 return Ok;
191 }
192
193 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
194 if (context_->IsEmpty())
195 return UnexpectedError;
196
197 context_->AdvanceFirstOffset(read_size);
198 context_->PopReadData();
199 return Ok;
200 }
201
202 void SharedMemoryDataConsumerHandle::registerClient(Client* client) {
203 context_->set_client(client);
204
205 if (!context_->IsEmpty())
206 client->didGetReadable();
207 }
208
209 void SharedMemoryDataConsumerHandle::unregisterClient() {
210 context_->set_client(nullptr);
211 }
212
213 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698