Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(276)

Unified Diff: content/browser/byte_stream.h

Issue 18284005: Make ByteStream independent from DownloadInterruptReason (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: rdsmith's comments Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | content/browser/byte_stream.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: content/browser/byte_stream.h
diff --git a/content/browser/byte_stream.h b/content/browser/byte_stream.h
index 49b76749051d180861a3de5deaf5417679ce1bcd..4c7f0b2759368050c137c3387cf75c7b6a015371 100644
--- a/content/browser/byte_stream.h
+++ b/content/browser/byte_stream.h
@@ -9,10 +9,11 @@
#include <set>
#include <utility>
+#include "base/bind.h"
#include "base/callback.h"
+#include "base/location.h"
#include "base/memory/ref_counted.h"
-#include "base/synchronization/lock.h"
-#include "content/public/browser/download_interrupt_reasons.h"
+#include "base/sequenced_task_runner.h"
#include "net/base/io_buffer.h"
namespace base {
@@ -33,8 +34,12 @@ namespace content {
// and the sink retrieves bytes already written via |ByteStreamReader::Read|.
//
// When the source has no more data to add, it will call
-// |ByteStreamWriter::Close| to indicate that. Errors at the source
-// are indicated to the sink via a non-DOWNLOAD_INTERRUPT_REASON_NONE code.
+// |ByteStreamWriter::Close| to indicate that. Operation status at the source
+// are indicated to the sink via the GetStatus() method.
+//
+// The type of operation status is specified as the StatusType template
+// parameter of these classes. StatusType must have a 0-arg constructor and be
+// copyable and assignable.
//
// Normally the source is not managed after the relationship is setup;
// it is expected to provide data and then close itself. If an error
@@ -62,9 +67,9 @@ namespace content {
//
// void OriginatingClass::Initialize() {
// // Create a stream for sending bytes from IO->FILE threads.
-// scoped_ptr<ByteStreamWriter> writer;
-// scoped_ptr<ByteStreamReader> reader;
-// CreateByteStream(
+// scoped_ptr<ByteStreamWriter<StatusType> > writer;
+// scoped_ptr<ByteStreamReader<StatusType> > reader;
+// CreateByteStream<StatusType>(
// BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO),
// BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE),
// kStreamBufferSize /* e.g. 10240. */,
@@ -113,20 +118,21 @@ namespace content {
// }
//
// if (ByteStreamReader::STREAM_COMPLETE == state) {
-// DownloadInterruptReason status = reader->GetStatus();
+// <StatusType> status = reader->GetStatus();
// // Process error or successful completion in |status|.
// }
//
// // if |state| is STREAM_EMPTY, we're done for now; we'll be called
// // again when there's more data.
// }
-class CONTENT_EXPORT ByteStreamWriter {
-public:
+template <typename StatusType>
+class ByteStreamWriter {
+ public:
// Inverse of the fraction of the stream buffer that must be full before
// a notification is sent to paired Reader that there's more data.
- static const int kFractionBufferBeforeSending;
+ static const int kFractionBufferBeforeSending = 3;
- virtual ~ByteStreamWriter() = 0;
+ virtual ~ByteStreamWriter() { };
// Always adds the data passed into the ByteStream. Returns true
// if more data may be added without exceeding the class limit
@@ -135,9 +141,8 @@ public:
size_t byte_count) = 0;
// Signal that all data that is going to be sent, has been sent,
- // and provide a status. |DOWNLOAD_INTERRUPT_REASON_NONE| should be
- // passed for successful completion.
- virtual void Close(DownloadInterruptReason status) = 0;
+ // and provide a status.
+ virtual void Close(StatusType status) = 0;
// Register a callback to be called when the stream transitions from
// full to having space available. The callback will always be
@@ -152,15 +157,16 @@ public:
virtual void RegisterCallback(const base::Closure& source_callback) = 0;
};
-class CONTENT_EXPORT ByteStreamReader {
+template <typename StatusType>
+class ByteStreamReader {
public:
// Inverse of the fraction of the stream buffer that must be empty before
// a notification is send to paired Writer that there's more room.
- static const int kFractionReadBeforeWindowUpdate;
+ static const int kFractionReadBeforeWindowUpdate = 3;
enum StreamState { STREAM_EMPTY, STREAM_HAS_DATA, STREAM_COMPLETE };
- virtual ~ByteStreamReader() = 0;
+ virtual ~ByteStreamReader() { };
// Returns STREAM_EMPTY if there is no data on the ByteStream and
// Close() has not been called, and STREAM_COMPLETE if there
@@ -172,7 +178,7 @@ class CONTENT_EXPORT ByteStreamReader {
size_t* length) = 0;
// Only valid to call if Read() has returned STREAM_COMPLETE.
- virtual DownloadInterruptReason GetStatus() const = 0;
+ virtual StatusType GetStatus() const = 0;
// Register a callback to be called when data is added or the source
// completes. The callback will be always be called on the owning
@@ -184,12 +190,391 @@ class CONTENT_EXPORT ByteStreamReader {
virtual void RegisterCallback(const base::Closure& sink_callback) = 0;
};
-CONTENT_EXPORT void CreateByteStream(
+typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
+ByteStreamContentVector;
+
+// A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
+// cleared in an object destructor and accessed to check for object
+// existence. We can't use weak pointers because they're tightly tied to
+// threads rather than task runners.
+// TODO(rdsmith): A better solution would be extending weak pointers
+// to support SequencedTaskRunners.
+struct ByteStreamLifetimeFlag
+ : public base::RefCountedThreadSafe<ByteStreamLifetimeFlag> {
+ public:
+ ByteStreamLifetimeFlag() : is_alive(true) { }
+ bool is_alive;
+
+ protected:
+ friend class base::RefCountedThreadSafe<ByteStreamLifetimeFlag>;
+ virtual ~ByteStreamLifetimeFlag() { }
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(ByteStreamLifetimeFlag);
+};
+
+template <typename StatusType>
+class ByteStreamReaderImpl;
+
+// For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
+// SetPeer may happen anywhere; all other operations on each class must
+// happen in the context of their SequencedTaskRunner.
+template <typename StatusType>
+class ByteStreamWriterImpl : public ByteStreamWriter<StatusType> {
+ public:
+ ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
+ scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag,
+ size_t buffer_size)
+ : total_buffer_size_(buffer_size),
+ my_task_runner_(task_runner),
+ my_lifetime_flag_(lifetime_flag),
+ input_contents_size_(0),
+ output_size_used_(0),
+ peer_(NULL) {
+ DCHECK(my_lifetime_flag_.get());
+ my_lifetime_flag_->is_alive = true;
+ }
+
+ virtual ~ByteStreamWriterImpl() {
+ my_lifetime_flag_->is_alive = false;
+ }
+
+ // Must be called before any operations are performed.
+ void SetPeer(ByteStreamReaderImpl<StatusType>* peer,
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
+ scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag) {
+ peer_ = peer;
+ peer_task_runner_ = peer_task_runner;
+ peer_lifetime_flag_ = peer_lifetime_flag;
+ }
+
+ // Overridden from ByteStreamWriter.
+ virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
+ size_t byte_count) OVERRIDE;
+ virtual void Close(StatusType status) OVERRIDE;
+ virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE {
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
+ space_available_callback_ = source_callback;
+ }
+
+ // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
+ static void UpdateWindow(scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag,
+ ByteStreamWriterImpl<StatusType>* target,
+ size_t bytes_consumed) {
+ // If the target object isn't alive anymore, we do nothing.
+ if (!lifetime_flag->is_alive) return;
+
+ target->UpdateWindowInternal(bytes_consumed);
+ }
+
+ private:
+ // Called from UpdateWindow when object existence has been validated.
+ void UpdateWindowInternal(size_t bytes_consumed) {
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
+ DCHECK_GE(output_size_used_, bytes_consumed);
+ output_size_used_ -= bytes_consumed;
+
+ // Callback if we were above the limit and we're now <= to it.
+ size_t total_known_size_used =
+ input_contents_size_ + output_size_used_;
+
+ if (total_known_size_used <= total_buffer_size_ &&
+ (total_known_size_used + bytes_consumed > total_buffer_size_) &&
+ !space_available_callback_.is_null())
+ space_available_callback_.Run();
+ }
+
+ void DrainInputBuffer(scoped_ptr<ByteStreamContentVector>* buffer) {
+ if (0 == input_contents_size_)
+ return;
+
+ buffer->reset(new ByteStreamContentVector);
+ (*buffer)->swap(input_contents_);
+ output_size_used_ += input_contents_size_;
+ input_contents_size_ = 0;
+ }
+
+ const size_t total_buffer_size_;
+
+ // All data objects in this class are only valid to access on
+ // this task runner except as otherwise noted.
+ scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
+
+ // True while this object is alive.
+ scoped_refptr<ByteStreamLifetimeFlag> my_lifetime_flag_;
+
+ base::Closure space_available_callback_;
+ ByteStreamContentVector input_contents_;
+ size_t input_contents_size_;
+
+ // ** Peer information.
+
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
+
+ // How much we've sent to the output that for flow control purposes we
+ // must assume hasn't been read yet.
+ size_t output_size_used_;
+
+ // Only valid to access on peer_task_runner_.
+ scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag_;
+
+ // Only valid to access on peer_task_runner_ if
+ // |*peer_lifetime_flag_ == true|
+ ByteStreamReaderImpl<StatusType>* peer_;
+};
+
+template <typename StatusType>
+class ByteStreamReaderImpl : public ByteStreamReader<StatusType> {
+ public:
+ ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
+ scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag,
+ size_t buffer_size)
+ : total_buffer_size_(buffer_size),
+ my_task_runner_(task_runner),
+ my_lifetime_flag_(lifetime_flag),
+ received_status_(false),
+ unreported_consumed_bytes_(0),
+ peer_(NULL) {
+ DCHECK(my_lifetime_flag_.get());
+ my_lifetime_flag_->is_alive = true;
+ }
+
+ virtual ~ByteStreamReaderImpl() {
+ my_lifetime_flag_->is_alive = false;
+ }
+
+ // Must be called before any operations are performed.
+ void SetPeer(ByteStreamWriterImpl<StatusType>* peer,
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
+ scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag) {
+ peer_ = peer;
+ peer_task_runner_ = peer_task_runner;
+ peer_lifetime_flag_ = peer_lifetime_flag;
+ }
+
+ // Overridden from ByteStreamReader.
+ virtual typename ByteStreamReader<StatusType>::StreamState Read(
+ scoped_refptr<net::IOBuffer>* data,
+ size_t* length) OVERRIDE {
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
+
+ if (available_contents_.size()) {
+ *data = available_contents_.front().first;
+ *length = available_contents_.front().second;
+ available_contents_.pop_front();
+ unreported_consumed_bytes_ += *length;
+
+ MaybeUpdateInput();
+ return ByteStreamReader<StatusType>::STREAM_HAS_DATA;
+ }
+ if (received_status_) {
+ return ByteStreamReader<StatusType>::STREAM_COMPLETE;
+ }
+ return ByteStreamReader<StatusType>::STREAM_EMPTY;
+ }
+ virtual StatusType GetStatus() const OVERRIDE {
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
+ DCHECK(received_status_);
+ return status_;
+ }
+ virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE {
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
+
+ data_available_callback_ = sink_callback;
+ }
+
+ // PostTask targets from |ByteStreamWriterImpl::Write| and
+ // |ByteStreamWriterImpl::Close|.
+ // Receive data from our peer.
+ // static because it may be called after the object it is targeting
+ // has been destroyed. It may not access |*target|
+ // if |*object_lifetime_flag| is false.
+ static void TransferData(
+ scoped_refptr<ByteStreamLifetimeFlag> object_lifetime_flag,
+ ByteStreamReaderImpl<StatusType>* target,
+ scoped_ptr<ByteStreamContentVector> transfer_buffer) {
+ // If our target is no longer alive, do nothing.
+ if (!object_lifetime_flag->is_alive) return;
+
+ target->TransferDataInternal(transfer_buffer.Pass());
+ }
+ static void TransferDataAndClose(
+ scoped_refptr<ByteStreamLifetimeFlag> object_lifetime_flag,
+ ByteStreamReaderImpl<StatusType>* target,
+ scoped_ptr<ByteStreamContentVector> transfer_buffer,
+ StatusType status) {
+ // If our target is no longer alive, do nothing.
+ if (!object_lifetime_flag->is_alive) return;
+
+ target->TransferDataAndCloseInternal(transfer_buffer.Pass(), status);
+ }
+
+ private:
+ void AppendBuffer(scoped_ptr<ByteStreamContentVector> buffer) {
+ if (!buffer)
+ return;
+
+ available_contents_.insert(available_contents_.end(),
+ buffer->begin(),
+ buffer->end());
+ }
+
+ void MaybeInvokeCallback(bool was_empty, bool source_complete) {
+ // Callback on transition from empty to non-empty, or
+ // source complete.
+ if (((was_empty && !available_contents_.empty()) ||
+ source_complete) &&
+ !data_available_callback_.is_null())
+ data_available_callback_.Run();
+ }
+
+ // Called from TransferData.* once object existence has been validated.
+ void TransferDataInternal(
+ scoped_ptr<ByteStreamContentVector> transfer_buffer) {
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
+
+ bool was_empty = available_contents_.empty();
+
+ AppendBuffer(transfer_buffer.Pass());
+
+ MaybeInvokeCallback(was_empty, false /* source_complete */);
+ }
+ void TransferDataAndCloseInternal(
+ scoped_ptr<ByteStreamContentVector> transfer_buffer, StatusType status) {
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
+
+ bool was_empty = available_contents_.empty();
+
+ AppendBuffer(transfer_buffer.Pass());
+
+ received_status_ = true;
+ status_ = status;
+
+ MaybeInvokeCallback(was_empty, true /* source_complete */);
+ }
+
+ void MaybeUpdateInput();
+
+ const size_t total_buffer_size_;
+
+ scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
+
+ // True while this object is alive.
+ scoped_refptr<ByteStreamLifetimeFlag> my_lifetime_flag_;
+
+ ByteStreamContentVector available_contents_;
+
+ bool received_status_;
+ StatusType status_;
+
+ base::Closure data_available_callback_;
+
+ // Time of last point at which data in stream transitioned from full
+ // to non-full. Nulled when a callback is sent.
+ base::Time last_non_full_time_;
+
+ // ** Peer information
+
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
+
+ // How much has been removed from this class that we haven't told
+ // the input about yet.
+ size_t unreported_consumed_bytes_;
+
+ // Only valid to access on peer_task_runner_.
+ scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag_;
+
+ // Only valid to access on peer_task_runner_ if
+ // |*peer_lifetime_flag_ == true|
+ ByteStreamWriterImpl<StatusType>* peer_;
+};
+
+template <typename StatusType>
+bool ByteStreamWriterImpl<StatusType>::Write(
+ scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
+
+ input_contents_.push_back(std::make_pair(buffer, byte_count));
+ input_contents_size_ += byte_count;
+
+ // Arbitrarily, we buffer to a third of the total size before sending.
+ if (input_contents_size_ > total_buffer_size_ /
+ ByteStreamWriter<StatusType>::kFractionBufferBeforeSending) {
+ scoped_ptr<ByteStreamContentVector> transfer_buffer(
+ new ByteStreamContentVector);
+ DrainInputBuffer(&transfer_buffer);
+
+ peer_task_runner_->PostTask(
+ FROM_HERE, base::Bind(
+ &ByteStreamReaderImpl<StatusType>::TransferData,
+ peer_lifetime_flag_,
+ peer_,
+ base::Passed(&transfer_buffer)));
+ }
+
+ return (input_contents_size_ + output_size_used_ <= total_buffer_size_);
+}
+
+template <typename StatusType>
+void ByteStreamWriterImpl<StatusType>::Close(StatusType status) {
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
+
+ scoped_ptr<ByteStreamContentVector> transfer_buffer(
+ new ByteStreamContentVector);
+ DrainInputBuffer(&transfer_buffer);
+
+ peer_task_runner_->PostTask(
+ FROM_HERE, base::Bind(
+ &ByteStreamReaderImpl<StatusType>::TransferDataAndClose,
+ peer_lifetime_flag_,
+ peer_,
+ base::Passed(&transfer_buffer),
+ status));
+}
+
+// Decide whether or not to send the input a window update.
+// Currently we do that whenever we've got unreported consumption
+// greater than 1/3 of total size.
+template <typename StatusType>
+void ByteStreamReaderImpl<StatusType>::MaybeUpdateInput() {
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
+
+ if (unreported_consumed_bytes_ <=
+ total_buffer_size_ /
+ ByteStreamReader<StatusType>::kFractionReadBeforeWindowUpdate)
+ return;
+
+ peer_task_runner_->PostTask(
+ FROM_HERE, base::Bind(
+ &ByteStreamWriterImpl<StatusType>::UpdateWindow,
+ peer_lifetime_flag_,
+ peer_,
+ unreported_consumed_bytes_));
+ unreported_consumed_bytes_ = 0;
+}
+
+template <typename StatusType>
+void CreateByteStream(
scoped_refptr<base::SequencedTaskRunner> input_task_runner,
scoped_refptr<base::SequencedTaskRunner> output_task_runner,
size_t buffer_size,
- scoped_ptr<ByteStreamWriter>* input,
- scoped_ptr<ByteStreamReader>* output);
+ scoped_ptr<ByteStreamWriter<StatusType> >* input,
+ scoped_ptr<ByteStreamReader<StatusType> >* output) {
+ scoped_refptr<ByteStreamLifetimeFlag> input_flag(
+ new ByteStreamLifetimeFlag());
+ scoped_refptr<ByteStreamLifetimeFlag> output_flag(
+ new ByteStreamLifetimeFlag());
+
+ ByteStreamWriterImpl<StatusType>* in = new ByteStreamWriterImpl<StatusType>(
+ input_task_runner, input_flag, buffer_size);
+ ByteStreamReaderImpl<StatusType>* out = new ByteStreamReaderImpl<StatusType>(
+ output_task_runner, output_flag, buffer_size);
+
+ in->SetPeer(out, output_task_runner, output_flag);
+ out->SetPeer(in, input_task_runner, input_flag);
+ input->reset(in);
+ output->reset(out);
+}
} // namespace content
« no previous file with comments | « no previous file | content/browser/byte_stream.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698