Chromium Code Reviews| Index: content/browser/byte_stream.h |
| diff --git a/content/browser/byte_stream.h b/content/browser/byte_stream.h |
| index 49b76749051d180861a3de5deaf5417679ce1bcd..d358b0474d91c8b0c83a204f9c3416d6455d00b4 100644 |
| --- a/content/browser/byte_stream.h |
| +++ b/content/browser/byte_stream.h |
| @@ -9,10 +9,11 @@ |
| #include <set> |
| #include <utility> |
| +#include "base/bind.h" |
| #include "base/callback.h" |
| +#include "base/location.h" |
| #include "base/memory/ref_counted.h" |
| -#include "base/synchronization/lock.h" |
| -#include "content/public/browser/download_interrupt_reasons.h" |
| +#include "base/sequenced_task_runner.h" |
| #include "net/base/io_buffer.h" |
| namespace base { |
| @@ -34,7 +35,7 @@ namespace content { |
| // |
| // When the source has no more data to add, it will call |
| // |ByteStreamWriter::Close| to indicate that. Errors at the source |
| -// are indicated to the sink via a non-DOWNLOAD_INTERRUPT_REASON_NONE code. |
| +// are indicated to the sink via the GetStatus() method. |
|
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
Suggestion: Change "Errors" to "Operation status";
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
Done.
|
| // |
| // Normally the source is not managed after the relationship is setup; |
| // it is expected to provide data and then close itself. If an error |
| @@ -62,9 +63,9 @@ namespace content { |
| // |
| // void OriginatingClass::Initialize() { |
| // // Create a stream for sending bytes from IO->FILE threads. |
| -// scoped_ptr<ByteStreamWriter> writer; |
| -// scoped_ptr<ByteStreamReader> reader; |
| -// CreateByteStream( |
| +// scoped_ptr<ByteStreamWriter<status type> > writer; |
|
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
Could you change this to StatusType? My eye keeps
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
Done.
|
| +// scoped_ptr<ByteStreamReader<status type> > reader; |
| +// CreateByteStream<status type>( |
| // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO), |
| // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE), |
| // kStreamBufferSize /* e.g. 10240. */, |
| @@ -113,20 +114,21 @@ namespace content { |
| // } |
| // |
| // if (ByteStreamReader::STREAM_COMPLETE == state) { |
| -// DownloadInterruptReason status = reader->GetStatus(); |
| +// <status type> status = reader->GetStatus(); |
| // // Process error or successful completion in |status|. |
| // } |
| // |
| // // if |state| is STREAM_EMPTY, we're done for now; we'll be called |
| // // again when there's more data. |
| // } |
| -class CONTENT_EXPORT ByteStreamWriter { |
| -public: |
| +template <typename StatusType> |
|
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
We need to document the requirements on StatusType
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
Added around L38
|
| +class ByteStreamWriter { |
| + public: |
| // Inverse of the fraction of the stream buffer that must be full before |
| // a notification is sent to paired Reader that there's more data. |
| - static const int kFractionBufferBeforeSending; |
| + static const int kFractionBufferBeforeSending = 3; |
| - virtual ~ByteStreamWriter() = 0; |
| + virtual ~ByteStreamWriter() { }; |
| // Always adds the data passed into the ByteStream. Returns true |
| // if more data may be added without exceeding the class limit |
| @@ -135,9 +137,8 @@ public: |
| size_t byte_count) = 0; |
| // Signal that all data that is going to be sent, has been sent, |
| - // and provide a status. |DOWNLOAD_INTERRUPT_REASON_NONE| should be |
| - // passed for successful completion. |
| - virtual void Close(DownloadInterruptReason status) = 0; |
| + // and provide a status. |
| + virtual void Close(StatusType status) = 0; |
| // Register a callback to be called when the stream transitions from |
| // full to having space available. The callback will always be |
| @@ -152,15 +153,16 @@ public: |
| virtual void RegisterCallback(const base::Closure& source_callback) = 0; |
| }; |
| -class CONTENT_EXPORT ByteStreamReader { |
| +template <typename StatusType> |
| +class ByteStreamReader { |
| public: |
| // Inverse of the fraction of the stream buffer that must be empty before |
| // a notification is send to paired Writer that there's more room. |
| - static const int kFractionReadBeforeWindowUpdate; |
| + static const int kFractionReadBeforeWindowUpdate = 3; |
| enum StreamState { STREAM_EMPTY, STREAM_HAS_DATA, STREAM_COMPLETE }; |
| - virtual ~ByteStreamReader() = 0; |
| + virtual ~ByteStreamReader() { }; |
| // Returns STREAM_EMPTY if there is no data on the ByteStream and |
| // Close() has not been called, and STREAM_COMPLETE if there |
| @@ -172,7 +174,7 @@ class CONTENT_EXPORT ByteStreamReader { |
| size_t* length) = 0; |
| // Only valid to call if Read() has returned STREAM_COMPLETE. |
| - virtual DownloadInterruptReason GetStatus() const = 0; |
| + virtual StatusType GetStatus() const = 0; |
| // Register a callback to be called when data is added or the source |
| // completes. The callback will be always be called on the owning |
| @@ -184,12 +186,391 @@ class CONTENT_EXPORT ByteStreamReader { |
| virtual void RegisterCallback(const base::Closure& sink_callback) = 0; |
| }; |
| -CONTENT_EXPORT void CreateByteStream( |
|
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
I'm surprised that removing CONTENT_EXPORT works;
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
As we have implementation in .h file (to instantia
|
| +typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> > |
| +ByteStreamContentVector; |
| + |
| +// A poor man's weak pointer; a RefCountedThreadSafe boolean that can be |
| +// cleared in an object destructor and accessed to check for object |
| +// existence. We can't use weak pointers because they're tightly tied to |
| +// threads rather than task runners. |
| +// TODO(rdsmith): A better solution would be extending weak pointers |
| +// to support SequencedTaskRunners. |
| +struct ByteStreamLifetimeFlag |
| + : public base::RefCountedThreadSafe<ByteStreamLifetimeFlag> { |
| + public: |
| + ByteStreamLifetimeFlag() : is_alive(true) { } |
| + bool is_alive; |
| + |
| + protected: |
| + friend class base::RefCountedThreadSafe<ByteStreamLifetimeFlag>; |
| + virtual ~ByteStreamLifetimeFlag() { } |
| + |
| + private: |
| + DISALLOW_COPY_AND_ASSIGN(ByteStreamLifetimeFlag); |
| +}; |
| + |
| +template <typename StatusType> |
| +class ByteStreamReaderImpl; |
| + |
| +// For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and |
| +// SetPeer may happen anywhere; all other operations on each class must |
| +// happen in the context of their SequencedTaskRunner. |
| +template <typename StatusType> |
| +class ByteStreamWriterImpl : public ByteStreamWriter<StatusType> { |
| + public: |
| + ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, |
| + scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, |
| + size_t buffer_size) |
| + : total_buffer_size_(buffer_size), |
| + my_task_runner_(task_runner), |
| + my_lifetime_flag_(lifetime_flag), |
| + input_contents_size_(0), |
| + output_size_used_(0), |
| + peer_(NULL) { |
| + DCHECK(my_lifetime_flag_.get()); |
| + my_lifetime_flag_->is_alive = true; |
| + } |
| + |
| + virtual ~ByteStreamWriterImpl() { |
| + my_lifetime_flag_->is_alive = false; |
| + } |
| + |
| + // Must be called before any operations are performed. |
| + void SetPeer(ByteStreamReaderImpl<StatusType>* peer, |
| + scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
| + scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag) { |
| + peer_ = peer; |
| + peer_task_runner_ = peer_task_runner; |
| + peer_lifetime_flag_ = peer_lifetime_flag; |
| + } |
| + |
| + // Overridden from ByteStreamWriter. |
| + virtual bool Write(scoped_refptr<net::IOBuffer> buffer, |
| + size_t byte_count) OVERRIDE; |
| + virtual void Close(StatusType status) OVERRIDE; |
| + virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE { |
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| + space_available_callback_ = source_callback; |
| + } |
| + |
| + // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. |
| + static void UpdateWindow(scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, |
| + ByteStreamWriterImpl<StatusType>* target, |
| + size_t bytes_consumed) { |
| + // If the target object isn't alive anymore, we do nothing. |
| + if (!lifetime_flag->is_alive) return; |
| + |
| + target->UpdateWindowInternal(bytes_consumed); |
| + } |
| + |
| + private: |
| + // Called from UpdateWindow when object existence has been validated. |
| + void UpdateWindowInternal(size_t bytes_consumed) { |
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| + DCHECK_GE(output_size_used_, bytes_consumed); |
| + output_size_used_ -= bytes_consumed; |
| + |
| + // Callback if we were above the limit and we're now <= to it. |
| + size_t total_known_size_used = |
| + input_contents_size_ + output_size_used_; |
| + |
| + if (total_known_size_used <= total_buffer_size_ && |
| + (total_known_size_used + bytes_consumed > total_buffer_size_) && |
| + !space_available_callback_.is_null()) |
| + space_available_callback_.Run(); |
| + } |
| + |
| + void DrainInputBuffer(scoped_ptr<ByteStreamContentVector>* buffer) { |
| + if (0 == input_contents_size_) |
| + return; |
| + |
| + buffer->reset(new ByteStreamContentVector); |
| + (*buffer)->swap(input_contents_); |
| + output_size_used_ += input_contents_size_; |
| + input_contents_size_ = 0; |
| + } |
| + |
| + const size_t total_buffer_size_; |
| + |
| + // All data objects in this class are only valid to access on |
| + // this task runner except as otherwise noted. |
| + scoped_refptr<base::SequencedTaskRunner> my_task_runner_; |
| + |
| + // True while this object is alive. |
| + scoped_refptr<ByteStreamLifetimeFlag> my_lifetime_flag_; |
| + |
| + base::Closure space_available_callback_; |
| + ByteStreamContentVector input_contents_; |
| + size_t input_contents_size_; |
| + |
| + // ** Peer information. |
| + |
| + scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; |
| + |
| + // How much we've sent to the output that for flow control purposes we |
| + // must assume hasn't been read yet. |
| + size_t output_size_used_; |
| + |
| + // Only valid to access on peer_task_runner_. |
| + scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag_; |
| + |
| + // Only valid to access on peer_task_runner_ if |
| + // |*peer_lifetime_flag_ == true| |
| + ByteStreamReaderImpl<StatusType>* peer_; |
| +}; |
| + |
| +template <typename StatusType> |
| +class ByteStreamReaderImpl : public ByteStreamReader<StatusType> { |
| + public: |
| + ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, |
| + scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, |
| + size_t buffer_size) |
| + : total_buffer_size_(buffer_size), |
| + my_task_runner_(task_runner), |
| + my_lifetime_flag_(lifetime_flag), |
| + received_status_(false), |
| + unreported_consumed_bytes_(0), |
| + peer_(NULL) { |
| + DCHECK(my_lifetime_flag_.get()); |
| + my_lifetime_flag_->is_alive = true; |
| + } |
| + |
| + virtual ~ByteStreamReaderImpl() { |
| + my_lifetime_flag_->is_alive = false; |
| + } |
| + |
| + // Must be called before any operations are performed. |
| + void SetPeer(ByteStreamWriterImpl<StatusType>* peer, |
| + scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
| + scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag) { |
| + peer_ = peer; |
| + peer_task_runner_ = peer_task_runner; |
| + peer_lifetime_flag_ = peer_lifetime_flag; |
| + } |
| + |
| + // Overridden from ByteStreamReader. |
| + virtual typename ByteStreamReader<StatusType>::StreamState Read( |
| + scoped_refptr<net::IOBuffer>* data, |
| + size_t* length) OVERRIDE { |
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| + |
| + if (available_contents_.size()) { |
| + *data = available_contents_.front().first; |
| + *length = available_contents_.front().second; |
| + available_contents_.pop_front(); |
| + unreported_consumed_bytes_ += *length; |
| + |
| + MaybeUpdateInput(); |
| + return ByteStreamReader<StatusType>::STREAM_HAS_DATA; |
| + } |
| + if (received_status_) { |
| + return ByteStreamReader<StatusType>::STREAM_COMPLETE; |
| + } |
| + return ByteStreamReader<StatusType>::STREAM_EMPTY; |
| + } |
| + virtual StatusType GetStatus() const OVERRIDE { |
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| + DCHECK(received_status_); |
| + return status_; |
| + } |
| + virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE { |
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| + |
| + data_available_callback_ = sink_callback; |
| + } |
| + |
| + // PostTask targets from |ByteStreamWriterImpl::Write| and |
| + // |ByteStreamWriterImpl::Close|. |
| + // Receive data from our peer. |
| + // static because it may be called after the object it is targeting |
| + // has been destroyed. It may not access |*target| |
| + // if |*object_lifetime_flag| is false. |
| + static void TransferData( |
| + scoped_refptr<ByteStreamLifetimeFlag> object_lifetime_flag, |
| + ByteStreamReaderImpl<StatusType>* target, |
| + scoped_ptr<ByteStreamContentVector> transfer_buffer) { |
| + // If our target is no longer alive, do nothing. |
| + if (!object_lifetime_flag->is_alive) return; |
| + |
| + target->TransferDataInternal(transfer_buffer.Pass()); |
| + } |
| + static void TransferDataAndClose( |
| + scoped_refptr<ByteStreamLifetimeFlag> object_lifetime_flag, |
| + ByteStreamReaderImpl<StatusType>* target, |
| + scoped_ptr<ByteStreamContentVector> transfer_buffer, |
| + StatusType status) { |
| + // If our target is no longer alive, do nothing. |
| + if (!object_lifetime_flag->is_alive) return; |
| + |
| + target->TransferDataAndCloseInternal(transfer_buffer.Pass(), status); |
| + } |
| + |
| + private: |
| + void AppendBuffer(scoped_ptr<ByteStreamContentVector> buffer) { |
| + if (!buffer) |
| + return; |
| + |
| + available_contents_.insert(available_contents_.end(), |
| + buffer->begin(), |
| + buffer->end()); |
| + } |
| + |
| + void MaybeInvokeCallback(bool was_empty, bool source_complete) { |
| + // Callback on transition from empty to non-empty, or |
| + // source complete. |
| + if (((was_empty && !available_contents_.empty()) || |
| + source_complete) && |
| + !data_available_callback_.is_null()) |
| + data_available_callback_.Run(); |
| + } |
| + |
| + // Called from TransferData.* once object existence has been validated. |
| + void TransferDataInternal( |
| + scoped_ptr<ByteStreamContentVector> transfer_buffer) { |
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| + |
| + bool was_empty = available_contents_.empty(); |
| + |
| + AppendBuffer(transfer_buffer.Pass()); |
| + |
| + MaybeInvokeCallback(was_empty, false /* source_complete */); |
| + } |
| + void TransferDataAndCloseInternal( |
| + scoped_ptr<ByteStreamContentVector> transfer_buffer, StatusType status) { |
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| + |
| + bool was_empty = available_contents_.empty(); |
| + |
| + AppendBuffer(transfer_buffer.Pass()); |
| + |
| + received_status_ = true; |
| + status_ = status; |
| + |
| + MaybeInvokeCallback(was_empty, true /* source_complete */); |
| + } |
| + |
| + void MaybeUpdateInput(); |
| + |
| + const size_t total_buffer_size_; |
| + |
| + scoped_refptr<base::SequencedTaskRunner> my_task_runner_; |
| + |
| + // True while this object is alive. |
| + scoped_refptr<ByteStreamLifetimeFlag> my_lifetime_flag_; |
| + |
| + ByteStreamContentVector available_contents_; |
| + |
| + bool received_status_; |
| + StatusType status_; |
| + |
| + base::Closure data_available_callback_; |
| + |
| + // Time of last point at which data in stream transitioned from full |
| + // to non-full. Nulled when a callback is sent. |
| + base::Time last_non_full_time_; |
| + |
| + // ** Peer information |
| + |
| + scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; |
| + |
| + // How much has been removed from this class that we haven't told |
| + // the input about yet. |
| + size_t unreported_consumed_bytes_; |
| + |
| + // Only valid to access on peer_task_runner_. |
| + scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag_; |
| + |
| + // Only valid to access on peer_task_runner_ if |
| + // |*peer_lifetime_flag_ == true| |
| + ByteStreamWriterImpl<StatusType>* peer_; |
| +}; |
| + |
| +template <typename StatusType> |
| +bool ByteStreamWriterImpl<StatusType>::Write( |
| + scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { |
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| + |
| + input_contents_.push_back(std::make_pair(buffer, byte_count)); |
| + input_contents_size_ += byte_count; |
| + |
| + // Arbitrarily, we buffer to a third of the total size before sending. |
| + if (input_contents_size_ > total_buffer_size_ / |
| + ByteStreamWriter<StatusType>::kFractionBufferBeforeSending) { |
| + scoped_ptr<ByteStreamContentVector> transfer_buffer( |
| + new ByteStreamContentVector); |
| + DrainInputBuffer(&transfer_buffer); |
| + |
| + peer_task_runner_->PostTask( |
| + FROM_HERE, base::Bind( |
| + &ByteStreamReaderImpl<StatusType>::TransferData, |
| + peer_lifetime_flag_, |
| + peer_, |
| + base::Passed(&transfer_buffer))); |
| + } |
| + |
| + return (input_contents_size_ + output_size_used_ <= total_buffer_size_); |
| +} |
| + |
| +template <typename StatusType> |
| +void ByteStreamWriterImpl<StatusType>::Close(StatusType status) { |
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| + |
| + scoped_ptr<ByteStreamContentVector> transfer_buffer( |
| + new ByteStreamContentVector); |
| + DrainInputBuffer(&transfer_buffer); |
| + |
| + peer_task_runner_->PostTask( |
| + FROM_HERE, base::Bind( |
| + &ByteStreamReaderImpl<StatusType>::TransferDataAndClose, |
| + peer_lifetime_flag_, |
| + peer_, |
| + base::Passed(&transfer_buffer), |
| + status)); |
| +} |
| + |
| +// Decide whether or not to send the input a window update. |
| +// Currently we do that whenever we've got unreported consumption |
| +// greater than 1/3 of total size. |
| +template <typename StatusType> |
| +void ByteStreamReaderImpl<StatusType>::MaybeUpdateInput() { |
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| + |
| + if (unreported_consumed_bytes_ <= |
| + total_buffer_size_ / |
| + ByteStreamReader<StatusType>::kFractionReadBeforeWindowUpdate) |
| + return; |
| + |
| + peer_task_runner_->PostTask( |
| + FROM_HERE, base::Bind( |
| + &ByteStreamWriterImpl<StatusType>::UpdateWindow, |
| + peer_lifetime_flag_, |
| + peer_, |
| + unreported_consumed_bytes_)); |
| + unreported_consumed_bytes_ = 0; |
| +} |
| + |
| +template <typename StatusType> |
| +void CreateByteStream( |
| scoped_refptr<base::SequencedTaskRunner> input_task_runner, |
| scoped_refptr<base::SequencedTaskRunner> output_task_runner, |
| size_t buffer_size, |
| - scoped_ptr<ByteStreamWriter>* input, |
| - scoped_ptr<ByteStreamReader>* output); |
| + scoped_ptr<ByteStreamWriter<StatusType> >* input, |
| + scoped_ptr<ByteStreamReader<StatusType> >* output) { |
| + scoped_refptr<ByteStreamLifetimeFlag> input_flag( |
| + new ByteStreamLifetimeFlag()); |
| + scoped_refptr<ByteStreamLifetimeFlag> output_flag( |
| + new ByteStreamLifetimeFlag()); |
| + |
| + ByteStreamWriterImpl<StatusType>* in = new ByteStreamWriterImpl<StatusType>( |
| + input_task_runner, input_flag, buffer_size); |
| + ByteStreamReaderImpl<StatusType>* out = new ByteStreamReaderImpl<StatusType>( |
| + output_task_runner, output_flag, buffer_size); |
| + |
| + in->SetPeer(out, output_task_runner, output_flag); |
| + out->SetPeer(in, input_task_runner, input_flag); |
| + input->reset(in); |
| + output->reset(out); |
| +} |
| } // namespace content |