Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(163)

Side by Side Diff: content/child/shared_memory_data_consumer_handle.cc

Issue 1164493008: Implement WebDataConsumerHandle::Reader. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/child/shared_memory_data_consumer_handle.h" 5 #include "content/child/shared_memory_data_consumer_handle.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <deque> 8 #include <deque>
9 #include <vector>
10 9
11 #include "base/bind.h" 10 #include "base/bind.h"
12 #include "base/message_loop/message_loop.h" 11 #include "base/message_loop/message_loop.h"
13 #include "base/single_thread_task_runner.h" 12 #include "base/single_thread_task_runner.h"
14 #include "base/synchronization/lock.h" 13 #include "base/synchronization/lock.h"
15 #include "content/public/child/fixed_received_data.h" 14 #include "content/public/child/fixed_received_data.h"
16 15
17 namespace content { 16 namespace content {
18 17
19 namespace { 18 namespace {
(...skipping 27 matching lines...) Expand all
47 46
48 using Result = blink::WebDataConsumerHandle::Result; 47 using Result = blink::WebDataConsumerHandle::Result;
49 48
50 class SharedMemoryDataConsumerHandle::Context final 49 class SharedMemoryDataConsumerHandle::Context final
51 : public base::RefCountedThreadSafe<Context> { 50 : public base::RefCountedThreadSafe<Context> {
52 public: 51 public:
53 Context() 52 Context()
54 : result_(Ok), 53 : result_(Ok),
55 first_offset_(0), 54 first_offset_(0),
56 client_(nullptr), 55 client_(nullptr),
57 is_reader_active_(true) {} 56 is_handle_active_(true) {}
58 57
59 bool IsEmpty() const { return queue_.empty(); } 58 bool IsEmpty() const { return queue_.empty(); }
60 void Clear() { 59 void ClearIfNecessary() {
61 for (auto& data : queue_) { 60 if (!is_handle_locked() && !is_handle_active()) {
62 delete data; 61 // No one is interested in the contents.
63 } 62 Clear();
64 queue_.clear();
65 first_offset_ = 0;
66 client_ = nullptr;
67 }
68 void Notify() {
69 // Note that this function is not protected by |lock_| (actually it
70 // shouldn't be) but |notification_task_runner_| is thread-safe.
71
72 if (notification_task_runner_->BelongsToCurrentThread()) {
73 NotifyImmediately();
74 } else {
75 notification_task_runner_->PostTask(
76 FROM_HERE, base::Bind(&Context::NotifyImmediately, this));
77 } 63 }
78 } 64 }
79
80 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); } 65 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); }
81 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) { 66 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) {
82 queue_.push_back(data.release()); 67 queue_.push_back(data.release());
83 } 68 }
84 size_t first_offset() const { return first_offset_; } 69 size_t first_offset() const { return first_offset_; }
85 Result result() const { return result_; } 70 Result result() const { return result_; }
86 void set_result(Result r) { result_ = r; } 71 void set_result(Result r) { result_ = r; }
87 Client* client() { return client_; } 72 void AcquireReaderLock(Client* client) {
88 void SetClient(Client* client) { 73 DCHECK(!notification_task_runner_);
89 if (client) { 74 DCHECK(!client_);
90 notification_task_runner_ = base::MessageLoop::current()->task_runner(); 75 notification_task_runner_ = base::MessageLoop::current()->task_runner();
91 client_ = client; 76 client_ = client;
92 } else { 77 if (client && !(IsEmpty() && result() == Ok)) {
93 notification_task_runner_ = nullptr; 78 // We cannot notify synchronously because the user doesn't have the reader
94 client_ = nullptr; 79 // yet.
80 notification_task_runner_->PostTask(
81 FROM_HERE, base::Bind(&Context::NotifyInternal, this, false));
95 } 82 }
96 } 83 }
97 bool is_reader_active() const { return is_reader_active_; } 84 void ReleaseReaderLock() {
98 void set_is_reader_active(bool b) { is_reader_active_ = b; } 85 DCHECK(notification_task_runner_);
86 notification_task_runner_ = nullptr;
87 client_ = nullptr;
88 }
89 void Notify() { NotifyInternal(true); }
90 bool is_handle_locked() const { return notification_task_runner_; }
91 bool IsReaderBoundToCurrentThread() const {
92 return notification_task_runner_ &&
93 notification_task_runner_->BelongsToCurrentThread();
94 }
95 bool is_handle_active() const { return is_handle_active_; }
96 void set_is_handle_active(bool b) { is_handle_active_ = b; }
99 void Consume(size_t s) { 97 void Consume(size_t s) {
100 first_offset_ += s; 98 first_offset_ += s;
101 auto top = Top(); 99 auto top = Top();
102 if (static_cast<size_t>(top->length()) <= first_offset_) { 100 if (static_cast<size_t>(top->length()) <= first_offset_) {
103 delete top; 101 delete top;
104 queue_.pop_front(); 102 queue_.pop_front();
105 first_offset_ = 0; 103 first_offset_ = 0;
106 } 104 }
107 } 105 }
108 base::Lock& lock() { return lock_; } 106 base::Lock& lock() { return lock_; }
109 107
110 private: 108 private:
109 void NotifyInternal(bool repost) {
110 // Note that this function is not protected by |lock_|.
111
112 auto runner = notification_task_runner_;
113 if (!runner)
114 return;
115
116 if (runner->BelongsToCurrentThread()) {
117 // It is safe to access member variables without lock because |client_|
118 // is bound to the current thread.
119 if (client_)
120 client_->didGetReadable();
121 return;
122 }
123 if (repost) {
124 // We don't re-post the task when the runner changes while waiting for
125 // this task because in this case a new reader is obtained and
126 // notification is already done at the reader creation time if necessary.
127 runner->PostTask(FROM_HERE,
128 base::Bind(&Context::NotifyInternal, this, false));
129 }
130 }
131 void Clear() {
132 for (auto& data : queue_) {
133 delete data;
134 }
135 queue_.clear();
136 first_offset_ = 0;
137 client_ = nullptr;
138 }
139
111 friend class base::RefCountedThreadSafe<Context>; 140 friend class base::RefCountedThreadSafe<Context>;
112 ~Context() { 141 ~Context() {
113 // This is necessary because the queue stores raw pointers. 142 // This is necessary because the queue stores raw pointers.
114 Clear(); 143 Clear();
115 } 144 }
116 145
117 void NotifyImmediately() {
118 // As we can assume that all reader-side methods are called on this
119 // thread (see WebDataConsumerHandle comments), we don't need to lock.
120 if (client_)
121 client_->didGetReadable();
122 }
123
124 base::Lock lock_; 146 base::Lock lock_;
125 // |result_| stores the ultimate state of this handle if it has. Otherwise, 147 // |result_| stores the ultimate state of this handle if it has. Otherwise,
126 // |Ok| is set. 148 // |Ok| is set.
127 Result result_; 149 Result result_;
128 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>> 150 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
129 // once it is allowed. 151 // once it is allowed.
130 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_; 152 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_;
131 size_t first_offset_; 153 size_t first_offset_;
132 Client* client_; 154 Client* client_;
133 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_; 155 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
134 bool is_reader_active_; 156 bool is_handle_active_;
135 157
136 DISALLOW_COPY_AND_ASSIGN(Context); 158 DISALLOW_COPY_AND_ASSIGN(Context);
137 }; 159 };
138 160
139 SharedMemoryDataConsumerHandle::Writer::Writer( 161 SharedMemoryDataConsumerHandle::Writer::Writer(
140 const scoped_refptr<Context>& context, 162 const scoped_refptr<Context>& context,
141 BackpressureMode mode) 163 BackpressureMode mode)
142 : context_(context), mode_(mode) { 164 : context_(context), mode_(mode) {
143 } 165 }
144 166
145 SharedMemoryDataConsumerHandle::Writer::~Writer() { 167 SharedMemoryDataConsumerHandle::Writer::~Writer() {
146 Close(); 168 Close();
147 } 169 }
148 170
149 void SharedMemoryDataConsumerHandle::Writer::AddData( 171 void SharedMemoryDataConsumerHandle::Writer::AddData(
150 scoped_ptr<RequestPeer::ReceivedData> data) { 172 scoped_ptr<RequestPeer::ReceivedData> data) {
151 if (!data->length()) { 173 if (!data->length()) {
152 // We omit empty data. 174 // We omit empty data.
153 return; 175 return;
154 } 176 }
155 177
156 bool needs_notification = false; 178 bool needs_notification = false;
157 { 179 {
158 base::AutoLock lock(context_->lock()); 180 base::AutoLock lock(context_->lock());
159 if (!context_->is_reader_active()) { 181 if (!context_->is_handle_active() && !context_->is_handle_locked()) {
160 // No one is interested in the data. 182 // No one is interested in the data.
161 return; 183 return;
162 } 184 }
163 185
164 needs_notification = context_->client() && context_->IsEmpty(); 186 needs_notification = context_->IsEmpty();
165 scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass; 187 scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass;
166 if (mode_ == kApplyBackpressure) { 188 if (mode_ == kApplyBackpressure) {
167 data_to_pass = 189 data_to_pass =
168 make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass())); 190 make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass()));
169 } else { 191 } else {
170 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get())); 192 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
171 } 193 }
172 context_->Push(data_to_pass.Pass()); 194 context_->Push(data_to_pass.Pass());
173 } 195 }
174 196
175 if (needs_notification) 197 if (needs_notification)
176 context_->Notify(); 198 context_->Notify();
177 } 199 }
178 200
179 void SharedMemoryDataConsumerHandle::Writer::Close() { 201 void SharedMemoryDataConsumerHandle::Writer::Close() {
180 bool needs_notification = false; 202 bool needs_notification = false;
181 203
182 { 204 {
183 base::AutoLock lock(context_->lock()); 205 base::AutoLock lock(context_->lock());
184 if (context_->result() == Ok) { 206 if (context_->result() == Ok) {
185 context_->set_result(Done); 207 context_->set_result(Done);
186 needs_notification = context_->client() && context_->IsEmpty(); 208 needs_notification = context_->IsEmpty();
187 } 209 }
188 } 210 }
189 if (needs_notification) 211 if (needs_notification)
190 context_->Notify(); 212 context_->Notify();
191 } 213 }
192 214
193 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( 215 SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl(
194 BackpressureMode mode, 216 scoped_refptr<Context> context,
195 scoped_ptr<Writer>* writer) 217 Client* client)
196 : context_(new Context) { 218 : context_(context) {
197 writer->reset(new Writer(context_, mode)); 219 base::AutoLock lock(context_->lock());
220 DCHECK(!context_->is_handle_locked());
221 context_->AcquireReaderLock(client);
198 } 222 }
199 223
200 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { 224 SharedMemoryDataConsumerHandle::ReaderImpl::~ReaderImpl() {
201 base::AutoLock lock(context_->lock()); 225 base::AutoLock lock(context_->lock());
202 context_->set_is_reader_active(false); 226 context_->ReleaseReaderLock();
203 context_->Clear(); 227 context_->ClearIfNecessary();
204 } 228 }
205 229
206 Result SharedMemoryDataConsumerHandle::read(void* data, 230 Result SharedMemoryDataConsumerHandle::ReaderImpl::read(
207 size_t size, 231 void* data,
208 Flags flags, 232 size_t size,
209 size_t* read_size_to_return) { 233 Flags flags,
234 size_t* read_size_to_return) {
210 base::AutoLock lock(context_->lock()); 235 base::AutoLock lock(context_->lock());
211 236
212 size_t total_read_size = 0; 237 size_t total_read_size = 0;
213 *read_size_to_return = 0; 238 *read_size_to_return = 0;
214 if (context_->result() != Ok && context_->result() != Done) 239 if (context_->result() != Ok && context_->result() != Done)
215 return context_->result(); 240 return context_->result();
216 241
217 while (!context_->IsEmpty() && total_read_size < size) { 242 while (!context_->IsEmpty() && total_read_size < size) {
218 const auto& top = context_->Top(); 243 const auto& top = context_->Top();
219 size_t readable = top->length() - context_->first_offset(); 244 size_t readable = top->length() - context_->first_offset();
220 size_t writable = size - total_read_size; 245 size_t writable = size - total_read_size;
221 size_t read_size = std::min(readable, writable); 246 size_t read_size = std::min(readable, writable);
222 const char* begin = top->payload() + context_->first_offset(); 247 const char* begin = top->payload() + context_->first_offset();
223 std::copy(begin, begin + read_size, 248 std::copy(begin, begin + read_size,
224 static_cast<char*>(data) + total_read_size); 249 static_cast<char*>(data) + total_read_size);
225 total_read_size += read_size; 250 total_read_size += read_size;
226 context_->Consume(read_size); 251 context_->Consume(read_size);
227 } 252 }
228 *read_size_to_return = total_read_size; 253 *read_size_to_return = total_read_size;
229 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait; 254 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait;
230 } 255 }
231 256
232 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer, 257 Result SharedMemoryDataConsumerHandle::ReaderImpl::beginRead(
233 Flags flags, 258 const void** buffer,
234 size_t* available) { 259 Flags flags,
260 size_t* available) {
235 *buffer = nullptr; 261 *buffer = nullptr;
236 *available = 0; 262 *available = 0;
237 263
238 base::AutoLock lock(context_->lock()); 264 base::AutoLock lock(context_->lock());
239 265
240 if (context_->result() != Ok && context_->result() != Done) 266 if (context_->result() != Ok && context_->result() != Done)
241 return context_->result(); 267 return context_->result();
242 268
243 if (context_->IsEmpty()) 269 if (context_->IsEmpty())
244 return context_->result() == Done ? Done : ShouldWait; 270 return context_->result() == Done ? Done : ShouldWait;
245 271
246 const auto& top = context_->Top(); 272 const auto& top = context_->Top();
247 *buffer = top->payload() + context_->first_offset(); 273 *buffer = top->payload() + context_->first_offset();
248 *available = top->length() - context_->first_offset(); 274 *available = top->length() - context_->first_offset();
249 275
250 return Ok; 276 return Ok;
251 } 277 }
252 278
253 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) { 279 Result SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size) {
254 base::AutoLock lock(context_->lock()); 280 base::AutoLock lock(context_->lock());
255 281
256 if (context_->IsEmpty()) 282 if (context_->IsEmpty())
257 return UnexpectedError; 283 return UnexpectedError;
258 284
259 context_->Consume(read_size); 285 context_->Consume(read_size);
260 return Ok; 286 return Ok;
261 } 287 }
262 288
289 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
290 BackpressureMode mode,
291 scoped_ptr<Writer>* writer)
292 : context_(new Context) {
293 writer->reset(new Writer(context_, mode));
294 }
295
296 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
297 base::AutoLock lock(context_->lock());
298 context_->set_is_handle_active(false);
299 context_->ClearIfNecessary();
300 }
301
302 scoped_ptr<blink::WebDataConsumerHandle::Reader>
303 SharedMemoryDataConsumerHandle::ObtainReader(Client* client) {
304 return make_scoped_ptr(obtainReaderInternal(client));
305 }
306
307 SharedMemoryDataConsumerHandle::ReaderImpl*
308 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) {
309 return new ReaderImpl(context_, client);
310 }
311
312 Result SharedMemoryDataConsumerHandle::read(void* data,
313 size_t size,
314 Flags flags,
315 size_t* read_size_to_return) {
316 // Note this (and below similar functions) is a bit racy. We don't care about
317 // it because this is a deprecated function and will be removed shortly.
318 LockImplicitly();
319 return reader_->read(data, size, flags, read_size_to_return);
320 }
321
322 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
323 Flags flags,
324 size_t* available) {
325 LockImplicitly();
326 return reader_->beginRead(buffer, flags, available);
327 }
328
329 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
330 LockImplicitly();
331 return reader_->endRead(read_size);
332 }
333
263 void SharedMemoryDataConsumerHandle::registerClient(Client* client) { 334 void SharedMemoryDataConsumerHandle::registerClient(Client* client) {
264 bool needs_notification = false; 335 UnlockImplicitly();
265 { 336 reader_ = ObtainReader(client);
266 base::AutoLock lock(context_->lock());
267
268 context_->SetClient(client);
269 needs_notification = !context_->IsEmpty();
270 }
271 if (needs_notification)
272 context_->Notify();
273 } 337 }
274 338
275 void SharedMemoryDataConsumerHandle::unregisterClient() { 339 void SharedMemoryDataConsumerHandle::unregisterClient() {
276 base::AutoLock lock(context_->lock()); 340 reader_.reset();
341 }
277 342
278 context_->SetClient(nullptr); 343 void SharedMemoryDataConsumerHandle::LockImplicitly() {
344 {
345 base::AutoLock lock(context_->lock());
346 if (reader_) {
347 DCHECK(context_->IsReaderBoundToCurrentThread());
348 return;
349 }
350 }
351 reader_ = ObtainReader(nullptr);
352 }
353
354 void SharedMemoryDataConsumerHandle::UnlockImplicitly() {
355 bool needs_unlock = false;
356 {
357 base::AutoLock lock(context_->lock());
358 if (reader_) {
359 DCHECK(context_->IsReaderBoundToCurrentThread());
360 needs_unlock = true;
361 }
362 }
363 if (needs_unlock) {
364 reader_.reset();
365 }
279 } 366 }
280 367
281 } // namespace content 368 } // namespace content
OLDNEW
« no previous file with comments | « content/child/shared_memory_data_consumer_handle.h ('k') | content/child/shared_memory_data_consumer_handle_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698