| OLD | NEW | 
|    1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |    1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 
|    2 // Use of this source code is governed by a BSD-style license that can be |    2 // Use of this source code is governed by a BSD-style license that can be | 
|    3 // found in the LICENSE file. |    3 // found in the LICENSE file. | 
|    4  |    4  | 
|    5 #include "content/browser/byte_stream.h" |    5 #include "content/browser/byte_stream.h" | 
|    6  |    6  | 
|    7 #include <deque> |    7 #include <deque> | 
|    8 #include <set> |    8 #include <set> | 
|    9 #include <utility> |    9 #include <utility> | 
|   10  |   10  | 
| (...skipping 173 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
|  184       my_task_runner_(task_runner), |  184       my_task_runner_(task_runner), | 
|  185       my_lifetime_flag_(lifetime_flag), |  185       my_lifetime_flag_(lifetime_flag), | 
|  186       input_contents_size_(0), |  186       input_contents_size_(0), | 
|  187       output_size_used_(0), |  187       output_size_used_(0), | 
|  188       peer_(NULL) { |  188       peer_(NULL) { | 
|  189   DCHECK(my_lifetime_flag_.get()); |  189   DCHECK(my_lifetime_flag_.get()); | 
|  190   my_lifetime_flag_->is_alive = true; |  190   my_lifetime_flag_->is_alive = true; | 
|  191 } |  191 } | 
|  192  |  192  | 
|  193 ByteStreamWriterImpl::~ByteStreamWriterImpl() { |  193 ByteStreamWriterImpl::~ByteStreamWriterImpl() { | 
|  194   // No RunsTasksOnCurrentThread() check to allow deleting a created writer |  194   // No RunsTasksInCurrentSequence() check to allow deleting a created writer | 
|  195   // before we start using it. Once started, should be deleted on the specified |  195   // before we start using it. Once started, should be deleted on the specified | 
|  196   // task runner. |  196   // task runner. | 
|  197   my_lifetime_flag_->is_alive = false; |  197   my_lifetime_flag_->is_alive = false; | 
|  198 } |  198 } | 
|  199  |  199  | 
|  200 void ByteStreamWriterImpl::SetPeer( |  200 void ByteStreamWriterImpl::SetPeer( | 
|  201     ByteStreamReaderImpl* peer, |  201     ByteStreamReaderImpl* peer, | 
|  202     scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |  202     scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | 
|  203     scoped_refptr<LifetimeFlag> peer_lifetime_flag) { |  203     scoped_refptr<LifetimeFlag> peer_lifetime_flag) { | 
|  204   peer_ = peer; |  204   peer_ = peer; | 
|  205   peer_task_runner_ = peer_task_runner; |  205   peer_task_runner_ = peer_task_runner; | 
|  206   peer_lifetime_flag_ = peer_lifetime_flag; |  206   peer_lifetime_flag_ = peer_lifetime_flag; | 
|  207 } |  207 } | 
|  208  |  208  | 
|  209 bool ByteStreamWriterImpl::Write( |  209 bool ByteStreamWriterImpl::Write( | 
|  210     scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { |  210     scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { | 
|  211   DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |  211   DCHECK(my_task_runner_->RunsTasksInCurrentSequence()); | 
|  212  |  212  | 
|  213   // Check overflow. |  213   // Check overflow. | 
|  214   // |  214   // | 
|  215   // TODO(tyoshino): Discuss with content/browser/download developer and if |  215   // TODO(tyoshino): Discuss with content/browser/download developer and if | 
|  216   // they're fine with, set smaller limit and make it configurable. |  216   // they're fine with, set smaller limit and make it configurable. | 
|  217   size_t space_limit = std::numeric_limits<size_t>::max() - |  217   size_t space_limit = std::numeric_limits<size_t>::max() - | 
|  218       GetTotalBufferedBytes(); |  218       GetTotalBufferedBytes(); | 
|  219   if (byte_count > space_limit) { |  219   if (byte_count > space_limit) { | 
|  220     // TODO(tyoshino): Tell the user that Write() failed. |  220     // TODO(tyoshino): Tell the user that Write() failed. | 
|  221     // Ignore input. |  221     // Ignore input. | 
|  222     return false; |  222     return false; | 
|  223   } |  223   } | 
|  224  |  224  | 
|  225   input_contents_.push_back(std::make_pair(buffer, byte_count)); |  225   input_contents_.push_back(std::make_pair(buffer, byte_count)); | 
|  226   input_contents_size_ += byte_count; |  226   input_contents_size_ += byte_count; | 
|  227  |  227  | 
|  228   // Arbitrarily, we buffer to a third of the total size before sending. |  228   // Arbitrarily, we buffer to a third of the total size before sending. | 
|  229   if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) |  229   if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) | 
|  230     PostToPeer(false, 0); |  230     PostToPeer(false, 0); | 
|  231  |  231  | 
|  232   return GetTotalBufferedBytes() <= total_buffer_size_; |  232   return GetTotalBufferedBytes() <= total_buffer_size_; | 
|  233 } |  233 } | 
|  234  |  234  | 
|  235 void ByteStreamWriterImpl::Flush() { |  235 void ByteStreamWriterImpl::Flush() { | 
|  236   DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |  236   DCHECK(my_task_runner_->RunsTasksInCurrentSequence()); | 
|  237   if (input_contents_size_ > 0) |  237   if (input_contents_size_ > 0) | 
|  238     PostToPeer(false, 0); |  238     PostToPeer(false, 0); | 
|  239 } |  239 } | 
|  240  |  240  | 
|  241 void ByteStreamWriterImpl::Close(int status) { |  241 void ByteStreamWriterImpl::Close(int status) { | 
|  242   DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |  242   DCHECK(my_task_runner_->RunsTasksInCurrentSequence()); | 
|  243   PostToPeer(true, status); |  243   PostToPeer(true, status); | 
|  244 } |  244 } | 
|  245  |  245  | 
|  246 void ByteStreamWriterImpl::RegisterCallback( |  246 void ByteStreamWriterImpl::RegisterCallback( | 
|  247     const base::Closure& source_callback) { |  247     const base::Closure& source_callback) { | 
|  248   DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |  248   DCHECK(my_task_runner_->RunsTasksInCurrentSequence()); | 
|  249   space_available_callback_ = source_callback; |  249   space_available_callback_ = source_callback; | 
|  250 } |  250 } | 
|  251  |  251  | 
|  252 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const { |  252 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const { | 
|  253   DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |  253   DCHECK(my_task_runner_->RunsTasksInCurrentSequence()); | 
|  254   // This sum doesn't overflow since Write() fails if this sum is going to |  254   // This sum doesn't overflow since Write() fails if this sum is going to | 
|  255   // overflow. |  255   // overflow. | 
|  256   return input_contents_size_ + output_size_used_; |  256   return input_contents_size_ + output_size_used_; | 
|  257 } |  257 } | 
|  258  |  258  | 
|  259 // static |  259 // static | 
|  260 void ByteStreamWriterImpl::UpdateWindow( |  260 void ByteStreamWriterImpl::UpdateWindow( | 
|  261     scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target, |  261     scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target, | 
|  262     size_t bytes_consumed) { |  262     size_t bytes_consumed) { | 
|  263   // If the target object isn't alive anymore, we do nothing. |  263   // If the target object isn't alive anymore, we do nothing. | 
|  264   if (!lifetime_flag->is_alive) return; |  264   if (!lifetime_flag->is_alive) return; | 
|  265  |  265  | 
|  266   target->UpdateWindowInternal(bytes_consumed); |  266   target->UpdateWindowInternal(bytes_consumed); | 
|  267 } |  267 } | 
|  268  |  268  | 
|  269 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) { |  269 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) { | 
|  270   DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |  270   DCHECK(my_task_runner_->RunsTasksInCurrentSequence()); | 
|  271  |  271  | 
|  272   bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_; |  272   bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_; | 
|  273  |  273  | 
|  274   DCHECK_GE(output_size_used_, bytes_consumed); |  274   DCHECK_GE(output_size_used_, bytes_consumed); | 
|  275   output_size_used_ -= bytes_consumed; |  275   output_size_used_ -= bytes_consumed; | 
|  276  |  276  | 
|  277   // Callback if we were above the limit and we're now <= to it. |  277   // Callback if we were above the limit and we're now <= to it. | 
|  278   bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_; |  278   bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_; | 
|  279  |  279  | 
|  280   if (no_longer_above_limit && was_above_limit && |  280   if (no_longer_above_limit && was_above_limit && | 
|  281       !space_available_callback_.is_null()) |  281       !space_available_callback_.is_null()) | 
|  282     space_available_callback_.Run(); |  282     space_available_callback_.Run(); | 
|  283 } |  283 } | 
|  284  |  284  | 
|  285 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) { |  285 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) { | 
|  286   DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |  286   DCHECK(my_task_runner_->RunsTasksInCurrentSequence()); | 
|  287   // Valid contexts in which to call. |  287   // Valid contexts in which to call. | 
|  288   DCHECK(complete || 0 != input_contents_size_); |  288   DCHECK(complete || 0 != input_contents_size_); | 
|  289  |  289  | 
|  290   std::unique_ptr<ContentVector> transfer_buffer; |  290   std::unique_ptr<ContentVector> transfer_buffer; | 
|  291   size_t buffer_size = 0; |  291   size_t buffer_size = 0; | 
|  292   if (0 != input_contents_size_) { |  292   if (0 != input_contents_size_) { | 
|  293     transfer_buffer.reset(new ContentVector); |  293     transfer_buffer.reset(new ContentVector); | 
|  294     transfer_buffer->swap(input_contents_); |  294     transfer_buffer->swap(input_contents_); | 
|  295     buffer_size = input_contents_size_; |  295     buffer_size = input_contents_size_; | 
|  296     output_size_used_ += input_contents_size_; |  296     output_size_used_ += input_contents_size_; | 
| (...skipping 19 matching lines...) Expand all  Loading... | 
|  316       my_lifetime_flag_(lifetime_flag), |  316       my_lifetime_flag_(lifetime_flag), | 
|  317       received_status_(false), |  317       received_status_(false), | 
|  318       status_(0), |  318       status_(0), | 
|  319       unreported_consumed_bytes_(0), |  319       unreported_consumed_bytes_(0), | 
|  320       peer_(NULL) { |  320       peer_(NULL) { | 
|  321   DCHECK(my_lifetime_flag_.get()); |  321   DCHECK(my_lifetime_flag_.get()); | 
|  322   my_lifetime_flag_->is_alive = true; |  322   my_lifetime_flag_->is_alive = true; | 
|  323 } |  323 } | 
|  324  |  324  | 
|  325 ByteStreamReaderImpl::~ByteStreamReaderImpl() { |  325 ByteStreamReaderImpl::~ByteStreamReaderImpl() { | 
|  326   // No RunsTasksOnCurrentThread() check to allow deleting a created writer |  326   // No RunsTasksInCurrentSequence() check to allow deleting a created writer | 
|  327   // before we start using it. Once started, should be deleted on the specified |  327   // before we start using it. Once started, should be deleted on the specified | 
|  328   // task runner. |  328   // task runner. | 
|  329   my_lifetime_flag_->is_alive = false; |  329   my_lifetime_flag_->is_alive = false; | 
|  330 } |  330 } | 
|  331  |  331  | 
|  332 void ByteStreamReaderImpl::SetPeer( |  332 void ByteStreamReaderImpl::SetPeer( | 
|  333     ByteStreamWriterImpl* peer, |  333     ByteStreamWriterImpl* peer, | 
|  334     scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |  334     scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | 
|  335     scoped_refptr<LifetimeFlag> peer_lifetime_flag) { |  335     scoped_refptr<LifetimeFlag> peer_lifetime_flag) { | 
|  336   peer_ = peer; |  336   peer_ = peer; | 
|  337   peer_task_runner_ = peer_task_runner; |  337   peer_task_runner_ = peer_task_runner; | 
|  338   peer_lifetime_flag_ = peer_lifetime_flag; |  338   peer_lifetime_flag_ = peer_lifetime_flag; | 
|  339 } |  339 } | 
|  340  |  340  | 
|  341 ByteStreamReaderImpl::StreamState |  341 ByteStreamReaderImpl::StreamState | 
|  342 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data, |  342 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data, | 
|  343                            size_t* length) { |  343                            size_t* length) { | 
|  344   DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |  344   DCHECK(my_task_runner_->RunsTasksInCurrentSequence()); | 
|  345  |  345  | 
|  346   if (available_contents_.size()) { |  346   if (available_contents_.size()) { | 
|  347     *data = available_contents_.front().first; |  347     *data = available_contents_.front().first; | 
|  348     *length = available_contents_.front().second; |  348     *length = available_contents_.front().second; | 
|  349     available_contents_.pop_front(); |  349     available_contents_.pop_front(); | 
|  350     unreported_consumed_bytes_ += *length; |  350     unreported_consumed_bytes_ += *length; | 
|  351  |  351  | 
|  352     MaybeUpdateInput(); |  352     MaybeUpdateInput(); | 
|  353     return STREAM_HAS_DATA; |  353     return STREAM_HAS_DATA; | 
|  354   } |  354   } | 
|  355   if (received_status_) { |  355   if (received_status_) { | 
|  356     return STREAM_COMPLETE; |  356     return STREAM_COMPLETE; | 
|  357   } |  357   } | 
|  358   return STREAM_EMPTY; |  358   return STREAM_EMPTY; | 
|  359 } |  359 } | 
|  360  |  360  | 
|  361 int ByteStreamReaderImpl::GetStatus() const { |  361 int ByteStreamReaderImpl::GetStatus() const { | 
|  362   DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |  362   DCHECK(my_task_runner_->RunsTasksInCurrentSequence()); | 
|  363   DCHECK(received_status_); |  363   DCHECK(received_status_); | 
|  364   return status_; |  364   return status_; | 
|  365 } |  365 } | 
|  366  |  366  | 
|  367 void ByteStreamReaderImpl::RegisterCallback( |  367 void ByteStreamReaderImpl::RegisterCallback( | 
|  368     const base::Closure& sink_callback) { |  368     const base::Closure& sink_callback) { | 
|  369   DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |  369   DCHECK(my_task_runner_->RunsTasksInCurrentSequence()); | 
|  370  |  370  | 
|  371   data_available_callback_ = sink_callback; |  371   data_available_callback_ = sink_callback; | 
|  372 } |  372 } | 
|  373  |  373  | 
|  374 // static |  374 // static | 
|  375 void ByteStreamReaderImpl::TransferData( |  375 void ByteStreamReaderImpl::TransferData( | 
|  376     scoped_refptr<LifetimeFlag> object_lifetime_flag, |  376     scoped_refptr<LifetimeFlag> object_lifetime_flag, | 
|  377     ByteStreamReaderImpl* target, |  377     ByteStreamReaderImpl* target, | 
|  378     std::unique_ptr<ContentVector> transfer_buffer, |  378     std::unique_ptr<ContentVector> transfer_buffer, | 
|  379     size_t buffer_size, |  379     size_t buffer_size, | 
|  380     bool source_complete, |  380     bool source_complete, | 
|  381     int status) { |  381     int status) { | 
|  382   // If our target is no longer alive, do nothing. |  382   // If our target is no longer alive, do nothing. | 
|  383   if (!object_lifetime_flag->is_alive) return; |  383   if (!object_lifetime_flag->is_alive) return; | 
|  384  |  384  | 
|  385   target->TransferDataInternal(std::move(transfer_buffer), buffer_size, |  385   target->TransferDataInternal(std::move(transfer_buffer), buffer_size, | 
|  386                                source_complete, status); |  386                                source_complete, status); | 
|  387 } |  387 } | 
|  388  |  388  | 
|  389 void ByteStreamReaderImpl::TransferDataInternal( |  389 void ByteStreamReaderImpl::TransferDataInternal( | 
|  390     std::unique_ptr<ContentVector> transfer_buffer, |  390     std::unique_ptr<ContentVector> transfer_buffer, | 
|  391     size_t buffer_size, |  391     size_t buffer_size, | 
|  392     bool source_complete, |  392     bool source_complete, | 
|  393     int status) { |  393     int status) { | 
|  394   DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |  394   DCHECK(my_task_runner_->RunsTasksInCurrentSequence()); | 
|  395  |  395  | 
|  396   bool was_empty = available_contents_.empty(); |  396   bool was_empty = available_contents_.empty(); | 
|  397  |  397  | 
|  398   if (transfer_buffer) { |  398   if (transfer_buffer) { | 
|  399     available_contents_.insert(available_contents_.end(), |  399     available_contents_.insert(available_contents_.end(), | 
|  400                                transfer_buffer->begin(), |  400                                transfer_buffer->begin(), | 
|  401                                transfer_buffer->end()); |  401                                transfer_buffer->end()); | 
|  402   } |  402   } | 
|  403  |  403  | 
|  404   if (source_complete) { |  404   if (source_complete) { | 
|  405     received_status_ = true; |  405     received_status_ = true; | 
|  406     status_ = status; |  406     status_ = status; | 
|  407   } |  407   } | 
|  408  |  408  | 
|  409   // Callback on transition from empty to non-empty, or |  409   // Callback on transition from empty to non-empty, or | 
|  410   // source complete. |  410   // source complete. | 
|  411   if (((was_empty && !available_contents_.empty()) || |  411   if (((was_empty && !available_contents_.empty()) || | 
|  412        source_complete) && |  412        source_complete) && | 
|  413       !data_available_callback_.is_null()) |  413       !data_available_callback_.is_null()) | 
|  414     data_available_callback_.Run(); |  414     data_available_callback_.Run(); | 
|  415 } |  415 } | 
|  416  |  416  | 
|  417 // Decide whether or not to send the input a window update. |  417 // Decide whether or not to send the input a window update. | 
|  418 // Currently we do that whenever we've got unreported consumption |  418 // Currently we do that whenever we've got unreported consumption | 
|  419 // greater than 1/3 of total size. |  419 // greater than 1/3 of total size. | 
|  420 void ByteStreamReaderImpl::MaybeUpdateInput() { |  420 void ByteStreamReaderImpl::MaybeUpdateInput() { | 
|  421   DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |  421   DCHECK(my_task_runner_->RunsTasksInCurrentSequence()); | 
|  422  |  422  | 
|  423   if (unreported_consumed_bytes_ <= |  423   if (unreported_consumed_bytes_ <= | 
|  424       total_buffer_size_ / kFractionReadBeforeWindowUpdate) |  424       total_buffer_size_ / kFractionReadBeforeWindowUpdate) | 
|  425     return; |  425     return; | 
|  426  |  426  | 
|  427   peer_task_runner_->PostTask( |  427   peer_task_runner_->PostTask( | 
|  428       FROM_HERE, base::Bind( |  428       FROM_HERE, base::Bind( | 
|  429           &ByteStreamWriterImpl::UpdateWindow, |  429           &ByteStreamWriterImpl::UpdateWindow, | 
|  430           peer_lifetime_flag_, |  430           peer_lifetime_flag_, | 
|  431           peer_, |  431           peer_, | 
| (...skipping 24 matching lines...) Expand all  Loading... | 
|  456   ByteStreamReaderImpl* out = new ByteStreamReaderImpl( |  456   ByteStreamReaderImpl* out = new ByteStreamReaderImpl( | 
|  457       output_task_runner, output_flag, buffer_size); |  457       output_task_runner, output_flag, buffer_size); | 
|  458  |  458  | 
|  459   in->SetPeer(out, output_task_runner, output_flag); |  459   in->SetPeer(out, output_task_runner, output_flag); | 
|  460   out->SetPeer(in, input_task_runner, input_flag); |  460   out->SetPeer(in, input_task_runner, input_flag); | 
|  461   input->reset(in); |  461   input->reset(in); | 
|  462   output->reset(out); |  462   output->reset(out); | 
|  463 } |  463 } | 
|  464  |  464  | 
|  465 }  // namespace content |  465 }  // namespace content | 
| OLD | NEW |