Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #ifndef CONTENT_BROWSER_BYTE_STREAM_H_ | 5 #ifndef CONTENT_BROWSER_BYTE_STREAM_H_ |
| 6 #define CONTENT_BROWSER_BYTE_STREAM_H_ | 6 #define CONTENT_BROWSER_BYTE_STREAM_H_ |
| 7 | 7 |
| 8 #include <deque> | 8 #include <deque> |
| 9 #include <set> | 9 #include <set> |
| 10 #include <utility> | 10 #include <utility> |
| 11 | 11 |
| 12 #include "base/bind.h" | |
| 12 #include "base/callback.h" | 13 #include "base/callback.h" |
| 14 #include "base/location.h" | |
| 13 #include "base/memory/ref_counted.h" | 15 #include "base/memory/ref_counted.h" |
| 14 #include "base/synchronization/lock.h" | 16 #include "base/sequenced_task_runner.h" |
| 15 #include "content/public/browser/download_interrupt_reasons.h" | |
| 16 #include "net/base/io_buffer.h" | 17 #include "net/base/io_buffer.h" |
| 17 | 18 |
| 18 namespace base { | 19 namespace base { |
| 19 class SequencedTaskRunner; | 20 class SequencedTaskRunner; |
| 20 } | 21 } |
| 21 | 22 |
| 22 namespace content { | 23 namespace content { |
| 23 | 24 |
| 24 // A byte stream is a pipe to transfer bytes between a source and a | 25 // A byte stream is a pipe to transfer bytes between a source and a |
| 25 // sink, which may be on different threads. It is intended to be the | 26 // sink, which may be on different threads. It is intended to be the |
| 26 // only connection between source and sink; they need have no | 27 // only connection between source and sink; they need have no |
| 27 // direct awareness of each other aside from the byte stream. The source and | 28 // direct awareness of each other aside from the byte stream. The source and |
| 28 // the sink have different interfaces to a byte stream, |ByteStreamWriter| | 29 // the sink have different interfaces to a byte stream, |ByteStreamWriter| |
| 29 // and |ByteStreamReader|. A pair of connected interfaces is generated by | 30 // and |ByteStreamReader|. A pair of connected interfaces is generated by |
| 30 // calling |CreateByteStream|. | 31 // calling |CreateByteStream|. |
| 31 // | 32 // |
| 32 // The source adds bytes to the bytestream via |ByteStreamWriter::Write| | 33 // The source adds bytes to the bytestream via |ByteStreamWriter::Write| |
| 33 // and the sink retrieves bytes already written via |ByteStreamReader::Read|. | 34 // and the sink retrieves bytes already written via |ByteStreamReader::Read|. |
| 34 // | 35 // |
| 35 // When the source has no more data to add, it will call | 36 // When the source has no more data to add, it will call |
| 36 // |ByteStreamWriter::Close| to indicate that. Errors at the source | 37 // |ByteStreamWriter::Close| to indicate that. Errors at the source |
| 37 // are indicated to the sink via a non-DOWNLOAD_INTERRUPT_REASON_NONE code. | 38 // are indicated to the sink via the GetStatus() method. |
|
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
Suggestion: Change "Errors" to "Operation status";
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
Done.
| |
| 38 // | 39 // |
| 39 // Normally the source is not managed after the relationship is setup; | 40 // Normally the source is not managed after the relationship is setup; |
| 40 // it is expected to provide data and then close itself. If an error | 41 // it is expected to provide data and then close itself. If an error |
| 41 // occurs on the sink, it is not signalled to the source via this | 42 // occurs on the sink, it is not signalled to the source via this |
| 42 // mechanism; instead, the source will write data until it exausts the | 43 // mechanism; instead, the source will write data until it exausts the |
| 43 // available space. If the source needs to be aware of errors occuring | 44 // available space. If the source needs to be aware of errors occuring |
| 44 // on the sink, this must be signalled in some other fashion (usually | 45 // on the sink, this must be signalled in some other fashion (usually |
| 45 // through whatever controller setup the relationship). | 46 // through whatever controller setup the relationship). |
| 46 // | 47 // |
| 47 // Callback lifetime management: No lifetime management is done in this | 48 // Callback lifetime management: No lifetime management is done in this |
| 48 // class to prevent registered callbacks from being called after any | 49 // class to prevent registered callbacks from being called after any |
| 49 // objects to which they may refer have been destroyed. It is the | 50 // objects to which they may refer have been destroyed. It is the |
| 50 // responsibility of the callers to avoid use-after-free references. | 51 // responsibility of the callers to avoid use-after-free references. |
| 51 // This may be done by any of several mechanisms, including weak | 52 // This may be done by any of several mechanisms, including weak |
| 52 // pointers, scoped_refptr references, or calling the registration | 53 // pointers, scoped_refptr references, or calling the registration |
| 53 // function with a null callback from a destructor. To enable the null | 54 // function with a null callback from a destructor. To enable the null |
| 54 // callback strategy, callbacks will not be stored between retrieval and | 55 // callback strategy, callbacks will not be stored between retrieval and |
| 55 // evaluation, so setting a null callback will guarantee that the | 56 // evaluation, so setting a null callback will guarantee that the |
| 56 // previous callback will not be executed after setting. | 57 // previous callback will not be executed after setting. |
| 57 // | 58 // |
| 58 // Class methods are virtual to allow mocking for tests; these classes | 59 // Class methods are virtual to allow mocking for tests; these classes |
| 59 // aren't intended to be base classes for other classes. | 60 // aren't intended to be base classes for other classes. |
| 60 // | 61 // |
| 61 // Sample usage (note that this does not show callback usage): | 62 // Sample usage (note that this does not show callback usage): |
| 62 // | 63 // |
| 63 // void OriginatingClass::Initialize() { | 64 // void OriginatingClass::Initialize() { |
| 64 // // Create a stream for sending bytes from IO->FILE threads. | 65 // // Create a stream for sending bytes from IO->FILE threads. |
| 65 // scoped_ptr<ByteStreamWriter> writer; | 66 // scoped_ptr<ByteStreamWriter<status type> > writer; |
|
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
Could you change this to StatusType? My eye keeps
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
Done.
| |
| 66 // scoped_ptr<ByteStreamReader> reader; | 67 // scoped_ptr<ByteStreamReader<status type> > reader; |
| 67 // CreateByteStream( | 68 // CreateByteStream<status type>( |
| 68 // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO), | 69 // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO), |
| 69 // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE), | 70 // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE), |
| 70 // kStreamBufferSize /* e.g. 10240. */, | 71 // kStreamBufferSize /* e.g. 10240. */, |
| 71 // &writer, | 72 // &writer, |
| 72 // &reader); // Presumed passed to FILE thread for reading. | 73 // &reader); // Presumed passed to FILE thread for reading. |
| 73 // | 74 // |
| 74 // // Setup callback for writing. | 75 // // Setup callback for writing. |
| 75 // writer->RegisterCallback(base::Bind(&SpaceAvailable, this)); | 76 // writer->RegisterCallback(base::Bind(&SpaceAvailable, this)); |
| 76 // | 77 // |
| 77 // // Do initial round of writing. | 78 // // Do initial round of writing. |
| (...skipping 28 matching lines...) Expand all Loading... | |
| 106 // void ReceivingClass::DataAvailable() { | 107 // void ReceivingClass::DataAvailable() { |
| 107 // scoped_refptr<net::IOBuffer> data; | 108 // scoped_refptr<net::IOBuffer> data; |
| 108 // size_t length = 0; | 109 // size_t length = 0; |
| 109 // | 110 // |
| 110 // while (ByteStreamReader::STREAM_HAS_DATA == | 111 // while (ByteStreamReader::STREAM_HAS_DATA == |
| 111 // (state = reader->Read(&data, &length))) { | 112 // (state = reader->Read(&data, &length))) { |
| 112 // // Process |data|. | 113 // // Process |data|. |
| 113 // } | 114 // } |
| 114 // | 115 // |
| 115 // if (ByteStreamReader::STREAM_COMPLETE == state) { | 116 // if (ByteStreamReader::STREAM_COMPLETE == state) { |
| 116 // DownloadInterruptReason status = reader->GetStatus(); | 117 // <status type> status = reader->GetStatus(); |
| 117 // // Process error or successful completion in |status|. | 118 // // Process error or successful completion in |status|. |
| 118 // } | 119 // } |
| 119 // | 120 // |
| 120 // // if |state| is STREAM_EMPTY, we're done for now; we'll be called | 121 // // if |state| is STREAM_EMPTY, we're done for now; we'll be called |
| 121 // // again when there's more data. | 122 // // again when there's more data. |
| 122 // } | 123 // } |
| 123 class CONTENT_EXPORT ByteStreamWriter { | 124 template <typename StatusType> |
|
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
We need to document the requirements on StatusType
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
Added around L38
| |
| 124 public: | 125 class ByteStreamWriter { |
| 126 public: | |
| 125 // Inverse of the fraction of the stream buffer that must be full before | 127 // Inverse of the fraction of the stream buffer that must be full before |
| 126 // a notification is sent to paired Reader that there's more data. | 128 // a notification is sent to paired Reader that there's more data. |
| 127 static const int kFractionBufferBeforeSending; | 129 static const int kFractionBufferBeforeSending = 3; |
| 128 | 130 |
| 129 virtual ~ByteStreamWriter() = 0; | 131 virtual ~ByteStreamWriter() { }; |
| 130 | 132 |
| 131 // Always adds the data passed into the ByteStream. Returns true | 133 // Always adds the data passed into the ByteStream. Returns true |
| 132 // if more data may be added without exceeding the class limit | 134 // if more data may be added without exceeding the class limit |
| 133 // on data. Takes ownership of |buffer|. | 135 // on data. Takes ownership of |buffer|. |
| 134 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, | 136 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, |
| 135 size_t byte_count) = 0; | 137 size_t byte_count) = 0; |
| 136 | 138 |
| 137 // Signal that all data that is going to be sent, has been sent, | 139 // Signal that all data that is going to be sent, has been sent, |
| 138 // and provide a status. |DOWNLOAD_INTERRUPT_REASON_NONE| should be | 140 // and provide a status. |
| 139 // passed for successful completion. | 141 virtual void Close(StatusType status) = 0; |
| 140 virtual void Close(DownloadInterruptReason status) = 0; | |
| 141 | 142 |
| 142 // Register a callback to be called when the stream transitions from | 143 // Register a callback to be called when the stream transitions from |
| 143 // full to having space available. The callback will always be | 144 // full to having space available. The callback will always be |
| 144 // called on the task runner associated with the ByteStreamWriter. | 145 // called on the task runner associated with the ByteStreamWriter. |
| 145 // This callback will only be called if a call to Write has previously | 146 // This callback will only be called if a call to Write has previously |
| 146 // returned false (i.e. the ByteStream has been filled). | 147 // returned false (i.e. the ByteStream has been filled). |
| 147 // Multiple calls to this function are supported, though note that it | 148 // Multiple calls to this function are supported, though note that it |
| 148 // is the callers responsibility to handle races with space becoming | 149 // is the callers responsibility to handle races with space becoming |
| 149 // available (i.e. in the case of that race either of the before | 150 // available (i.e. in the case of that race either of the before |
| 150 // or after callbacks may be called). | 151 // or after callbacks may be called). |
| 151 // The callback will not be called after ByteStreamWriter destruction. | 152 // The callback will not be called after ByteStreamWriter destruction. |
| 152 virtual void RegisterCallback(const base::Closure& source_callback) = 0; | 153 virtual void RegisterCallback(const base::Closure& source_callback) = 0; |
| 153 }; | 154 }; |
| 154 | 155 |
| 155 class CONTENT_EXPORT ByteStreamReader { | 156 template <typename StatusType> |
| 157 class ByteStreamReader { | |
| 156 public: | 158 public: |
| 157 // Inverse of the fraction of the stream buffer that must be empty before | 159 // Inverse of the fraction of the stream buffer that must be empty before |
| 158 // a notification is send to paired Writer that there's more room. | 160 // a notification is send to paired Writer that there's more room. |
| 159 static const int kFractionReadBeforeWindowUpdate; | 161 static const int kFractionReadBeforeWindowUpdate = 3; |
| 160 | 162 |
| 161 enum StreamState { STREAM_EMPTY, STREAM_HAS_DATA, STREAM_COMPLETE }; | 163 enum StreamState { STREAM_EMPTY, STREAM_HAS_DATA, STREAM_COMPLETE }; |
| 162 | 164 |
| 163 virtual ~ByteStreamReader() = 0; | 165 virtual ~ByteStreamReader() { }; |
| 164 | 166 |
| 165 // Returns STREAM_EMPTY if there is no data on the ByteStream and | 167 // Returns STREAM_EMPTY if there is no data on the ByteStream and |
| 166 // Close() has not been called, and STREAM_COMPLETE if there | 168 // Close() has not been called, and STREAM_COMPLETE if there |
| 167 // is no data on the ByteStream and Close() has been called. | 169 // is no data on the ByteStream and Close() has been called. |
| 168 // If there is data on the ByteStream, returns STREAM_HAS_DATA | 170 // If there is data on the ByteStream, returns STREAM_HAS_DATA |
| 169 // and fills in |*data| with a pointer to the data, and |*length| | 171 // and fills in |*data| with a pointer to the data, and |*length| |
| 170 // with its length. | 172 // with its length. |
| 171 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, | 173 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, |
| 172 size_t* length) = 0; | 174 size_t* length) = 0; |
| 173 | 175 |
| 174 // Only valid to call if Read() has returned STREAM_COMPLETE. | 176 // Only valid to call if Read() has returned STREAM_COMPLETE. |
| 175 virtual DownloadInterruptReason GetStatus() const = 0; | 177 virtual StatusType GetStatus() const = 0; |
| 176 | 178 |
| 177 // Register a callback to be called when data is added or the source | 179 // Register a callback to be called when data is added or the source |
| 178 // completes. The callback will be always be called on the owning | 180 // completes. The callback will be always be called on the owning |
| 179 // task runner. Multiple calls to this function are supported, | 181 // task runner. Multiple calls to this function are supported, |
| 180 // though note that it is the callers responsibility to handle races | 182 // though note that it is the callers responsibility to handle races |
| 181 // with data becoming available (i.e. in the case of that race | 183 // with data becoming available (i.e. in the case of that race |
| 182 // either of the before or after callbacks may be called). | 184 // either of the before or after callbacks may be called). |
| 183 // The callback will not be called after ByteStreamReader destruction. | 185 // The callback will not be called after ByteStreamReader destruction. |
| 184 virtual void RegisterCallback(const base::Closure& sink_callback) = 0; | 186 virtual void RegisterCallback(const base::Closure& sink_callback) = 0; |
| 185 }; | 187 }; |
| 186 | 188 |
| 187 CONTENT_EXPORT void CreateByteStream( | 189 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> > |
|
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
I'm surprised that removing CONTENT_EXPORT works;
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
As we have implementation in .h file (to instantia
| |
| 190 ByteStreamContentVector; | |
| 191 | |
| 192 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be | |
| 193 // cleared in an object destructor and accessed to check for object | |
| 194 // existence. We can't use weak pointers because they're tightly tied to | |
| 195 // threads rather than task runners. | |
| 196 // TODO(rdsmith): A better solution would be extending weak pointers | |
| 197 // to support SequencedTaskRunners. | |
| 198 struct ByteStreamLifetimeFlag | |
| 199 : public base::RefCountedThreadSafe<ByteStreamLifetimeFlag> { | |
| 200 public: | |
| 201 ByteStreamLifetimeFlag() : is_alive(true) { } | |
| 202 bool is_alive; | |
| 203 | |
| 204 protected: | |
| 205 friend class base::RefCountedThreadSafe<ByteStreamLifetimeFlag>; | |
| 206 virtual ~ByteStreamLifetimeFlag() { } | |
| 207 | |
| 208 private: | |
| 209 DISALLOW_COPY_AND_ASSIGN(ByteStreamLifetimeFlag); | |
| 210 }; | |
| 211 | |
| 212 template <typename StatusType> | |
| 213 class ByteStreamReaderImpl; | |
| 214 | |
| 215 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and | |
| 216 // SetPeer may happen anywhere; all other operations on each class must | |
| 217 // happen in the context of their SequencedTaskRunner. | |
| 218 template <typename StatusType> | |
| 219 class ByteStreamWriterImpl : public ByteStreamWriter<StatusType> { | |
| 220 public: | |
| 221 ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, | |
| 222 scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, | |
| 223 size_t buffer_size) | |
| 224 : total_buffer_size_(buffer_size), | |
| 225 my_task_runner_(task_runner), | |
| 226 my_lifetime_flag_(lifetime_flag), | |
| 227 input_contents_size_(0), | |
| 228 output_size_used_(0), | |
| 229 peer_(NULL) { | |
| 230 DCHECK(my_lifetime_flag_.get()); | |
| 231 my_lifetime_flag_->is_alive = true; | |
| 232 } | |
| 233 | |
| 234 virtual ~ByteStreamWriterImpl() { | |
| 235 my_lifetime_flag_->is_alive = false; | |
| 236 } | |
| 237 | |
| 238 // Must be called before any operations are performed. | |
| 239 void SetPeer(ByteStreamReaderImpl<StatusType>* peer, | |
| 240 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
| 241 scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag) { | |
| 242 peer_ = peer; | |
| 243 peer_task_runner_ = peer_task_runner; | |
| 244 peer_lifetime_flag_ = peer_lifetime_flag; | |
| 245 } | |
| 246 | |
| 247 // Overridden from ByteStreamWriter. | |
| 248 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, | |
| 249 size_t byte_count) OVERRIDE; | |
| 250 virtual void Close(StatusType status) OVERRIDE; | |
| 251 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE { | |
| 252 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 253 space_available_callback_ = source_callback; | |
| 254 } | |
| 255 | |
| 256 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. | |
| 257 static void UpdateWindow(scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, | |
| 258 ByteStreamWriterImpl<StatusType>* target, | |
| 259 size_t bytes_consumed) { | |
| 260 // If the target object isn't alive anymore, we do nothing. | |
| 261 if (!lifetime_flag->is_alive) return; | |
| 262 | |
| 263 target->UpdateWindowInternal(bytes_consumed); | |
| 264 } | |
| 265 | |
| 266 private: | |
| 267 // Called from UpdateWindow when object existence has been validated. | |
| 268 void UpdateWindowInternal(size_t bytes_consumed) { | |
| 269 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 270 DCHECK_GE(output_size_used_, bytes_consumed); | |
| 271 output_size_used_ -= bytes_consumed; | |
| 272 | |
| 273 // Callback if we were above the limit and we're now <= to it. | |
| 274 size_t total_known_size_used = | |
| 275 input_contents_size_ + output_size_used_; | |
| 276 | |
| 277 if (total_known_size_used <= total_buffer_size_ && | |
| 278 (total_known_size_used + bytes_consumed > total_buffer_size_) && | |
| 279 !space_available_callback_.is_null()) | |
| 280 space_available_callback_.Run(); | |
| 281 } | |
| 282 | |
| 283 void DrainInputBuffer(scoped_ptr<ByteStreamContentVector>* buffer) { | |
| 284 if (0 == input_contents_size_) | |
| 285 return; | |
| 286 | |
| 287 buffer->reset(new ByteStreamContentVector); | |
| 288 (*buffer)->swap(input_contents_); | |
| 289 output_size_used_ += input_contents_size_; | |
| 290 input_contents_size_ = 0; | |
| 291 } | |
| 292 | |
| 293 const size_t total_buffer_size_; | |
| 294 | |
| 295 // All data objects in this class are only valid to access on | |
| 296 // this task runner except as otherwise noted. | |
| 297 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | |
| 298 | |
| 299 // True while this object is alive. | |
| 300 scoped_refptr<ByteStreamLifetimeFlag> my_lifetime_flag_; | |
| 301 | |
| 302 base::Closure space_available_callback_; | |
| 303 ByteStreamContentVector input_contents_; | |
| 304 size_t input_contents_size_; | |
| 305 | |
| 306 // ** Peer information. | |
| 307 | |
| 308 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | |
| 309 | |
| 310 // How much we've sent to the output that for flow control purposes we | |
| 311 // must assume hasn't been read yet. | |
| 312 size_t output_size_used_; | |
| 313 | |
| 314 // Only valid to access on peer_task_runner_. | |
| 315 scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag_; | |
| 316 | |
| 317 // Only valid to access on peer_task_runner_ if | |
| 318 // |*peer_lifetime_flag_ == true| | |
| 319 ByteStreamReaderImpl<StatusType>* peer_; | |
| 320 }; | |
| 321 | |
| 322 template <typename StatusType> | |
| 323 class ByteStreamReaderImpl : public ByteStreamReader<StatusType> { | |
| 324 public: | |
| 325 ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, | |
| 326 scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, | |
| 327 size_t buffer_size) | |
| 328 : total_buffer_size_(buffer_size), | |
| 329 my_task_runner_(task_runner), | |
| 330 my_lifetime_flag_(lifetime_flag), | |
| 331 received_status_(false), | |
| 332 unreported_consumed_bytes_(0), | |
| 333 peer_(NULL) { | |
| 334 DCHECK(my_lifetime_flag_.get()); | |
| 335 my_lifetime_flag_->is_alive = true; | |
| 336 } | |
| 337 | |
| 338 virtual ~ByteStreamReaderImpl() { | |
| 339 my_lifetime_flag_->is_alive = false; | |
| 340 } | |
| 341 | |
| 342 // Must be called before any operations are performed. | |
| 343 void SetPeer(ByteStreamWriterImpl<StatusType>* peer, | |
| 344 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
| 345 scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag) { | |
| 346 peer_ = peer; | |
| 347 peer_task_runner_ = peer_task_runner; | |
| 348 peer_lifetime_flag_ = peer_lifetime_flag; | |
| 349 } | |
| 350 | |
| 351 // Overridden from ByteStreamReader. | |
| 352 virtual typename ByteStreamReader<StatusType>::StreamState Read( | |
| 353 scoped_refptr<net::IOBuffer>* data, | |
| 354 size_t* length) OVERRIDE { | |
| 355 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 356 | |
| 357 if (available_contents_.size()) { | |
| 358 *data = available_contents_.front().first; | |
| 359 *length = available_contents_.front().second; | |
| 360 available_contents_.pop_front(); | |
| 361 unreported_consumed_bytes_ += *length; | |
| 362 | |
| 363 MaybeUpdateInput(); | |
| 364 return ByteStreamReader<StatusType>::STREAM_HAS_DATA; | |
| 365 } | |
| 366 if (received_status_) { | |
| 367 return ByteStreamReader<StatusType>::STREAM_COMPLETE; | |
| 368 } | |
| 369 return ByteStreamReader<StatusType>::STREAM_EMPTY; | |
| 370 } | |
| 371 virtual StatusType GetStatus() const OVERRIDE { | |
| 372 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 373 DCHECK(received_status_); | |
| 374 return status_; | |
| 375 } | |
| 376 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE { | |
| 377 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 378 | |
| 379 data_available_callback_ = sink_callback; | |
| 380 } | |
| 381 | |
| 382 // PostTask targets from |ByteStreamWriterImpl::Write| and | |
| 383 // |ByteStreamWriterImpl::Close|. | |
| 384 // Receive data from our peer. | |
| 385 // static because it may be called after the object it is targeting | |
| 386 // has been destroyed. It may not access |*target| | |
| 387 // if |*object_lifetime_flag| is false. | |
| 388 static void TransferData( | |
| 389 scoped_refptr<ByteStreamLifetimeFlag> object_lifetime_flag, | |
| 390 ByteStreamReaderImpl<StatusType>* target, | |
| 391 scoped_ptr<ByteStreamContentVector> transfer_buffer) { | |
| 392 // If our target is no longer alive, do nothing. | |
| 393 if (!object_lifetime_flag->is_alive) return; | |
| 394 | |
| 395 target->TransferDataInternal(transfer_buffer.Pass()); | |
| 396 } | |
| 397 static void TransferDataAndClose( | |
| 398 scoped_refptr<ByteStreamLifetimeFlag> object_lifetime_flag, | |
| 399 ByteStreamReaderImpl<StatusType>* target, | |
| 400 scoped_ptr<ByteStreamContentVector> transfer_buffer, | |
| 401 StatusType status) { | |
| 402 // If our target is no longer alive, do nothing. | |
| 403 if (!object_lifetime_flag->is_alive) return; | |
| 404 | |
| 405 target->TransferDataAndCloseInternal(transfer_buffer.Pass(), status); | |
| 406 } | |
| 407 | |
| 408 private: | |
| 409 void AppendBuffer(scoped_ptr<ByteStreamContentVector> buffer) { | |
| 410 if (!buffer) | |
| 411 return; | |
| 412 | |
| 413 available_contents_.insert(available_contents_.end(), | |
| 414 buffer->begin(), | |
| 415 buffer->end()); | |
| 416 } | |
| 417 | |
| 418 void MaybeInvokeCallback(bool was_empty, bool source_complete) { | |
| 419 // Callback on transition from empty to non-empty, or | |
| 420 // source complete. | |
| 421 if (((was_empty && !available_contents_.empty()) || | |
| 422 source_complete) && | |
| 423 !data_available_callback_.is_null()) | |
| 424 data_available_callback_.Run(); | |
| 425 } | |
| 426 | |
| 427 // Called from TransferData.* once object existence has been validated. | |
| 428 void TransferDataInternal( | |
| 429 scoped_ptr<ByteStreamContentVector> transfer_buffer) { | |
| 430 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 431 | |
| 432 bool was_empty = available_contents_.empty(); | |
| 433 | |
| 434 AppendBuffer(transfer_buffer.Pass()); | |
| 435 | |
| 436 MaybeInvokeCallback(was_empty, false /* source_complete */); | |
| 437 } | |
| 438 void TransferDataAndCloseInternal( | |
| 439 scoped_ptr<ByteStreamContentVector> transfer_buffer, StatusType status) { | |
| 440 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 441 | |
| 442 bool was_empty = available_contents_.empty(); | |
| 443 | |
| 444 AppendBuffer(transfer_buffer.Pass()); | |
| 445 | |
| 446 received_status_ = true; | |
| 447 status_ = status; | |
| 448 | |
| 449 MaybeInvokeCallback(was_empty, true /* source_complete */); | |
| 450 } | |
| 451 | |
| 452 void MaybeUpdateInput(); | |
| 453 | |
| 454 const size_t total_buffer_size_; | |
| 455 | |
| 456 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | |
| 457 | |
| 458 // True while this object is alive. | |
| 459 scoped_refptr<ByteStreamLifetimeFlag> my_lifetime_flag_; | |
| 460 | |
| 461 ByteStreamContentVector available_contents_; | |
| 462 | |
| 463 bool received_status_; | |
| 464 StatusType status_; | |
| 465 | |
| 466 base::Closure data_available_callback_; | |
| 467 | |
| 468 // Time of last point at which data in stream transitioned from full | |
| 469 // to non-full. Nulled when a callback is sent. | |
| 470 base::Time last_non_full_time_; | |
| 471 | |
| 472 // ** Peer information | |
| 473 | |
| 474 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | |
| 475 | |
| 476 // How much has been removed from this class that we haven't told | |
| 477 // the input about yet. | |
| 478 size_t unreported_consumed_bytes_; | |
| 479 | |
| 480 // Only valid to access on peer_task_runner_. | |
| 481 scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag_; | |
| 482 | |
| 483 // Only valid to access on peer_task_runner_ if | |
| 484 // |*peer_lifetime_flag_ == true| | |
| 485 ByteStreamWriterImpl<StatusType>* peer_; | |
| 486 }; | |
| 487 | |
| 488 template <typename StatusType> | |
| 489 bool ByteStreamWriterImpl<StatusType>::Write( | |
| 490 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { | |
| 491 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 492 | |
| 493 input_contents_.push_back(std::make_pair(buffer, byte_count)); | |
| 494 input_contents_size_ += byte_count; | |
| 495 | |
| 496 // Arbitrarily, we buffer to a third of the total size before sending. | |
| 497 if (input_contents_size_ > total_buffer_size_ / | |
| 498 ByteStreamWriter<StatusType>::kFractionBufferBeforeSending) { | |
| 499 scoped_ptr<ByteStreamContentVector> transfer_buffer( | |
| 500 new ByteStreamContentVector); | |
| 501 DrainInputBuffer(&transfer_buffer); | |
| 502 | |
| 503 peer_task_runner_->PostTask( | |
| 504 FROM_HERE, base::Bind( | |
| 505 &ByteStreamReaderImpl<StatusType>::TransferData, | |
| 506 peer_lifetime_flag_, | |
| 507 peer_, | |
| 508 base::Passed(&transfer_buffer))); | |
| 509 } | |
| 510 | |
| 511 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); | |
| 512 } | |
| 513 | |
| 514 template <typename StatusType> | |
| 515 void ByteStreamWriterImpl<StatusType>::Close(StatusType status) { | |
| 516 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 517 | |
| 518 scoped_ptr<ByteStreamContentVector> transfer_buffer( | |
| 519 new ByteStreamContentVector); | |
| 520 DrainInputBuffer(&transfer_buffer); | |
| 521 | |
| 522 peer_task_runner_->PostTask( | |
| 523 FROM_HERE, base::Bind( | |
| 524 &ByteStreamReaderImpl<StatusType>::TransferDataAndClose, | |
| 525 peer_lifetime_flag_, | |
| 526 peer_, | |
| 527 base::Passed(&transfer_buffer), | |
| 528 status)); | |
| 529 } | |
| 530 | |
| 531 // Decide whether or not to send the input a window update. | |
| 532 // Currently we do that whenever we've got unreported consumption | |
| 533 // greater than 1/3 of total size. | |
| 534 template <typename StatusType> | |
| 535 void ByteStreamReaderImpl<StatusType>::MaybeUpdateInput() { | |
| 536 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 537 | |
| 538 if (unreported_consumed_bytes_ <= | |
| 539 total_buffer_size_ / | |
| 540 ByteStreamReader<StatusType>::kFractionReadBeforeWindowUpdate) | |
| 541 return; | |
| 542 | |
| 543 peer_task_runner_->PostTask( | |
| 544 FROM_HERE, base::Bind( | |
| 545 &ByteStreamWriterImpl<StatusType>::UpdateWindow, | |
| 546 peer_lifetime_flag_, | |
| 547 peer_, | |
| 548 unreported_consumed_bytes_)); | |
| 549 unreported_consumed_bytes_ = 0; | |
| 550 } | |
| 551 | |
| 552 template <typename StatusType> | |
| 553 void CreateByteStream( | |
| 188 scoped_refptr<base::SequencedTaskRunner> input_task_runner, | 554 scoped_refptr<base::SequencedTaskRunner> input_task_runner, |
| 189 scoped_refptr<base::SequencedTaskRunner> output_task_runner, | 555 scoped_refptr<base::SequencedTaskRunner> output_task_runner, |
| 190 size_t buffer_size, | 556 size_t buffer_size, |
| 191 scoped_ptr<ByteStreamWriter>* input, | 557 scoped_ptr<ByteStreamWriter<StatusType> >* input, |
| 192 scoped_ptr<ByteStreamReader>* output); | 558 scoped_ptr<ByteStreamReader<StatusType> >* output) { |
| 559 scoped_refptr<ByteStreamLifetimeFlag> input_flag( | |
| 560 new ByteStreamLifetimeFlag()); | |
| 561 scoped_refptr<ByteStreamLifetimeFlag> output_flag( | |
| 562 new ByteStreamLifetimeFlag()); | |
| 563 | |
| 564 ByteStreamWriterImpl<StatusType>* in = new ByteStreamWriterImpl<StatusType>( | |
| 565 input_task_runner, input_flag, buffer_size); | |
| 566 ByteStreamReaderImpl<StatusType>* out = new ByteStreamReaderImpl<StatusType>( | |
| 567 output_task_runner, output_flag, buffer_size); | |
| 568 | |
| 569 in->SetPeer(out, output_task_runner, output_flag); | |
| 570 out->SetPeer(in, input_task_runner, input_flag); | |
| 571 input->reset(in); | |
| 572 output->reset(out); | |
| 573 } | |
| 193 | 574 |
| 194 } // namespace content | 575 } // namespace content |
| 195 | 576 |
| 196 #endif // CONTENT_BROWSER_BYTE_STREAM_H_ | 577 #endif // CONTENT_BROWSER_BYTE_STREAM_H_ |
| OLD | NEW |