| Index: content/browser/byte_stream.h
|
| diff --git a/content/browser/byte_stream.h b/content/browser/byte_stream.h
|
| index 49b76749051d180861a3de5deaf5417679ce1bcd..4c7f0b2759368050c137c3387cf75c7b6a015371 100644
|
| --- a/content/browser/byte_stream.h
|
| +++ b/content/browser/byte_stream.h
|
| @@ -9,10 +9,11 @@
|
| #include <set>
|
| #include <utility>
|
|
|
| +#include "base/bind.h"
|
| #include "base/callback.h"
|
| +#include "base/location.h"
|
| #include "base/memory/ref_counted.h"
|
| -#include "base/synchronization/lock.h"
|
| -#include "content/public/browser/download_interrupt_reasons.h"
|
| +#include "base/sequenced_task_runner.h"
|
| #include "net/base/io_buffer.h"
|
|
|
| namespace base {
|
| @@ -33,8 +34,12 @@ namespace content {
|
| // and the sink retrieves bytes already written via |ByteStreamReader::Read|.
|
| //
|
| // When the source has no more data to add, it will call
|
| -// |ByteStreamWriter::Close| to indicate that. Errors at the source
|
| -// are indicated to the sink via a non-DOWNLOAD_INTERRUPT_REASON_NONE code.
|
| +// |ByteStreamWriter::Close| to indicate that. Operation status at the source
|
| +// are indicated to the sink via the GetStatus() method.
|
| +//
|
| +// The type of operation status is specified as the StatusType template
|
| +// parameter of these classes. StatusType must have a 0-arg constructor and be
|
| +// copyable and assignable.
|
| //
|
| // Normally the source is not managed after the relationship is setup;
|
| // it is expected to provide data and then close itself. If an error
|
| @@ -62,9 +67,9 @@ namespace content {
|
| //
|
| // void OriginatingClass::Initialize() {
|
| // // Create a stream for sending bytes from IO->FILE threads.
|
| -// scoped_ptr<ByteStreamWriter> writer;
|
| -// scoped_ptr<ByteStreamReader> reader;
|
| -// CreateByteStream(
|
| +// scoped_ptr<ByteStreamWriter<StatusType> > writer;
|
| +// scoped_ptr<ByteStreamReader<StatusType> > reader;
|
| +// CreateByteStream<StatusType>(
|
| // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO),
|
| // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE),
|
| // kStreamBufferSize /* e.g. 10240. */,
|
| @@ -113,20 +118,21 @@ namespace content {
|
| // }
|
| //
|
| // if (ByteStreamReader::STREAM_COMPLETE == state) {
|
| -// DownloadInterruptReason status = reader->GetStatus();
|
| +// <StatusType> status = reader->GetStatus();
|
| // // Process error or successful completion in |status|.
|
| // }
|
| //
|
| // // if |state| is STREAM_EMPTY, we're done for now; we'll be called
|
| // // again when there's more data.
|
| // }
|
| -class CONTENT_EXPORT ByteStreamWriter {
|
| -public:
|
| +template <typename StatusType>
|
| +class ByteStreamWriter {
|
| + public:
|
| // Inverse of the fraction of the stream buffer that must be full before
|
| // a notification is sent to paired Reader that there's more data.
|
| - static const int kFractionBufferBeforeSending;
|
| + static const int kFractionBufferBeforeSending = 3;
|
|
|
| - virtual ~ByteStreamWriter() = 0;
|
| + virtual ~ByteStreamWriter() { };
|
|
|
| // Always adds the data passed into the ByteStream. Returns true
|
| // if more data may be added without exceeding the class limit
|
| @@ -135,9 +141,8 @@ public:
|
| size_t byte_count) = 0;
|
|
|
| // Signal that all data that is going to be sent, has been sent,
|
| - // and provide a status. |DOWNLOAD_INTERRUPT_REASON_NONE| should be
|
| - // passed for successful completion.
|
| - virtual void Close(DownloadInterruptReason status) = 0;
|
| + // and provide a status.
|
| + virtual void Close(StatusType status) = 0;
|
|
|
| // Register a callback to be called when the stream transitions from
|
| // full to having space available. The callback will always be
|
| @@ -152,15 +157,16 @@ public:
|
| virtual void RegisterCallback(const base::Closure& source_callback) = 0;
|
| };
|
|
|
| -class CONTENT_EXPORT ByteStreamReader {
|
| +template <typename StatusType>
|
| +class ByteStreamReader {
|
| public:
|
| // Inverse of the fraction of the stream buffer that must be empty before
|
| // a notification is send to paired Writer that there's more room.
|
| - static const int kFractionReadBeforeWindowUpdate;
|
| + static const int kFractionReadBeforeWindowUpdate = 3;
|
|
|
| enum StreamState { STREAM_EMPTY, STREAM_HAS_DATA, STREAM_COMPLETE };
|
|
|
| - virtual ~ByteStreamReader() = 0;
|
| + virtual ~ByteStreamReader() { };
|
|
|
| // Returns STREAM_EMPTY if there is no data on the ByteStream and
|
| // Close() has not been called, and STREAM_COMPLETE if there
|
| @@ -172,7 +178,7 @@ class CONTENT_EXPORT ByteStreamReader {
|
| size_t* length) = 0;
|
|
|
| // Only valid to call if Read() has returned STREAM_COMPLETE.
|
| - virtual DownloadInterruptReason GetStatus() const = 0;
|
| + virtual StatusType GetStatus() const = 0;
|
|
|
| // Register a callback to be called when data is added or the source
|
| // completes. The callback will be always be called on the owning
|
| @@ -184,12 +190,391 @@ class CONTENT_EXPORT ByteStreamReader {
|
| virtual void RegisterCallback(const base::Closure& sink_callback) = 0;
|
| };
|
|
|
| -CONTENT_EXPORT void CreateByteStream(
|
| +typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
|
| +ByteStreamContentVector;
|
| +
|
| +// A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
|
| +// cleared in an object destructor and accessed to check for object
|
| +// existence. We can't use weak pointers because they're tightly tied to
|
| +// threads rather than task runners.
|
| +// TODO(rdsmith): A better solution would be extending weak pointers
|
| +// to support SequencedTaskRunners.
|
| +struct ByteStreamLifetimeFlag
|
| + : public base::RefCountedThreadSafe<ByteStreamLifetimeFlag> {
|
| + public:
|
| + ByteStreamLifetimeFlag() : is_alive(true) { }
|
| + bool is_alive;
|
| +
|
| + protected:
|
| + friend class base::RefCountedThreadSafe<ByteStreamLifetimeFlag>;
|
| + virtual ~ByteStreamLifetimeFlag() { }
|
| +
|
| + private:
|
| + DISALLOW_COPY_AND_ASSIGN(ByteStreamLifetimeFlag);
|
| +};
|
| +
|
| +template <typename StatusType>
|
| +class ByteStreamReaderImpl;
|
| +
|
| +// For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
|
| +// SetPeer may happen anywhere; all other operations on each class must
|
| +// happen in the context of their SequencedTaskRunner.
|
| +template <typename StatusType>
|
| +class ByteStreamWriterImpl : public ByteStreamWriter<StatusType> {
|
| + public:
|
| + ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
|
| + scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag,
|
| + size_t buffer_size)
|
| + : total_buffer_size_(buffer_size),
|
| + my_task_runner_(task_runner),
|
| + my_lifetime_flag_(lifetime_flag),
|
| + input_contents_size_(0),
|
| + output_size_used_(0),
|
| + peer_(NULL) {
|
| + DCHECK(my_lifetime_flag_.get());
|
| + my_lifetime_flag_->is_alive = true;
|
| + }
|
| +
|
| + virtual ~ByteStreamWriterImpl() {
|
| + my_lifetime_flag_->is_alive = false;
|
| + }
|
| +
|
| + // Must be called before any operations are performed.
|
| + void SetPeer(ByteStreamReaderImpl<StatusType>* peer,
|
| + scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
|
| + scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag) {
|
| + peer_ = peer;
|
| + peer_task_runner_ = peer_task_runner;
|
| + peer_lifetime_flag_ = peer_lifetime_flag;
|
| + }
|
| +
|
| + // Overridden from ByteStreamWriter.
|
| + virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
|
| + size_t byte_count) OVERRIDE;
|
| + virtual void Close(StatusType status) OVERRIDE;
|
| + virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE {
|
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
|
| + space_available_callback_ = source_callback;
|
| + }
|
| +
|
| + // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
|
| + static void UpdateWindow(scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag,
|
| + ByteStreamWriterImpl<StatusType>* target,
|
| + size_t bytes_consumed) {
|
| + // If the target object isn't alive anymore, we do nothing.
|
| + if (!lifetime_flag->is_alive) return;
|
| +
|
| + target->UpdateWindowInternal(bytes_consumed);
|
| + }
|
| +
|
| + private:
|
| + // Called from UpdateWindow when object existence has been validated.
|
| + void UpdateWindowInternal(size_t bytes_consumed) {
|
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
|
| + DCHECK_GE(output_size_used_, bytes_consumed);
|
| + output_size_used_ -= bytes_consumed;
|
| +
|
| + // Callback if we were above the limit and we're now <= to it.
|
| + size_t total_known_size_used =
|
| + input_contents_size_ + output_size_used_;
|
| +
|
| + if (total_known_size_used <= total_buffer_size_ &&
|
| + (total_known_size_used + bytes_consumed > total_buffer_size_) &&
|
| + !space_available_callback_.is_null())
|
| + space_available_callback_.Run();
|
| + }
|
| +
|
| + void DrainInputBuffer(scoped_ptr<ByteStreamContentVector>* buffer) {
|
| + if (0 == input_contents_size_)
|
| + return;
|
| +
|
| + buffer->reset(new ByteStreamContentVector);
|
| + (*buffer)->swap(input_contents_);
|
| + output_size_used_ += input_contents_size_;
|
| + input_contents_size_ = 0;
|
| + }
|
| +
|
| + const size_t total_buffer_size_;
|
| +
|
| + // All data objects in this class are only valid to access on
|
| + // this task runner except as otherwise noted.
|
| + scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
|
| +
|
| + // True while this object is alive.
|
| + scoped_refptr<ByteStreamLifetimeFlag> my_lifetime_flag_;
|
| +
|
| + base::Closure space_available_callback_;
|
| + ByteStreamContentVector input_contents_;
|
| + size_t input_contents_size_;
|
| +
|
| + // ** Peer information.
|
| +
|
| + scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
|
| +
|
| + // How much we've sent to the output that for flow control purposes we
|
| + // must assume hasn't been read yet.
|
| + size_t output_size_used_;
|
| +
|
| + // Only valid to access on peer_task_runner_.
|
| + scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag_;
|
| +
|
| + // Only valid to access on peer_task_runner_ if
|
| + // |*peer_lifetime_flag_ == true|
|
| + ByteStreamReaderImpl<StatusType>* peer_;
|
| +};
|
| +
|
| +template <typename StatusType>
|
| +class ByteStreamReaderImpl : public ByteStreamReader<StatusType> {
|
| + public:
|
| + ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
|
| + scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag,
|
| + size_t buffer_size)
|
| + : total_buffer_size_(buffer_size),
|
| + my_task_runner_(task_runner),
|
| + my_lifetime_flag_(lifetime_flag),
|
| + received_status_(false),
|
| + unreported_consumed_bytes_(0),
|
| + peer_(NULL) {
|
| + DCHECK(my_lifetime_flag_.get());
|
| + my_lifetime_flag_->is_alive = true;
|
| + }
|
| +
|
| + virtual ~ByteStreamReaderImpl() {
|
| + my_lifetime_flag_->is_alive = false;
|
| + }
|
| +
|
| + // Must be called before any operations are performed.
|
| + void SetPeer(ByteStreamWriterImpl<StatusType>* peer,
|
| + scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
|
| + scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag) {
|
| + peer_ = peer;
|
| + peer_task_runner_ = peer_task_runner;
|
| + peer_lifetime_flag_ = peer_lifetime_flag;
|
| + }
|
| +
|
| + // Overridden from ByteStreamReader.
|
| + virtual typename ByteStreamReader<StatusType>::StreamState Read(
|
| + scoped_refptr<net::IOBuffer>* data,
|
| + size_t* length) OVERRIDE {
|
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + if (available_contents_.size()) {
|
| + *data = available_contents_.front().first;
|
| + *length = available_contents_.front().second;
|
| + available_contents_.pop_front();
|
| + unreported_consumed_bytes_ += *length;
|
| +
|
| + MaybeUpdateInput();
|
| + return ByteStreamReader<StatusType>::STREAM_HAS_DATA;
|
| + }
|
| + if (received_status_) {
|
| + return ByteStreamReader<StatusType>::STREAM_COMPLETE;
|
| + }
|
| + return ByteStreamReader<StatusType>::STREAM_EMPTY;
|
| + }
|
| + virtual StatusType GetStatus() const OVERRIDE {
|
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
|
| + DCHECK(received_status_);
|
| + return status_;
|
| + }
|
| + virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE {
|
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + data_available_callback_ = sink_callback;
|
| + }
|
| +
|
| + // PostTask targets from |ByteStreamWriterImpl::Write| and
|
| + // |ByteStreamWriterImpl::Close|.
|
| + // Receive data from our peer.
|
| + // static because it may be called after the object it is targeting
|
| + // has been destroyed. It may not access |*target|
|
| + // if |*object_lifetime_flag| is false.
|
| + static void TransferData(
|
| + scoped_refptr<ByteStreamLifetimeFlag> object_lifetime_flag,
|
| + ByteStreamReaderImpl<StatusType>* target,
|
| + scoped_ptr<ByteStreamContentVector> transfer_buffer) {
|
| + // If our target is no longer alive, do nothing.
|
| + if (!object_lifetime_flag->is_alive) return;
|
| +
|
| + target->TransferDataInternal(transfer_buffer.Pass());
|
| + }
|
| + static void TransferDataAndClose(
|
| + scoped_refptr<ByteStreamLifetimeFlag> object_lifetime_flag,
|
| + ByteStreamReaderImpl<StatusType>* target,
|
| + scoped_ptr<ByteStreamContentVector> transfer_buffer,
|
| + StatusType status) {
|
| + // If our target is no longer alive, do nothing.
|
| + if (!object_lifetime_flag->is_alive) return;
|
| +
|
| + target->TransferDataAndCloseInternal(transfer_buffer.Pass(), status);
|
| + }
|
| +
|
| + private:
|
| + void AppendBuffer(scoped_ptr<ByteStreamContentVector> buffer) {
|
| + if (!buffer)
|
| + return;
|
| +
|
| + available_contents_.insert(available_contents_.end(),
|
| + buffer->begin(),
|
| + buffer->end());
|
| + }
|
| +
|
| + void MaybeInvokeCallback(bool was_empty, bool source_complete) {
|
| + // Callback on transition from empty to non-empty, or
|
| + // source complete.
|
| + if (((was_empty && !available_contents_.empty()) ||
|
| + source_complete) &&
|
| + !data_available_callback_.is_null())
|
| + data_available_callback_.Run();
|
| + }
|
| +
|
| + // Called from TransferData.* once object existence has been validated.
|
| + void TransferDataInternal(
|
| + scoped_ptr<ByteStreamContentVector> transfer_buffer) {
|
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + bool was_empty = available_contents_.empty();
|
| +
|
| + AppendBuffer(transfer_buffer.Pass());
|
| +
|
| + MaybeInvokeCallback(was_empty, false /* source_complete */);
|
| + }
|
| + void TransferDataAndCloseInternal(
|
| + scoped_ptr<ByteStreamContentVector> transfer_buffer, StatusType status) {
|
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + bool was_empty = available_contents_.empty();
|
| +
|
| + AppendBuffer(transfer_buffer.Pass());
|
| +
|
| + received_status_ = true;
|
| + status_ = status;
|
| +
|
| + MaybeInvokeCallback(was_empty, true /* source_complete */);
|
| + }
|
| +
|
| + void MaybeUpdateInput();
|
| +
|
| + const size_t total_buffer_size_;
|
| +
|
| + scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
|
| +
|
| + // True while this object is alive.
|
| + scoped_refptr<ByteStreamLifetimeFlag> my_lifetime_flag_;
|
| +
|
| + ByteStreamContentVector available_contents_;
|
| +
|
| + bool received_status_;
|
| + StatusType status_;
|
| +
|
| + base::Closure data_available_callback_;
|
| +
|
| + // Time of last point at which data in stream transitioned from full
|
| + // to non-full. Nulled when a callback is sent.
|
| + base::Time last_non_full_time_;
|
| +
|
| + // ** Peer information
|
| +
|
| + scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
|
| +
|
| + // How much has been removed from this class that we haven't told
|
| + // the input about yet.
|
| + size_t unreported_consumed_bytes_;
|
| +
|
| + // Only valid to access on peer_task_runner_.
|
| + scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag_;
|
| +
|
| + // Only valid to access on peer_task_runner_ if
|
| + // |*peer_lifetime_flag_ == true|
|
| + ByteStreamWriterImpl<StatusType>* peer_;
|
| +};
|
| +
|
| +template <typename StatusType>
|
| +bool ByteStreamWriterImpl<StatusType>::Write(
|
| + scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
|
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + input_contents_.push_back(std::make_pair(buffer, byte_count));
|
| + input_contents_size_ += byte_count;
|
| +
|
| + // Arbitrarily, we buffer to a third of the total size before sending.
|
| + if (input_contents_size_ > total_buffer_size_ /
|
| + ByteStreamWriter<StatusType>::kFractionBufferBeforeSending) {
|
| + scoped_ptr<ByteStreamContentVector> transfer_buffer(
|
| + new ByteStreamContentVector);
|
| + DrainInputBuffer(&transfer_buffer);
|
| +
|
| + peer_task_runner_->PostTask(
|
| + FROM_HERE, base::Bind(
|
| + &ByteStreamReaderImpl<StatusType>::TransferData,
|
| + peer_lifetime_flag_,
|
| + peer_,
|
| + base::Passed(&transfer_buffer)));
|
| + }
|
| +
|
| + return (input_contents_size_ + output_size_used_ <= total_buffer_size_);
|
| +}
|
| +
|
| +template <typename StatusType>
|
| +void ByteStreamWriterImpl<StatusType>::Close(StatusType status) {
|
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + scoped_ptr<ByteStreamContentVector> transfer_buffer(
|
| + new ByteStreamContentVector);
|
| + DrainInputBuffer(&transfer_buffer);
|
| +
|
| + peer_task_runner_->PostTask(
|
| + FROM_HERE, base::Bind(
|
| + &ByteStreamReaderImpl<StatusType>::TransferDataAndClose,
|
| + peer_lifetime_flag_,
|
| + peer_,
|
| + base::Passed(&transfer_buffer),
|
| + status));
|
| +}
|
| +
|
| +// Decide whether or not to send the input a window update.
|
| +// Currently we do that whenever we've got unreported consumption
|
| +// greater than 1/3 of total size.
|
| +template <typename StatusType>
|
| +void ByteStreamReaderImpl<StatusType>::MaybeUpdateInput() {
|
| + DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + if (unreported_consumed_bytes_ <=
|
| + total_buffer_size_ /
|
| + ByteStreamReader<StatusType>::kFractionReadBeforeWindowUpdate)
|
| + return;
|
| +
|
| + peer_task_runner_->PostTask(
|
| + FROM_HERE, base::Bind(
|
| + &ByteStreamWriterImpl<StatusType>::UpdateWindow,
|
| + peer_lifetime_flag_,
|
| + peer_,
|
| + unreported_consumed_bytes_));
|
| + unreported_consumed_bytes_ = 0;
|
| +}
|
| +
|
| +template <typename StatusType>
|
| +void CreateByteStream(
|
| scoped_refptr<base::SequencedTaskRunner> input_task_runner,
|
| scoped_refptr<base::SequencedTaskRunner> output_task_runner,
|
| size_t buffer_size,
|
| - scoped_ptr<ByteStreamWriter>* input,
|
| - scoped_ptr<ByteStreamReader>* output);
|
| + scoped_ptr<ByteStreamWriter<StatusType> >* input,
|
| + scoped_ptr<ByteStreamReader<StatusType> >* output) {
|
| + scoped_refptr<ByteStreamLifetimeFlag> input_flag(
|
| + new ByteStreamLifetimeFlag());
|
| + scoped_refptr<ByteStreamLifetimeFlag> output_flag(
|
| + new ByteStreamLifetimeFlag());
|
| +
|
| + ByteStreamWriterImpl<StatusType>* in = new ByteStreamWriterImpl<StatusType>(
|
| + input_task_runner, input_flag, buffer_size);
|
| + ByteStreamReaderImpl<StatusType>* out = new ByteStreamReaderImpl<StatusType>(
|
| + output_task_runner, output_flag, buffer_size);
|
| +
|
| + in->SetPeer(out, output_task_runner, output_flag);
|
| + out->SetPeer(in, input_task_runner, input_flag);
|
| + input->reset(in);
|
| + output->reset(out);
|
| +}
|
|
|
| } // namespace content
|
|
|
|
|