Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(222)

Side by Side Diff: content/child/shared_memory_data_consumer_handle.cc

Issue 1164493008: Implement WebDataConsumerHandle::Reader. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/child/shared_memory_data_consumer_handle.h" 5 #include "content/child/shared_memory_data_consumer_handle.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <deque> 8 #include <deque>
9 #include <vector>
10 9
11 #include "base/bind.h" 10 #include "base/bind.h"
12 #include "base/message_loop/message_loop.h" 11 #include "base/message_loop/message_loop.h"
13 #include "base/single_thread_task_runner.h" 12 #include "base/single_thread_task_runner.h"
14 #include "base/synchronization/lock.h" 13 #include "base/synchronization/lock.h"
15 #include "content/public/child/fixed_received_data.h" 14 #include "content/public/child/fixed_received_data.h"
16 15
17 namespace content { 16 namespace content {
18 17
19 namespace { 18 namespace {
(...skipping 27 matching lines...) Expand all
47 46
48 using Result = blink::WebDataConsumerHandle::Result; 47 using Result = blink::WebDataConsumerHandle::Result;
49 48
50 class SharedMemoryDataConsumerHandle::Context final 49 class SharedMemoryDataConsumerHandle::Context final
51 : public base::RefCountedThreadSafe<Context> { 50 : public base::RefCountedThreadSafe<Context> {
52 public: 51 public:
53 Context() 52 Context()
54 : result_(Ok), 53 : result_(Ok),
55 first_offset_(0), 54 first_offset_(0),
56 client_(nullptr), 55 client_(nullptr),
57 is_reader_active_(true) {} 56 is_handle_active_(true) {}
58 57
59 bool IsEmpty() const { return queue_.empty(); } 58 bool IsEmpty() const { return queue_.empty(); }
60 void Clear() { 59 void Clear() {
61 for (auto& data : queue_) { 60 for (auto& data : queue_) {
62 delete data; 61 delete data;
63 } 62 }
64 queue_.clear(); 63 queue_.clear();
65 first_offset_ = 0; 64 first_offset_ = 0;
66 client_ = nullptr; 65 client_ = nullptr;
67 } 66 }
68 void Notify() {
69 // Note that this function is not protected by |lock_| (actually it
70 // shouldn't be) but |notification_task_runner_| is thread-safe.
71
72 if (notification_task_runner_->BelongsToCurrentThread()) {
73 NotifyImmediately();
74 } else {
75 notification_task_runner_->PostTask(
76 FROM_HERE, base::Bind(&Context::NotifyImmediately, this));
77 }
78 }
79
80 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); } 67 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); }
81 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) { 68 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) {
82 queue_.push_back(data.release()); 69 queue_.push_back(data.release());
83 } 70 }
84 size_t first_offset() const { return first_offset_; } 71 size_t first_offset() const { return first_offset_; }
85 Result result() const { return result_; } 72 Result result() const { return result_; }
86 void set_result(Result r) { result_ = r; } 73 void set_result(Result r) { result_ = r; }
87 Client* client() { return client_; } 74 void AcquireReaderLock(Client* client) {
88 void SetClient(Client* client) { 75 DCHECK(!notification_task_runner_);
89 if (client) { 76 DCHECK(!client_);
90 notification_task_runner_ = base::MessageLoop::current()->task_runner(); 77 notification_task_runner_ = base::MessageLoop::current()->task_runner();
91 client_ = client; 78 client_ = client;
92 } else { 79 if (client && !(IsEmpty() && result() == Ok)) {
93 notification_task_runner_ = nullptr; 80 // We cannot notify synchronously because the user doesn't have the reader
94 client_ = nullptr; 81 // yet.
82 notification_task_runner_->PostTask(
83 FROM_HERE, base::Bind(&Context::NotifyInternal, this, false));
95 } 84 }
96 } 85 }
97 bool is_reader_active() const { return is_reader_active_; } 86 void ReleaseReaderLock() {
98 void set_is_reader_active(bool b) { is_reader_active_ = b; } 87 DCHECK(notification_task_runner_);
88 notification_task_runner_ = nullptr;
89 client_ = nullptr;
90 }
91 void Notify() { NotifyInternal(true); }
92 bool is_handle_locked() const { return notification_task_runner_; }
93 bool IsReaderBoundToCurrentThread() const {
94 return notification_task_runner_ &&
95 notification_task_runner_->BelongsToCurrentThread();
96 }
97 bool is_handle_active() const { return is_handle_active_; }
98 void set_is_handle_active(bool b) { is_handle_active_ = b; }
99 void Consume(size_t s) { 99 void Consume(size_t s) {
100 first_offset_ += s; 100 first_offset_ += s;
101 auto top = Top(); 101 auto top = Top();
102 if (static_cast<size_t>(top->length()) <= first_offset_) { 102 if (static_cast<size_t>(top->length()) <= first_offset_) {
103 delete top; 103 delete top;
104 queue_.pop_front(); 104 queue_.pop_front();
105 first_offset_ = 0; 105 first_offset_ = 0;
106 } 106 }
107 } 107 }
108 base::Lock& lock() { return lock_; } 108 base::Lock& lock() { return lock_; }
109 109
110 private: 110 private:
111 void NotifyInternal(bool repost) {
112 // Note that this function is not protected by |lock_|.
113
114 auto runner = notification_task_runner_;
115 if (!runner) {
116 // Do nothing.
117 } else if (runner->BelongsToCurrentThread()) {
118 // It is safe to access member variables without lock because |client_|
119 // is bound to the current thread.
120 if (client_)
121 client_->didGetReadable();
122 } else if (repost) {
123 // We don't re-post the task when the runner changes while waiting for
124 // this task.
125 runner->PostTask(FROM_HERE,
126 base::Bind(&Context::NotifyInternal, this, false));
127 }
128 }
129
111 friend class base::RefCountedThreadSafe<Context>; 130 friend class base::RefCountedThreadSafe<Context>;
112 ~Context() { 131 ~Context() {
113 // This is necessary because the queue stores raw pointers. 132 // This is necessary because the queue stores raw pointers.
114 Clear(); 133 Clear();
115 } 134 }
116 135
117 void NotifyImmediately() {
118 // As we can assume that all reader-side methods are called on this
119 // thread (see WebDataConsumerHandle comments), we don't need to lock.
120 if (client_)
121 client_->didGetReadable();
122 }
123
124 base::Lock lock_; 136 base::Lock lock_;
125 // |result_| stores the ultimate state of this handle if it has. Otherwise, 137 // |result_| stores the ultimate state of this handle if it has. Otherwise,
126 // |Ok| is set. 138 // |Ok| is set.
127 Result result_; 139 Result result_;
128 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>> 140 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
129 // once it is allowed. 141 // once it is allowed.
130 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_; 142 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_;
131 size_t first_offset_; 143 size_t first_offset_;
132 Client* client_; 144 Client* client_;
133 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_; 145 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
134 bool is_reader_active_; 146 bool is_handle_active_;
135 147
136 DISALLOW_COPY_AND_ASSIGN(Context); 148 DISALLOW_COPY_AND_ASSIGN(Context);
137 }; 149 };
138 150
139 SharedMemoryDataConsumerHandle::Writer::Writer( 151 SharedMemoryDataConsumerHandle::Writer::Writer(
140 const scoped_refptr<Context>& context, 152 const scoped_refptr<Context>& context,
141 BackpressureMode mode) 153 BackpressureMode mode)
142 : context_(context), mode_(mode) { 154 : context_(context), mode_(mode) {
143 } 155 }
144 156
145 SharedMemoryDataConsumerHandle::Writer::~Writer() { 157 SharedMemoryDataConsumerHandle::Writer::~Writer() {
146 Close(); 158 Close();
147 } 159 }
148 160
149 void SharedMemoryDataConsumerHandle::Writer::AddData( 161 void SharedMemoryDataConsumerHandle::Writer::AddData(
150 scoped_ptr<RequestPeer::ReceivedData> data) { 162 scoped_ptr<RequestPeer::ReceivedData> data) {
151 if (!data->length()) { 163 if (!data->length()) {
152 // We omit empty data. 164 // We omit empty data.
153 return; 165 return;
154 } 166 }
155 167
156 bool needs_notification = false; 168 bool needs_notification = false;
157 { 169 {
158 base::AutoLock lock(context_->lock()); 170 base::AutoLock lock(context_->lock());
159 if (!context_->is_reader_active()) { 171 if (!context_->is_handle_active() && !context_->is_handle_locked()) {
160 // No one is interested in the data. 172 // No one is interested in the data.
161 return; 173 return;
162 } 174 }
163 175
164 needs_notification = context_->client() && context_->IsEmpty(); 176 needs_notification = context_->IsEmpty();
165 scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass; 177 scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass;
166 if (mode_ == kApplyBackpressure) { 178 if (mode_ == kApplyBackpressure) {
167 data_to_pass = 179 data_to_pass =
168 make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass())); 180 make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass()));
169 } else { 181 } else {
170 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get())); 182 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
171 } 183 }
172 context_->Push(data_to_pass.Pass()); 184 context_->Push(data_to_pass.Pass());
173 } 185 }
174 186
175 if (needs_notification) 187 if (needs_notification)
176 context_->Notify(); 188 context_->Notify();
177 } 189 }
178 190
179 void SharedMemoryDataConsumerHandle::Writer::Close() { 191 void SharedMemoryDataConsumerHandle::Writer::Close() {
180 bool needs_notification = false; 192 bool needs_notification = false;
181 193
182 { 194 {
183 base::AutoLock lock(context_->lock()); 195 base::AutoLock lock(context_->lock());
184 if (context_->result() == Ok) { 196 if (context_->result() == Ok) {
185 context_->set_result(Done); 197 context_->set_result(Done);
186 needs_notification = context_->client() && context_->IsEmpty(); 198 needs_notification = context_->IsEmpty();
187 } 199 }
188 } 200 }
189 if (needs_notification) 201 if (needs_notification)
190 context_->Notify(); 202 context_->Notify();
191 } 203 }
192 204
193 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle( 205 SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl(
194 BackpressureMode mode, 206 scoped_refptr<Context> context,
195 scoped_ptr<Writer>* writer) 207 Client* client)
196 : context_(new Context) { 208 : context_(context) {
197 writer->reset(new Writer(context_, mode)); 209 DCHECK(!context_->is_handle_locked());
210 context_->AcquireReaderLock(client);
198 } 211 }
199 212
200 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() { 213 SharedMemoryDataConsumerHandle::ReaderImpl::~ReaderImpl() {
201 base::AutoLock lock(context_->lock()); 214 context_->ReleaseReaderLock();
202 context_->set_is_reader_active(false);
203 context_->Clear();
204 } 215 }
205 216
206 Result SharedMemoryDataConsumerHandle::read(void* data, 217 Result SharedMemoryDataConsumerHandle::ReaderImpl::read(
207 size_t size, 218 void* data,
208 Flags flags, 219 size_t size,
209 size_t* read_size_to_return) { 220 Flags flags,
221 size_t* read_size_to_return) {
210 base::AutoLock lock(context_->lock()); 222 base::AutoLock lock(context_->lock());
211 223
212 size_t total_read_size = 0; 224 size_t total_read_size = 0;
213 *read_size_to_return = 0; 225 *read_size_to_return = 0;
214 if (context_->result() != Ok && context_->result() != Done) 226 if (context_->result() != Ok && context_->result() != Done)
215 return context_->result(); 227 return context_->result();
216 228
217 while (!context_->IsEmpty() && total_read_size < size) { 229 while (!context_->IsEmpty() && total_read_size < size) {
218 const auto& top = context_->Top(); 230 const auto& top = context_->Top();
219 size_t readable = top->length() - context_->first_offset(); 231 size_t readable = top->length() - context_->first_offset();
220 size_t writable = size - total_read_size; 232 size_t writable = size - total_read_size;
221 size_t read_size = std::min(readable, writable); 233 size_t read_size = std::min(readable, writable);
222 const char* begin = top->payload() + context_->first_offset(); 234 const char* begin = top->payload() + context_->first_offset();
223 std::copy(begin, begin + read_size, 235 std::copy(begin, begin + read_size,
224 static_cast<char*>(data) + total_read_size); 236 static_cast<char*>(data) + total_read_size);
225 total_read_size += read_size; 237 total_read_size += read_size;
226 context_->Consume(read_size); 238 context_->Consume(read_size);
227 } 239 }
228 *read_size_to_return = total_read_size; 240 *read_size_to_return = total_read_size;
229 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait; 241 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait;
230 } 242 }
231 243
232 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer, 244 Result SharedMemoryDataConsumerHandle::ReaderImpl::beginRead(
233 Flags flags, 245 const void** buffer,
234 size_t* available) { 246 Flags flags,
247 size_t* available) {
235 *buffer = nullptr; 248 *buffer = nullptr;
236 *available = 0; 249 *available = 0;
237 250
238 base::AutoLock lock(context_->lock()); 251 base::AutoLock lock(context_->lock());
239 252
240 if (context_->result() != Ok && context_->result() != Done) 253 if (context_->result() != Ok && context_->result() != Done)
241 return context_->result(); 254 return context_->result();
242 255
243 if (context_->IsEmpty()) 256 if (context_->IsEmpty())
244 return context_->result() == Done ? Done : ShouldWait; 257 return context_->result() == Done ? Done : ShouldWait;
245 258
246 const auto& top = context_->Top(); 259 const auto& top = context_->Top();
247 *buffer = top->payload() + context_->first_offset(); 260 *buffer = top->payload() + context_->first_offset();
248 *available = top->length() - context_->first_offset(); 261 *available = top->length() - context_->first_offset();
249 262
250 return Ok; 263 return Ok;
251 } 264 }
252 265
253 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) { 266 Result SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size) {
254 base::AutoLock lock(context_->lock()); 267 base::AutoLock lock(context_->lock());
255 268
256 if (context_->IsEmpty()) 269 if (context_->IsEmpty())
257 return UnexpectedError; 270 return UnexpectedError;
258 271
259 context_->Consume(read_size); 272 context_->Consume(read_size);
260 return Ok; 273 return Ok;
261 } 274 }
262 275
276 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
277 BackpressureMode mode,
278 scoped_ptr<Writer>* writer)
279 : context_(new Context) {
280 writer->reset(new Writer(context_, mode));
281 }
282
283 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
284 base::AutoLock lock(context_->lock());
285 context_->set_is_handle_active(false);
286 context_->Clear();
hiroshige 2015/06/11 00:03:30 Do not call Clear() here because a reader might be
yhirano 2015/06/11 07:24:06 Thanks, done.
287 }
288
289 SharedMemoryDataConsumerHandle::ReaderImpl*
290 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) {
291 base::AutoLock lock(context_->lock());
292 scoped_ptr<ReaderImpl> reader;
293
294 reader.reset(new ReaderImpl(context_, client));
295 return reader.release();
296 }
297
298 Result SharedMemoryDataConsumerHandle::read(void* data,
299 size_t size,
300 Flags flags,
301 size_t* read_size_to_return) {
302 // Note this (and below similar functions) is a bit racy. We don't care about
303 // it because this is a deprecated function and will be removed shortly.
304 LockImplicitly();
305 return reader_->read(data, size, flags, read_size_to_return);
306 }
307
308 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
309 Flags flags,
310 size_t* available) {
311 LockImplicitly();
312 return reader_->beginRead(buffer, flags, available);
313 }
314
315 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
316 LockImplicitly();
317 return reader_->endRead(read_size);
318 }
319
263 void SharedMemoryDataConsumerHandle::registerClient(Client* client) { 320 void SharedMemoryDataConsumerHandle::registerClient(Client* client) {
264 bool needs_notification = false; 321 UnlockImplicitly();
265 { 322 reader_ = obtainReader(client);
266 base::AutoLock lock(context_->lock());
267
268 context_->SetClient(client);
269 needs_notification = !context_->IsEmpty();
270 }
271 if (needs_notification)
272 context_->Notify();
273 } 323 }
274 324
275 void SharedMemoryDataConsumerHandle::unregisterClient() { 325 void SharedMemoryDataConsumerHandle::unregisterClient() {
276 base::AutoLock lock(context_->lock()); 326 reader_.reset();
327 }
277 328
278 context_->SetClient(nullptr); 329 void SharedMemoryDataConsumerHandle::LockImplicitly() {
330 {
331 base::AutoLock lock(context_->lock());
332 if (reader_) {
333 DCHECK(context_->IsReaderBoundToCurrentThread());
334 return;
335 }
336 }
337 reader_ = obtainReader(nullptr);
338 }
339
340 void SharedMemoryDataConsumerHandle::UnlockImplicitly() {
341 bool needs_unlock = false;
342 {
343 base::AutoLock lock(context_->lock());
344 if (reader_) {
345 DCHECK(context_->IsReaderBoundToCurrentThread());
346 needs_unlock = true;
347 }
348 }
349 if (needs_unlock) {
350 reader_.reset();
351 }
279 } 352 }
280 353
281 } // namespace content 354 } // namespace content
OLDNEW
« no previous file with comments | « content/child/shared_memory_data_consumer_handle.h ('k') | content/child/shared_memory_data_consumer_handle_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698