Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(24)

Side by Side Diff: content/browser/byte_stream.cc

Issue 2873333004: Rename TaskRunner::RunsTasksOnCurrentThread() in //content (Closed)
Patch Set: Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/browser/byte_stream.h" 5 #include "content/browser/byte_stream.h"
6 6
7 #include <deque> 7 #include <deque>
8 #include <set> 8 #include <set>
9 #include <utility> 9 #include <utility>
10 10
(...skipping 173 matching lines...) Expand 10 before | Expand all | Expand 10 after
184 my_task_runner_(task_runner), 184 my_task_runner_(task_runner),
185 my_lifetime_flag_(lifetime_flag), 185 my_lifetime_flag_(lifetime_flag),
186 input_contents_size_(0), 186 input_contents_size_(0),
187 output_size_used_(0), 187 output_size_used_(0),
188 peer_(NULL) { 188 peer_(NULL) {
189 DCHECK(my_lifetime_flag_.get()); 189 DCHECK(my_lifetime_flag_.get());
190 my_lifetime_flag_->is_alive = true; 190 my_lifetime_flag_->is_alive = true;
191 } 191 }
192 192
193 ByteStreamWriterImpl::~ByteStreamWriterImpl() { 193 ByteStreamWriterImpl::~ByteStreamWriterImpl() {
194 // No RunsTasksOnCurrentThread() check to allow deleting a created writer 194 // No RunsTasksInCurrentSequence() check to allow deleting a created writer
195 // before we start using it. Once started, should be deleted on the specified 195 // before we start using it. Once started, should be deleted on the specified
196 // task runner. 196 // task runner.
197 my_lifetime_flag_->is_alive = false; 197 my_lifetime_flag_->is_alive = false;
198 } 198 }
199 199
200 void ByteStreamWriterImpl::SetPeer( 200 void ByteStreamWriterImpl::SetPeer(
201 ByteStreamReaderImpl* peer, 201 ByteStreamReaderImpl* peer,
202 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 202 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
203 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { 203 scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
204 peer_ = peer; 204 peer_ = peer;
205 peer_task_runner_ = peer_task_runner; 205 peer_task_runner_ = peer_task_runner;
206 peer_lifetime_flag_ = peer_lifetime_flag; 206 peer_lifetime_flag_ = peer_lifetime_flag;
207 } 207 }
208 208
209 bool ByteStreamWriterImpl::Write( 209 bool ByteStreamWriterImpl::Write(
210 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { 210 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
211 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 211 DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
212 212
213 // Check overflow. 213 // Check overflow.
214 // 214 //
215 // TODO(tyoshino): Discuss with content/browser/download developer and if 215 // TODO(tyoshino): Discuss with content/browser/download developer and if
216 // they're fine with, set smaller limit and make it configurable. 216 // they're fine with, set smaller limit and make it configurable.
217 size_t space_limit = std::numeric_limits<size_t>::max() - 217 size_t space_limit = std::numeric_limits<size_t>::max() -
218 GetTotalBufferedBytes(); 218 GetTotalBufferedBytes();
219 if (byte_count > space_limit) { 219 if (byte_count > space_limit) {
220 // TODO(tyoshino): Tell the user that Write() failed. 220 // TODO(tyoshino): Tell the user that Write() failed.
221 // Ignore input. 221 // Ignore input.
222 return false; 222 return false;
223 } 223 }
224 224
225 input_contents_.push_back(std::make_pair(buffer, byte_count)); 225 input_contents_.push_back(std::make_pair(buffer, byte_count));
226 input_contents_size_ += byte_count; 226 input_contents_size_ += byte_count;
227 227
228 // Arbitrarily, we buffer to a third of the total size before sending. 228 // Arbitrarily, we buffer to a third of the total size before sending.
229 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) 229 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
230 PostToPeer(false, 0); 230 PostToPeer(false, 0);
231 231
232 return GetTotalBufferedBytes() <= total_buffer_size_; 232 return GetTotalBufferedBytes() <= total_buffer_size_;
233 } 233 }
234 234
235 void ByteStreamWriterImpl::Flush() { 235 void ByteStreamWriterImpl::Flush() {
236 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 236 DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
237 if (input_contents_size_ > 0) 237 if (input_contents_size_ > 0)
238 PostToPeer(false, 0); 238 PostToPeer(false, 0);
239 } 239 }
240 240
241 void ByteStreamWriterImpl::Close(int status) { 241 void ByteStreamWriterImpl::Close(int status) {
242 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 242 DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
243 PostToPeer(true, status); 243 PostToPeer(true, status);
244 } 244 }
245 245
246 void ByteStreamWriterImpl::RegisterCallback( 246 void ByteStreamWriterImpl::RegisterCallback(
247 const base::Closure& source_callback) { 247 const base::Closure& source_callback) {
248 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 248 DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
249 space_available_callback_ = source_callback; 249 space_available_callback_ = source_callback;
250 } 250 }
251 251
252 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const { 252 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const {
253 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 253 DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
254 // This sum doesn't overflow since Write() fails if this sum is going to 254 // This sum doesn't overflow since Write() fails if this sum is going to
255 // overflow. 255 // overflow.
256 return input_contents_size_ + output_size_used_; 256 return input_contents_size_ + output_size_used_;
257 } 257 }
258 258
259 // static 259 // static
260 void ByteStreamWriterImpl::UpdateWindow( 260 void ByteStreamWriterImpl::UpdateWindow(
261 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target, 261 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target,
262 size_t bytes_consumed) { 262 size_t bytes_consumed) {
263 // If the target object isn't alive anymore, we do nothing. 263 // If the target object isn't alive anymore, we do nothing.
264 if (!lifetime_flag->is_alive) return; 264 if (!lifetime_flag->is_alive) return;
265 265
266 target->UpdateWindowInternal(bytes_consumed); 266 target->UpdateWindowInternal(bytes_consumed);
267 } 267 }
268 268
269 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) { 269 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
270 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 270 DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
271 271
272 bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_; 272 bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_;
273 273
274 DCHECK_GE(output_size_used_, bytes_consumed); 274 DCHECK_GE(output_size_used_, bytes_consumed);
275 output_size_used_ -= bytes_consumed; 275 output_size_used_ -= bytes_consumed;
276 276
277 // Callback if we were above the limit and we're now <= to it. 277 // Callback if we were above the limit and we're now <= to it.
278 bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_; 278 bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_;
279 279
280 if (no_longer_above_limit && was_above_limit && 280 if (no_longer_above_limit && was_above_limit &&
281 !space_available_callback_.is_null()) 281 !space_available_callback_.is_null())
282 space_available_callback_.Run(); 282 space_available_callback_.Run();
283 } 283 }
284 284
285 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) { 285 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
286 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 286 DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
287 // Valid contexts in which to call. 287 // Valid contexts in which to call.
288 DCHECK(complete || 0 != input_contents_size_); 288 DCHECK(complete || 0 != input_contents_size_);
289 289
290 std::unique_ptr<ContentVector> transfer_buffer; 290 std::unique_ptr<ContentVector> transfer_buffer;
291 size_t buffer_size = 0; 291 size_t buffer_size = 0;
292 if (0 != input_contents_size_) { 292 if (0 != input_contents_size_) {
293 transfer_buffer.reset(new ContentVector); 293 transfer_buffer.reset(new ContentVector);
294 transfer_buffer->swap(input_contents_); 294 transfer_buffer->swap(input_contents_);
295 buffer_size = input_contents_size_; 295 buffer_size = input_contents_size_;
296 output_size_used_ += input_contents_size_; 296 output_size_used_ += input_contents_size_;
(...skipping 19 matching lines...) Expand all
316 my_lifetime_flag_(lifetime_flag), 316 my_lifetime_flag_(lifetime_flag),
317 received_status_(false), 317 received_status_(false),
318 status_(0), 318 status_(0),
319 unreported_consumed_bytes_(0), 319 unreported_consumed_bytes_(0),
320 peer_(NULL) { 320 peer_(NULL) {
321 DCHECK(my_lifetime_flag_.get()); 321 DCHECK(my_lifetime_flag_.get());
322 my_lifetime_flag_->is_alive = true; 322 my_lifetime_flag_->is_alive = true;
323 } 323 }
324 324
325 ByteStreamReaderImpl::~ByteStreamReaderImpl() { 325 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
326 // No RunsTasksOnCurrentThread() check to allow deleting a created writer 326 // No RunsTasksInCurrentSequence() check to allow deleting a created writer
327 // before we start using it. Once started, should be deleted on the specified 327 // before we start using it. Once started, should be deleted on the specified
328 // task runner. 328 // task runner.
329 my_lifetime_flag_->is_alive = false; 329 my_lifetime_flag_->is_alive = false;
330 } 330 }
331 331
332 void ByteStreamReaderImpl::SetPeer( 332 void ByteStreamReaderImpl::SetPeer(
333 ByteStreamWriterImpl* peer, 333 ByteStreamWriterImpl* peer,
334 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 334 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
335 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { 335 scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
336 peer_ = peer; 336 peer_ = peer;
337 peer_task_runner_ = peer_task_runner; 337 peer_task_runner_ = peer_task_runner;
338 peer_lifetime_flag_ = peer_lifetime_flag; 338 peer_lifetime_flag_ = peer_lifetime_flag;
339 } 339 }
340 340
341 ByteStreamReaderImpl::StreamState 341 ByteStreamReaderImpl::StreamState
342 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data, 342 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
343 size_t* length) { 343 size_t* length) {
344 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 344 DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
345 345
346 if (available_contents_.size()) { 346 if (available_contents_.size()) {
347 *data = available_contents_.front().first; 347 *data = available_contents_.front().first;
348 *length = available_contents_.front().second; 348 *length = available_contents_.front().second;
349 available_contents_.pop_front(); 349 available_contents_.pop_front();
350 unreported_consumed_bytes_ += *length; 350 unreported_consumed_bytes_ += *length;
351 351
352 MaybeUpdateInput(); 352 MaybeUpdateInput();
353 return STREAM_HAS_DATA; 353 return STREAM_HAS_DATA;
354 } 354 }
355 if (received_status_) { 355 if (received_status_) {
356 return STREAM_COMPLETE; 356 return STREAM_COMPLETE;
357 } 357 }
358 return STREAM_EMPTY; 358 return STREAM_EMPTY;
359 } 359 }
360 360
361 int ByteStreamReaderImpl::GetStatus() const { 361 int ByteStreamReaderImpl::GetStatus() const {
362 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 362 DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
363 DCHECK(received_status_); 363 DCHECK(received_status_);
364 return status_; 364 return status_;
365 } 365 }
366 366
367 void ByteStreamReaderImpl::RegisterCallback( 367 void ByteStreamReaderImpl::RegisterCallback(
368 const base::Closure& sink_callback) { 368 const base::Closure& sink_callback) {
369 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 369 DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
370 370
371 data_available_callback_ = sink_callback; 371 data_available_callback_ = sink_callback;
372 } 372 }
373 373
374 // static 374 // static
375 void ByteStreamReaderImpl::TransferData( 375 void ByteStreamReaderImpl::TransferData(
376 scoped_refptr<LifetimeFlag> object_lifetime_flag, 376 scoped_refptr<LifetimeFlag> object_lifetime_flag,
377 ByteStreamReaderImpl* target, 377 ByteStreamReaderImpl* target,
378 std::unique_ptr<ContentVector> transfer_buffer, 378 std::unique_ptr<ContentVector> transfer_buffer,
379 size_t buffer_size, 379 size_t buffer_size,
380 bool source_complete, 380 bool source_complete,
381 int status) { 381 int status) {
382 // If our target is no longer alive, do nothing. 382 // If our target is no longer alive, do nothing.
383 if (!object_lifetime_flag->is_alive) return; 383 if (!object_lifetime_flag->is_alive) return;
384 384
385 target->TransferDataInternal(std::move(transfer_buffer), buffer_size, 385 target->TransferDataInternal(std::move(transfer_buffer), buffer_size,
386 source_complete, status); 386 source_complete, status);
387 } 387 }
388 388
389 void ByteStreamReaderImpl::TransferDataInternal( 389 void ByteStreamReaderImpl::TransferDataInternal(
390 std::unique_ptr<ContentVector> transfer_buffer, 390 std::unique_ptr<ContentVector> transfer_buffer,
391 size_t buffer_size, 391 size_t buffer_size,
392 bool source_complete, 392 bool source_complete,
393 int status) { 393 int status) {
394 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 394 DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
395 395
396 bool was_empty = available_contents_.empty(); 396 bool was_empty = available_contents_.empty();
397 397
398 if (transfer_buffer) { 398 if (transfer_buffer) {
399 available_contents_.insert(available_contents_.end(), 399 available_contents_.insert(available_contents_.end(),
400 transfer_buffer->begin(), 400 transfer_buffer->begin(),
401 transfer_buffer->end()); 401 transfer_buffer->end());
402 } 402 }
403 403
404 if (source_complete) { 404 if (source_complete) {
405 received_status_ = true; 405 received_status_ = true;
406 status_ = status; 406 status_ = status;
407 } 407 }
408 408
409 // Callback on transition from empty to non-empty, or 409 // Callback on transition from empty to non-empty, or
410 // source complete. 410 // source complete.
411 if (((was_empty && !available_contents_.empty()) || 411 if (((was_empty && !available_contents_.empty()) ||
412 source_complete) && 412 source_complete) &&
413 !data_available_callback_.is_null()) 413 !data_available_callback_.is_null())
414 data_available_callback_.Run(); 414 data_available_callback_.Run();
415 } 415 }
416 416
417 // Decide whether or not to send the input a window update. 417 // Decide whether or not to send the input a window update.
418 // Currently we do that whenever we've got unreported consumption 418 // Currently we do that whenever we've got unreported consumption
419 // greater than 1/3 of total size. 419 // greater than 1/3 of total size.
420 void ByteStreamReaderImpl::MaybeUpdateInput() { 420 void ByteStreamReaderImpl::MaybeUpdateInput() {
421 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 421 DCHECK(my_task_runner_->RunsTasksInCurrentSequence());
422 422
423 if (unreported_consumed_bytes_ <= 423 if (unreported_consumed_bytes_ <=
424 total_buffer_size_ / kFractionReadBeforeWindowUpdate) 424 total_buffer_size_ / kFractionReadBeforeWindowUpdate)
425 return; 425 return;
426 426
427 peer_task_runner_->PostTask( 427 peer_task_runner_->PostTask(
428 FROM_HERE, base::Bind( 428 FROM_HERE, base::Bind(
429 &ByteStreamWriterImpl::UpdateWindow, 429 &ByteStreamWriterImpl::UpdateWindow,
430 peer_lifetime_flag_, 430 peer_lifetime_flag_,
431 peer_, 431 peer_,
(...skipping 24 matching lines...) Expand all
456 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( 456 ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
457 output_task_runner, output_flag, buffer_size); 457 output_task_runner, output_flag, buffer_size);
458 458
459 in->SetPeer(out, output_task_runner, output_flag); 459 in->SetPeer(out, output_task_runner, output_flag);
460 out->SetPeer(in, input_task_runner, input_flag); 460 out->SetPeer(in, input_task_runner, input_flag);
461 input->reset(in); 461 input->reset(in);
462 output->reset(out); 462 output->reset(out);
463 } 463 }
464 464
465 } // namespace content 465 } // namespace content
OLDNEW
« no previous file with comments | « content/browser/browser_thread_impl.cc ('k') | content/browser/dom_storage/dom_storage_task_runner.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698