Index: content/browser/byte_stream.h |
diff --git a/content/browser/byte_stream.h b/content/browser/byte_stream.h |
index 49b76749051d180861a3de5deaf5417679ce1bcd..d358b0474d91c8b0c83a204f9c3416d6455d00b4 100644 |
--- a/content/browser/byte_stream.h |
+++ b/content/browser/byte_stream.h |
@@ -9,10 +9,11 @@ |
#include <set> |
#include <utility> |
+#include "base/bind.h" |
#include "base/callback.h" |
+#include "base/location.h" |
#include "base/memory/ref_counted.h" |
-#include "base/synchronization/lock.h" |
-#include "content/public/browser/download_interrupt_reasons.h" |
+#include "base/sequenced_task_runner.h" |
#include "net/base/io_buffer.h" |
namespace base { |
@@ -34,7 +35,7 @@ namespace content { |
// |
// When the source has no more data to add, it will call |
// |ByteStreamWriter::Close| to indicate that. Errors at the source |
-// are indicated to the sink via a non-DOWNLOAD_INTERRUPT_REASON_NONE code. |
+// are indicated to the sink via the GetStatus() method. |
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
Suggestion: Change "Errors" to "Operation status";
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
Done.
|
// |
// Normally the source is not managed after the relationship is setup; |
// it is expected to provide data and then close itself. If an error |
@@ -62,9 +63,9 @@ namespace content { |
// |
// void OriginatingClass::Initialize() { |
// // Create a stream for sending bytes from IO->FILE threads. |
-// scoped_ptr<ByteStreamWriter> writer; |
-// scoped_ptr<ByteStreamReader> reader; |
-// CreateByteStream( |
+// scoped_ptr<ByteStreamWriter<status type> > writer; |
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
Could you change this to StatusType? My eye keeps
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
Done.
|
+// scoped_ptr<ByteStreamReader<status type> > reader; |
+// CreateByteStream<status type>( |
// BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO), |
// BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE), |
// kStreamBufferSize /* e.g. 10240. */, |
@@ -113,20 +114,21 @@ namespace content { |
// } |
// |
// if (ByteStreamReader::STREAM_COMPLETE == state) { |
-// DownloadInterruptReason status = reader->GetStatus(); |
+// <status type> status = reader->GetStatus(); |
// // Process error or successful completion in |status|. |
// } |
// |
// // if |state| is STREAM_EMPTY, we're done for now; we'll be called |
// // again when there's more data. |
// } |
-class CONTENT_EXPORT ByteStreamWriter { |
-public: |
+template <typename StatusType> |
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
We need to document the requirements on StatusType
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
Added around L38
|
+class ByteStreamWriter { |
+ public: |
// Inverse of the fraction of the stream buffer that must be full before |
// a notification is sent to paired Reader that there's more data. |
- static const int kFractionBufferBeforeSending; |
+ static const int kFractionBufferBeforeSending = 3; |
- virtual ~ByteStreamWriter() = 0; |
+ virtual ~ByteStreamWriter() { }; |
// Always adds the data passed into the ByteStream. Returns true |
// if more data may be added without exceeding the class limit |
@@ -135,9 +137,8 @@ public: |
size_t byte_count) = 0; |
// Signal that all data that is going to be sent, has been sent, |
- // and provide a status. |DOWNLOAD_INTERRUPT_REASON_NONE| should be |
- // passed for successful completion. |
- virtual void Close(DownloadInterruptReason status) = 0; |
+ // and provide a status. |
+ virtual void Close(StatusType status) = 0; |
// Register a callback to be called when the stream transitions from |
// full to having space available. The callback will always be |
@@ -152,15 +153,16 @@ public: |
virtual void RegisterCallback(const base::Closure& source_callback) = 0; |
}; |
-class CONTENT_EXPORT ByteStreamReader { |
+template <typename StatusType> |
+class ByteStreamReader { |
public: |
// Inverse of the fraction of the stream buffer that must be empty before |
// a notification is send to paired Writer that there's more room. |
- static const int kFractionReadBeforeWindowUpdate; |
+ static const int kFractionReadBeforeWindowUpdate = 3; |
enum StreamState { STREAM_EMPTY, STREAM_HAS_DATA, STREAM_COMPLETE }; |
- virtual ~ByteStreamReader() = 0; |
+ virtual ~ByteStreamReader() { }; |
// Returns STREAM_EMPTY if there is no data on the ByteStream and |
// Close() has not been called, and STREAM_COMPLETE if there |
@@ -172,7 +174,7 @@ class CONTENT_EXPORT ByteStreamReader { |
size_t* length) = 0; |
// Only valid to call if Read() has returned STREAM_COMPLETE. |
- virtual DownloadInterruptReason GetStatus() const = 0; |
+ virtual StatusType GetStatus() const = 0; |
// Register a callback to be called when data is added or the source |
// completes. The callback will be always be called on the owning |
@@ -184,12 +186,391 @@ class CONTENT_EXPORT ByteStreamReader { |
virtual void RegisterCallback(const base::Closure& sink_callback) = 0; |
}; |
-CONTENT_EXPORT void CreateByteStream( |
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
I'm surprised that removing CONTENT_EXPORT works;
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
As we have implementation in .h file (to instantia
|
+typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> > |
+ByteStreamContentVector; |
+ |
+// A poor man's weak pointer; a RefCountedThreadSafe boolean that can be |
+// cleared in an object destructor and accessed to check for object |
+// existence. We can't use weak pointers because they're tightly tied to |
+// threads rather than task runners. |
+// TODO(rdsmith): A better solution would be extending weak pointers |
+// to support SequencedTaskRunners. |
+struct ByteStreamLifetimeFlag |
+ : public base::RefCountedThreadSafe<ByteStreamLifetimeFlag> { |
+ public: |
+ ByteStreamLifetimeFlag() : is_alive(true) { } |
+ bool is_alive; |
+ |
+ protected: |
+ friend class base::RefCountedThreadSafe<ByteStreamLifetimeFlag>; |
+ virtual ~ByteStreamLifetimeFlag() { } |
+ |
+ private: |
+ DISALLOW_COPY_AND_ASSIGN(ByteStreamLifetimeFlag); |
+}; |
+ |
+template <typename StatusType> |
+class ByteStreamReaderImpl; |
+ |
+// For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and |
+// SetPeer may happen anywhere; all other operations on each class must |
+// happen in the context of their SequencedTaskRunner. |
+template <typename StatusType> |
+class ByteStreamWriterImpl : public ByteStreamWriter<StatusType> { |
+ public: |
+ ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, |
+ scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, |
+ size_t buffer_size) |
+ : total_buffer_size_(buffer_size), |
+ my_task_runner_(task_runner), |
+ my_lifetime_flag_(lifetime_flag), |
+ input_contents_size_(0), |
+ output_size_used_(0), |
+ peer_(NULL) { |
+ DCHECK(my_lifetime_flag_.get()); |
+ my_lifetime_flag_->is_alive = true; |
+ } |
+ |
+ virtual ~ByteStreamWriterImpl() { |
+ my_lifetime_flag_->is_alive = false; |
+ } |
+ |
+ // Must be called before any operations are performed. |
+ void SetPeer(ByteStreamReaderImpl<StatusType>* peer, |
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
+ scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag) { |
+ peer_ = peer; |
+ peer_task_runner_ = peer_task_runner; |
+ peer_lifetime_flag_ = peer_lifetime_flag; |
+ } |
+ |
+ // Overridden from ByteStreamWriter. |
+ virtual bool Write(scoped_refptr<net::IOBuffer> buffer, |
+ size_t byte_count) OVERRIDE; |
+ virtual void Close(StatusType status) OVERRIDE; |
+ virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE { |
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
+ space_available_callback_ = source_callback; |
+ } |
+ |
+ // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. |
+ static void UpdateWindow(scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, |
+ ByteStreamWriterImpl<StatusType>* target, |
+ size_t bytes_consumed) { |
+ // If the target object isn't alive anymore, we do nothing. |
+ if (!lifetime_flag->is_alive) return; |
+ |
+ target->UpdateWindowInternal(bytes_consumed); |
+ } |
+ |
+ private: |
+ // Called from UpdateWindow when object existence has been validated. |
+ void UpdateWindowInternal(size_t bytes_consumed) { |
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
+ DCHECK_GE(output_size_used_, bytes_consumed); |
+ output_size_used_ -= bytes_consumed; |
+ |
+ // Callback if we were above the limit and we're now <= to it. |
+ size_t total_known_size_used = |
+ input_contents_size_ + output_size_used_; |
+ |
+ if (total_known_size_used <= total_buffer_size_ && |
+ (total_known_size_used + bytes_consumed > total_buffer_size_) && |
+ !space_available_callback_.is_null()) |
+ space_available_callback_.Run(); |
+ } |
+ |
+ void DrainInputBuffer(scoped_ptr<ByteStreamContentVector>* buffer) { |
+ if (0 == input_contents_size_) |
+ return; |
+ |
+ buffer->reset(new ByteStreamContentVector); |
+ (*buffer)->swap(input_contents_); |
+ output_size_used_ += input_contents_size_; |
+ input_contents_size_ = 0; |
+ } |
+ |
+ const size_t total_buffer_size_; |
+ |
+ // All data objects in this class are only valid to access on |
+ // this task runner except as otherwise noted. |
+ scoped_refptr<base::SequencedTaskRunner> my_task_runner_; |
+ |
+ // True while this object is alive. |
+ scoped_refptr<ByteStreamLifetimeFlag> my_lifetime_flag_; |
+ |
+ base::Closure space_available_callback_; |
+ ByteStreamContentVector input_contents_; |
+ size_t input_contents_size_; |
+ |
+ // ** Peer information. |
+ |
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; |
+ |
+ // How much we've sent to the output that for flow control purposes we |
+ // must assume hasn't been read yet. |
+ size_t output_size_used_; |
+ |
+ // Only valid to access on peer_task_runner_. |
+ scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag_; |
+ |
+ // Only valid to access on peer_task_runner_ if |
+ // |*peer_lifetime_flag_ == true| |
+ ByteStreamReaderImpl<StatusType>* peer_; |
+}; |
+ |
+template <typename StatusType> |
+class ByteStreamReaderImpl : public ByteStreamReader<StatusType> { |
+ public: |
+ ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, |
+ scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, |
+ size_t buffer_size) |
+ : total_buffer_size_(buffer_size), |
+ my_task_runner_(task_runner), |
+ my_lifetime_flag_(lifetime_flag), |
+ received_status_(false), |
+ unreported_consumed_bytes_(0), |
+ peer_(NULL) { |
+ DCHECK(my_lifetime_flag_.get()); |
+ my_lifetime_flag_->is_alive = true; |
+ } |
+ |
+ virtual ~ByteStreamReaderImpl() { |
+ my_lifetime_flag_->is_alive = false; |
+ } |
+ |
+ // Must be called before any operations are performed. |
+ void SetPeer(ByteStreamWriterImpl<StatusType>* peer, |
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
+ scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag) { |
+ peer_ = peer; |
+ peer_task_runner_ = peer_task_runner; |
+ peer_lifetime_flag_ = peer_lifetime_flag; |
+ } |
+ |
+ // Overridden from ByteStreamReader. |
+ virtual typename ByteStreamReader<StatusType>::StreamState Read( |
+ scoped_refptr<net::IOBuffer>* data, |
+ size_t* length) OVERRIDE { |
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
+ |
+ if (available_contents_.size()) { |
+ *data = available_contents_.front().first; |
+ *length = available_contents_.front().second; |
+ available_contents_.pop_front(); |
+ unreported_consumed_bytes_ += *length; |
+ |
+ MaybeUpdateInput(); |
+ return ByteStreamReader<StatusType>::STREAM_HAS_DATA; |
+ } |
+ if (received_status_) { |
+ return ByteStreamReader<StatusType>::STREAM_COMPLETE; |
+ } |
+ return ByteStreamReader<StatusType>::STREAM_EMPTY; |
+ } |
+ virtual StatusType GetStatus() const OVERRIDE { |
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
+ DCHECK(received_status_); |
+ return status_; |
+ } |
+ virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE { |
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
+ |
+ data_available_callback_ = sink_callback; |
+ } |
+ |
+ // PostTask targets from |ByteStreamWriterImpl::Write| and |
+ // |ByteStreamWriterImpl::Close|. |
+ // Receive data from our peer. |
+ // static because it may be called after the object it is targeting |
+ // has been destroyed. It may not access |*target| |
+ // if |*object_lifetime_flag| is false. |
+ static void TransferData( |
+ scoped_refptr<ByteStreamLifetimeFlag> object_lifetime_flag, |
+ ByteStreamReaderImpl<StatusType>* target, |
+ scoped_ptr<ByteStreamContentVector> transfer_buffer) { |
+ // If our target is no longer alive, do nothing. |
+ if (!object_lifetime_flag->is_alive) return; |
+ |
+ target->TransferDataInternal(transfer_buffer.Pass()); |
+ } |
+ static void TransferDataAndClose( |
+ scoped_refptr<ByteStreamLifetimeFlag> object_lifetime_flag, |
+ ByteStreamReaderImpl<StatusType>* target, |
+ scoped_ptr<ByteStreamContentVector> transfer_buffer, |
+ StatusType status) { |
+ // If our target is no longer alive, do nothing. |
+ if (!object_lifetime_flag->is_alive) return; |
+ |
+ target->TransferDataAndCloseInternal(transfer_buffer.Pass(), status); |
+ } |
+ |
+ private: |
+ void AppendBuffer(scoped_ptr<ByteStreamContentVector> buffer) { |
+ if (!buffer) |
+ return; |
+ |
+ available_contents_.insert(available_contents_.end(), |
+ buffer->begin(), |
+ buffer->end()); |
+ } |
+ |
+ void MaybeInvokeCallback(bool was_empty, bool source_complete) { |
+ // Callback on transition from empty to non-empty, or |
+ // source complete. |
+ if (((was_empty && !available_contents_.empty()) || |
+ source_complete) && |
+ !data_available_callback_.is_null()) |
+ data_available_callback_.Run(); |
+ } |
+ |
+ // Called from TransferData.* once object existence has been validated. |
+ void TransferDataInternal( |
+ scoped_ptr<ByteStreamContentVector> transfer_buffer) { |
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
+ |
+ bool was_empty = available_contents_.empty(); |
+ |
+ AppendBuffer(transfer_buffer.Pass()); |
+ |
+ MaybeInvokeCallback(was_empty, false /* source_complete */); |
+ } |
+ void TransferDataAndCloseInternal( |
+ scoped_ptr<ByteStreamContentVector> transfer_buffer, StatusType status) { |
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
+ |
+ bool was_empty = available_contents_.empty(); |
+ |
+ AppendBuffer(transfer_buffer.Pass()); |
+ |
+ received_status_ = true; |
+ status_ = status; |
+ |
+ MaybeInvokeCallback(was_empty, true /* source_complete */); |
+ } |
+ |
+ void MaybeUpdateInput(); |
+ |
+ const size_t total_buffer_size_; |
+ |
+ scoped_refptr<base::SequencedTaskRunner> my_task_runner_; |
+ |
+ // True while this object is alive. |
+ scoped_refptr<ByteStreamLifetimeFlag> my_lifetime_flag_; |
+ |
+ ByteStreamContentVector available_contents_; |
+ |
+ bool received_status_; |
+ StatusType status_; |
+ |
+ base::Closure data_available_callback_; |
+ |
+ // Time of last point at which data in stream transitioned from full |
+ // to non-full. Nulled when a callback is sent. |
+ base::Time last_non_full_time_; |
+ |
+ // ** Peer information |
+ |
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; |
+ |
+ // How much has been removed from this class that we haven't told |
+ // the input about yet. |
+ size_t unreported_consumed_bytes_; |
+ |
+ // Only valid to access on peer_task_runner_. |
+ scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag_; |
+ |
+ // Only valid to access on peer_task_runner_ if |
+ // |*peer_lifetime_flag_ == true| |
+ ByteStreamWriterImpl<StatusType>* peer_; |
+}; |
+ |
+template <typename StatusType> |
+bool ByteStreamWriterImpl<StatusType>::Write( |
+ scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { |
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
+ |
+ input_contents_.push_back(std::make_pair(buffer, byte_count)); |
+ input_contents_size_ += byte_count; |
+ |
+ // Arbitrarily, we buffer to a third of the total size before sending. |
+ if (input_contents_size_ > total_buffer_size_ / |
+ ByteStreamWriter<StatusType>::kFractionBufferBeforeSending) { |
+ scoped_ptr<ByteStreamContentVector> transfer_buffer( |
+ new ByteStreamContentVector); |
+ DrainInputBuffer(&transfer_buffer); |
+ |
+ peer_task_runner_->PostTask( |
+ FROM_HERE, base::Bind( |
+ &ByteStreamReaderImpl<StatusType>::TransferData, |
+ peer_lifetime_flag_, |
+ peer_, |
+ base::Passed(&transfer_buffer))); |
+ } |
+ |
+ return (input_contents_size_ + output_size_used_ <= total_buffer_size_); |
+} |
+ |
+template <typename StatusType> |
+void ByteStreamWriterImpl<StatusType>::Close(StatusType status) { |
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
+ |
+ scoped_ptr<ByteStreamContentVector> transfer_buffer( |
+ new ByteStreamContentVector); |
+ DrainInputBuffer(&transfer_buffer); |
+ |
+ peer_task_runner_->PostTask( |
+ FROM_HERE, base::Bind( |
+ &ByteStreamReaderImpl<StatusType>::TransferDataAndClose, |
+ peer_lifetime_flag_, |
+ peer_, |
+ base::Passed(&transfer_buffer), |
+ status)); |
+} |
+ |
+// Decide whether or not to send the input a window update. |
+// Currently we do that whenever we've got unreported consumption |
+// greater than 1/3 of total size. |
+template <typename StatusType> |
+void ByteStreamReaderImpl<StatusType>::MaybeUpdateInput() { |
+ DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
+ |
+ if (unreported_consumed_bytes_ <= |
+ total_buffer_size_ / |
+ ByteStreamReader<StatusType>::kFractionReadBeforeWindowUpdate) |
+ return; |
+ |
+ peer_task_runner_->PostTask( |
+ FROM_HERE, base::Bind( |
+ &ByteStreamWriterImpl<StatusType>::UpdateWindow, |
+ peer_lifetime_flag_, |
+ peer_, |
+ unreported_consumed_bytes_)); |
+ unreported_consumed_bytes_ = 0; |
+} |
+ |
+template <typename StatusType> |
+void CreateByteStream( |
scoped_refptr<base::SequencedTaskRunner> input_task_runner, |
scoped_refptr<base::SequencedTaskRunner> output_task_runner, |
size_t buffer_size, |
- scoped_ptr<ByteStreamWriter>* input, |
- scoped_ptr<ByteStreamReader>* output); |
+ scoped_ptr<ByteStreamWriter<StatusType> >* input, |
+ scoped_ptr<ByteStreamReader<StatusType> >* output) { |
+ scoped_refptr<ByteStreamLifetimeFlag> input_flag( |
+ new ByteStreamLifetimeFlag()); |
+ scoped_refptr<ByteStreamLifetimeFlag> output_flag( |
+ new ByteStreamLifetimeFlag()); |
+ |
+ ByteStreamWriterImpl<StatusType>* in = new ByteStreamWriterImpl<StatusType>( |
+ input_task_runner, input_flag, buffer_size); |
+ ByteStreamReaderImpl<StatusType>* out = new ByteStreamReaderImpl<StatusType>( |
+ output_task_runner, output_flag, buffer_size); |
+ |
+ in->SetPeer(out, output_task_runner, output_flag); |
+ out->SetPeer(in, input_task_runner, input_flag); |
+ input->reset(in); |
+ output->reset(out); |
+} |
} // namespace content |