OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #ifndef CONTENT_BROWSER_BYTE_STREAM_H_ | 5 #ifndef CONTENT_BROWSER_BYTE_STREAM_H_ |
6 #define CONTENT_BROWSER_BYTE_STREAM_H_ | 6 #define CONTENT_BROWSER_BYTE_STREAM_H_ |
7 | 7 |
8 #include <deque> | 8 #include <deque> |
9 #include <set> | 9 #include <set> |
10 #include <utility> | 10 #include <utility> |
11 | 11 |
| 12 #include "base/bind.h" |
12 #include "base/callback.h" | 13 #include "base/callback.h" |
| 14 #include "base/location.h" |
13 #include "base/memory/ref_counted.h" | 15 #include "base/memory/ref_counted.h" |
14 #include "base/synchronization/lock.h" | 16 #include "base/sequenced_task_runner.h" |
15 #include "content/public/browser/download_interrupt_reasons.h" | |
16 #include "net/base/io_buffer.h" | 17 #include "net/base/io_buffer.h" |
17 | 18 |
18 namespace base { | 19 namespace base { |
19 class SequencedTaskRunner; | 20 class SequencedTaskRunner; |
20 } | 21 } |
21 | 22 |
22 namespace content { | 23 namespace content { |
23 | 24 |
24 // A byte stream is a pipe to transfer bytes between a source and a | 25 // A byte stream is a pipe to transfer bytes between a source and a |
25 // sink, which may be on different threads. It is intended to be the | 26 // sink, which may be on different threads. It is intended to be the |
26 // only connection between source and sink; they need have no | 27 // only connection between source and sink; they need have no |
27 // direct awareness of each other aside from the byte stream. The source and | 28 // direct awareness of each other aside from the byte stream. The source and |
28 // the sink have different interfaces to a byte stream, |ByteStreamWriter| | 29 // the sink have different interfaces to a byte stream, |ByteStreamWriter| |
29 // and |ByteStreamReader|. A pair of connected interfaces is generated by | 30 // and |ByteStreamReader|. A pair of connected interfaces is generated by |
30 // calling |CreateByteStream|. | 31 // calling |CreateByteStream|. |
31 // | 32 // |
32 // The source adds bytes to the bytestream via |ByteStreamWriter::Write| | 33 // The source adds bytes to the bytestream via |ByteStreamWriter::Write| |
33 // and the sink retrieves bytes already written via |ByteStreamReader::Read|. | 34 // and the sink retrieves bytes already written via |ByteStreamReader::Read|. |
34 // | 35 // |
35 // When the source has no more data to add, it will call | 36 // When the source has no more data to add, it will call |
36 // |ByteStreamWriter::Close| to indicate that. Errors at the source | 37 // |ByteStreamWriter::Close| to indicate that. Operation status at the source |
37 // are indicated to the sink via a non-DOWNLOAD_INTERRUPT_REASON_NONE code. | 38 // are indicated to the sink via the GetStatus() method. |
| 39 // |
| 40 // The type of operation status is specified as the StatusType template |
| 41 // parameter of these classes. StatusType must have a 0-arg constructor and be |
| 42 // copyable and assignable. |
38 // | 43 // |
39 // Normally the source is not managed after the relationship is setup; | 44 // Normally the source is not managed after the relationship is setup; |
40 // it is expected to provide data and then close itself. If an error | 45 // it is expected to provide data and then close itself. If an error |
41 // occurs on the sink, it is not signalled to the source via this | 46 // occurs on the sink, it is not signalled to the source via this |
42 // mechanism; instead, the source will write data until it exausts the | 47 // mechanism; instead, the source will write data until it exausts the |
43 // available space. If the source needs to be aware of errors occuring | 48 // available space. If the source needs to be aware of errors occuring |
44 // on the sink, this must be signalled in some other fashion (usually | 49 // on the sink, this must be signalled in some other fashion (usually |
45 // through whatever controller setup the relationship). | 50 // through whatever controller setup the relationship). |
46 // | 51 // |
47 // Callback lifetime management: No lifetime management is done in this | 52 // Callback lifetime management: No lifetime management is done in this |
48 // class to prevent registered callbacks from being called after any | 53 // class to prevent registered callbacks from being called after any |
49 // objects to which they may refer have been destroyed. It is the | 54 // objects to which they may refer have been destroyed. It is the |
50 // responsibility of the callers to avoid use-after-free references. | 55 // responsibility of the callers to avoid use-after-free references. |
51 // This may be done by any of several mechanisms, including weak | 56 // This may be done by any of several mechanisms, including weak |
52 // pointers, scoped_refptr references, or calling the registration | 57 // pointers, scoped_refptr references, or calling the registration |
53 // function with a null callback from a destructor. To enable the null | 58 // function with a null callback from a destructor. To enable the null |
54 // callback strategy, callbacks will not be stored between retrieval and | 59 // callback strategy, callbacks will not be stored between retrieval and |
55 // evaluation, so setting a null callback will guarantee that the | 60 // evaluation, so setting a null callback will guarantee that the |
56 // previous callback will not be executed after setting. | 61 // previous callback will not be executed after setting. |
57 // | 62 // |
58 // Class methods are virtual to allow mocking for tests; these classes | 63 // Class methods are virtual to allow mocking for tests; these classes |
59 // aren't intended to be base classes for other classes. | 64 // aren't intended to be base classes for other classes. |
60 // | 65 // |
61 // Sample usage (note that this does not show callback usage): | 66 // Sample usage (note that this does not show callback usage): |
62 // | 67 // |
63 // void OriginatingClass::Initialize() { | 68 // void OriginatingClass::Initialize() { |
64 // // Create a stream for sending bytes from IO->FILE threads. | 69 // // Create a stream for sending bytes from IO->FILE threads. |
65 // scoped_ptr<ByteStreamWriter> writer; | 70 // scoped_ptr<ByteStreamWriter<StatusType> > writer; |
66 // scoped_ptr<ByteStreamReader> reader; | 71 // scoped_ptr<ByteStreamReader<StatusType> > reader; |
67 // CreateByteStream( | 72 // CreateByteStream<StatusType>( |
68 // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO), | 73 // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO), |
69 // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE), | 74 // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE), |
70 // kStreamBufferSize /* e.g. 10240. */, | 75 // kStreamBufferSize /* e.g. 10240. */, |
71 // &writer, | 76 // &writer, |
72 // &reader); // Presumed passed to FILE thread for reading. | 77 // &reader); // Presumed passed to FILE thread for reading. |
73 // | 78 // |
74 // // Setup callback for writing. | 79 // // Setup callback for writing. |
75 // writer->RegisterCallback(base::Bind(&SpaceAvailable, this)); | 80 // writer->RegisterCallback(base::Bind(&SpaceAvailable, this)); |
76 // | 81 // |
77 // // Do initial round of writing. | 82 // // Do initial round of writing. |
(...skipping 28 matching lines...) Expand all Loading... |
106 // void ReceivingClass::DataAvailable() { | 111 // void ReceivingClass::DataAvailable() { |
107 // scoped_refptr<net::IOBuffer> data; | 112 // scoped_refptr<net::IOBuffer> data; |
108 // size_t length = 0; | 113 // size_t length = 0; |
109 // | 114 // |
110 // while (ByteStreamReader::STREAM_HAS_DATA == | 115 // while (ByteStreamReader::STREAM_HAS_DATA == |
111 // (state = reader->Read(&data, &length))) { | 116 // (state = reader->Read(&data, &length))) { |
112 // // Process |data|. | 117 // // Process |data|. |
113 // } | 118 // } |
114 // | 119 // |
115 // if (ByteStreamReader::STREAM_COMPLETE == state) { | 120 // if (ByteStreamReader::STREAM_COMPLETE == state) { |
116 // DownloadInterruptReason status = reader->GetStatus(); | 121 // <StatusType> status = reader->GetStatus(); |
117 // // Process error or successful completion in |status|. | 122 // // Process error or successful completion in |status|. |
118 // } | 123 // } |
119 // | 124 // |
120 // // if |state| is STREAM_EMPTY, we're done for now; we'll be called | 125 // // if |state| is STREAM_EMPTY, we're done for now; we'll be called |
121 // // again when there's more data. | 126 // // again when there's more data. |
122 // } | 127 // } |
123 class CONTENT_EXPORT ByteStreamWriter { | 128 template <typename StatusType> |
124 public: | 129 class ByteStreamWriter { |
| 130 public: |
125 // Inverse of the fraction of the stream buffer that must be full before | 131 // Inverse of the fraction of the stream buffer that must be full before |
126 // a notification is sent to paired Reader that there's more data. | 132 // a notification is sent to paired Reader that there's more data. |
127 static const int kFractionBufferBeforeSending; | 133 static const int kFractionBufferBeforeSending = 3; |
128 | 134 |
129 virtual ~ByteStreamWriter() = 0; | 135 virtual ~ByteStreamWriter() { }; |
130 | 136 |
131 // Always adds the data passed into the ByteStream. Returns true | 137 // Always adds the data passed into the ByteStream. Returns true |
132 // if more data may be added without exceeding the class limit | 138 // if more data may be added without exceeding the class limit |
133 // on data. Takes ownership of |buffer|. | 139 // on data. Takes ownership of |buffer|. |
134 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, | 140 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, |
135 size_t byte_count) = 0; | 141 size_t byte_count) = 0; |
136 | 142 |
137 // Signal that all data that is going to be sent, has been sent, | 143 // Signal that all data that is going to be sent, has been sent, |
138 // and provide a status. |DOWNLOAD_INTERRUPT_REASON_NONE| should be | 144 // and provide a status. |
139 // passed for successful completion. | 145 virtual void Close(StatusType status) = 0; |
140 virtual void Close(DownloadInterruptReason status) = 0; | |
141 | 146 |
142 // Register a callback to be called when the stream transitions from | 147 // Register a callback to be called when the stream transitions from |
143 // full to having space available. The callback will always be | 148 // full to having space available. The callback will always be |
144 // called on the task runner associated with the ByteStreamWriter. | 149 // called on the task runner associated with the ByteStreamWriter. |
145 // This callback will only be called if a call to Write has previously | 150 // This callback will only be called if a call to Write has previously |
146 // returned false (i.e. the ByteStream has been filled). | 151 // returned false (i.e. the ByteStream has been filled). |
147 // Multiple calls to this function are supported, though note that it | 152 // Multiple calls to this function are supported, though note that it |
148 // is the callers responsibility to handle races with space becoming | 153 // is the callers responsibility to handle races with space becoming |
149 // available (i.e. in the case of that race either of the before | 154 // available (i.e. in the case of that race either of the before |
150 // or after callbacks may be called). | 155 // or after callbacks may be called). |
151 // The callback will not be called after ByteStreamWriter destruction. | 156 // The callback will not be called after ByteStreamWriter destruction. |
152 virtual void RegisterCallback(const base::Closure& source_callback) = 0; | 157 virtual void RegisterCallback(const base::Closure& source_callback) = 0; |
153 }; | 158 }; |
154 | 159 |
155 class CONTENT_EXPORT ByteStreamReader { | 160 template <typename StatusType> |
| 161 class ByteStreamReader { |
156 public: | 162 public: |
157 // Inverse of the fraction of the stream buffer that must be empty before | 163 // Inverse of the fraction of the stream buffer that must be empty before |
158 // a notification is send to paired Writer that there's more room. | 164 // a notification is send to paired Writer that there's more room. |
159 static const int kFractionReadBeforeWindowUpdate; | 165 static const int kFractionReadBeforeWindowUpdate = 3; |
160 | 166 |
161 enum StreamState { STREAM_EMPTY, STREAM_HAS_DATA, STREAM_COMPLETE }; | 167 enum StreamState { STREAM_EMPTY, STREAM_HAS_DATA, STREAM_COMPLETE }; |
162 | 168 |
163 virtual ~ByteStreamReader() = 0; | 169 virtual ~ByteStreamReader() { }; |
164 | 170 |
165 // Returns STREAM_EMPTY if there is no data on the ByteStream and | 171 // Returns STREAM_EMPTY if there is no data on the ByteStream and |
166 // Close() has not been called, and STREAM_COMPLETE if there | 172 // Close() has not been called, and STREAM_COMPLETE if there |
167 // is no data on the ByteStream and Close() has been called. | 173 // is no data on the ByteStream and Close() has been called. |
168 // If there is data on the ByteStream, returns STREAM_HAS_DATA | 174 // If there is data on the ByteStream, returns STREAM_HAS_DATA |
169 // and fills in |*data| with a pointer to the data, and |*length| | 175 // and fills in |*data| with a pointer to the data, and |*length| |
170 // with its length. | 176 // with its length. |
171 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, | 177 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, |
172 size_t* length) = 0; | 178 size_t* length) = 0; |
173 | 179 |
174 // Only valid to call if Read() has returned STREAM_COMPLETE. | 180 // Only valid to call if Read() has returned STREAM_COMPLETE. |
175 virtual DownloadInterruptReason GetStatus() const = 0; | 181 virtual StatusType GetStatus() const = 0; |
176 | 182 |
177 // Register a callback to be called when data is added or the source | 183 // Register a callback to be called when data is added or the source |
178 // completes. The callback will be always be called on the owning | 184 // completes. The callback will be always be called on the owning |
179 // task runner. Multiple calls to this function are supported, | 185 // task runner. Multiple calls to this function are supported, |
180 // though note that it is the callers responsibility to handle races | 186 // though note that it is the callers responsibility to handle races |
181 // with data becoming available (i.e. in the case of that race | 187 // with data becoming available (i.e. in the case of that race |
182 // either of the before or after callbacks may be called). | 188 // either of the before or after callbacks may be called). |
183 // The callback will not be called after ByteStreamReader destruction. | 189 // The callback will not be called after ByteStreamReader destruction. |
184 virtual void RegisterCallback(const base::Closure& sink_callback) = 0; | 190 virtual void RegisterCallback(const base::Closure& sink_callback) = 0; |
185 }; | 191 }; |
186 | 192 |
187 CONTENT_EXPORT void CreateByteStream( | 193 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> > |
| 194 ByteStreamContentVector; |
| 195 |
| 196 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be |
| 197 // cleared in an object destructor and accessed to check for object |
| 198 // existence. We can't use weak pointers because they're tightly tied to |
| 199 // threads rather than task runners. |
| 200 // TODO(rdsmith): A better solution would be extending weak pointers |
| 201 // to support SequencedTaskRunners. |
| 202 struct ByteStreamLifetimeFlag |
| 203 : public base::RefCountedThreadSafe<ByteStreamLifetimeFlag> { |
| 204 public: |
| 205 ByteStreamLifetimeFlag() : is_alive(true) { } |
| 206 bool is_alive; |
| 207 |
| 208 protected: |
| 209 friend class base::RefCountedThreadSafe<ByteStreamLifetimeFlag>; |
| 210 virtual ~ByteStreamLifetimeFlag() { } |
| 211 |
| 212 private: |
| 213 DISALLOW_COPY_AND_ASSIGN(ByteStreamLifetimeFlag); |
| 214 }; |
| 215 |
| 216 template <typename StatusType> |
| 217 class ByteStreamReaderImpl; |
| 218 |
| 219 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and |
| 220 // SetPeer may happen anywhere; all other operations on each class must |
| 221 // happen in the context of their SequencedTaskRunner. |
| 222 template <typename StatusType> |
| 223 class ByteStreamWriterImpl : public ByteStreamWriter<StatusType> { |
| 224 public: |
| 225 ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, |
| 226 scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, |
| 227 size_t buffer_size) |
| 228 : total_buffer_size_(buffer_size), |
| 229 my_task_runner_(task_runner), |
| 230 my_lifetime_flag_(lifetime_flag), |
| 231 input_contents_size_(0), |
| 232 output_size_used_(0), |
| 233 peer_(NULL) { |
| 234 DCHECK(my_lifetime_flag_.get()); |
| 235 my_lifetime_flag_->is_alive = true; |
| 236 } |
| 237 |
| 238 virtual ~ByteStreamWriterImpl() { |
| 239 my_lifetime_flag_->is_alive = false; |
| 240 } |
| 241 |
| 242 // Must be called before any operations are performed. |
| 243 void SetPeer(ByteStreamReaderImpl<StatusType>* peer, |
| 244 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
| 245 scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag) { |
| 246 peer_ = peer; |
| 247 peer_task_runner_ = peer_task_runner; |
| 248 peer_lifetime_flag_ = peer_lifetime_flag; |
| 249 } |
| 250 |
| 251 // Overridden from ByteStreamWriter. |
| 252 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, |
| 253 size_t byte_count) OVERRIDE; |
| 254 virtual void Close(StatusType status) OVERRIDE; |
| 255 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE { |
| 256 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 257 space_available_callback_ = source_callback; |
| 258 } |
| 259 |
| 260 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. |
| 261 static void UpdateWindow(scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, |
| 262 ByteStreamWriterImpl<StatusType>* target, |
| 263 size_t bytes_consumed) { |
| 264 // If the target object isn't alive anymore, we do nothing. |
| 265 if (!lifetime_flag->is_alive) return; |
| 266 |
| 267 target->UpdateWindowInternal(bytes_consumed); |
| 268 } |
| 269 |
| 270 private: |
| 271 // Called from UpdateWindow when object existence has been validated. |
| 272 void UpdateWindowInternal(size_t bytes_consumed) { |
| 273 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 274 DCHECK_GE(output_size_used_, bytes_consumed); |
| 275 output_size_used_ -= bytes_consumed; |
| 276 |
| 277 // Callback if we were above the limit and we're now <= to it. |
| 278 size_t total_known_size_used = |
| 279 input_contents_size_ + output_size_used_; |
| 280 |
| 281 if (total_known_size_used <= total_buffer_size_ && |
| 282 (total_known_size_used + bytes_consumed > total_buffer_size_) && |
| 283 !space_available_callback_.is_null()) |
| 284 space_available_callback_.Run(); |
| 285 } |
| 286 |
| 287 void DrainInputBuffer(scoped_ptr<ByteStreamContentVector>* buffer) { |
| 288 if (0 == input_contents_size_) |
| 289 return; |
| 290 |
| 291 buffer->reset(new ByteStreamContentVector); |
| 292 (*buffer)->swap(input_contents_); |
| 293 output_size_used_ += input_contents_size_; |
| 294 input_contents_size_ = 0; |
| 295 } |
| 296 |
| 297 const size_t total_buffer_size_; |
| 298 |
| 299 // All data objects in this class are only valid to access on |
| 300 // this task runner except as otherwise noted. |
| 301 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; |
| 302 |
| 303 // True while this object is alive. |
| 304 scoped_refptr<ByteStreamLifetimeFlag> my_lifetime_flag_; |
| 305 |
| 306 base::Closure space_available_callback_; |
| 307 ByteStreamContentVector input_contents_; |
| 308 size_t input_contents_size_; |
| 309 |
| 310 // ** Peer information. |
| 311 |
| 312 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; |
| 313 |
| 314 // How much we've sent to the output that for flow control purposes we |
| 315 // must assume hasn't been read yet. |
| 316 size_t output_size_used_; |
| 317 |
| 318 // Only valid to access on peer_task_runner_. |
| 319 scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag_; |
| 320 |
| 321 // Only valid to access on peer_task_runner_ if |
| 322 // |*peer_lifetime_flag_ == true| |
| 323 ByteStreamReaderImpl<StatusType>* peer_; |
| 324 }; |
| 325 |
| 326 template <typename StatusType> |
| 327 class ByteStreamReaderImpl : public ByteStreamReader<StatusType> { |
| 328 public: |
| 329 ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, |
| 330 scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, |
| 331 size_t buffer_size) |
| 332 : total_buffer_size_(buffer_size), |
| 333 my_task_runner_(task_runner), |
| 334 my_lifetime_flag_(lifetime_flag), |
| 335 received_status_(false), |
| 336 unreported_consumed_bytes_(0), |
| 337 peer_(NULL) { |
| 338 DCHECK(my_lifetime_flag_.get()); |
| 339 my_lifetime_flag_->is_alive = true; |
| 340 } |
| 341 |
| 342 virtual ~ByteStreamReaderImpl() { |
| 343 my_lifetime_flag_->is_alive = false; |
| 344 } |
| 345 |
| 346 // Must be called before any operations are performed. |
| 347 void SetPeer(ByteStreamWriterImpl<StatusType>* peer, |
| 348 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
| 349 scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag) { |
| 350 peer_ = peer; |
| 351 peer_task_runner_ = peer_task_runner; |
| 352 peer_lifetime_flag_ = peer_lifetime_flag; |
| 353 } |
| 354 |
| 355 // Overridden from ByteStreamReader. |
| 356 virtual typename ByteStreamReader<StatusType>::StreamState Read( |
| 357 scoped_refptr<net::IOBuffer>* data, |
| 358 size_t* length) OVERRIDE { |
| 359 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 360 |
| 361 if (available_contents_.size()) { |
| 362 *data = available_contents_.front().first; |
| 363 *length = available_contents_.front().second; |
| 364 available_contents_.pop_front(); |
| 365 unreported_consumed_bytes_ += *length; |
| 366 |
| 367 MaybeUpdateInput(); |
| 368 return ByteStreamReader<StatusType>::STREAM_HAS_DATA; |
| 369 } |
| 370 if (received_status_) { |
| 371 return ByteStreamReader<StatusType>::STREAM_COMPLETE; |
| 372 } |
| 373 return ByteStreamReader<StatusType>::STREAM_EMPTY; |
| 374 } |
| 375 virtual StatusType GetStatus() const OVERRIDE { |
| 376 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 377 DCHECK(received_status_); |
| 378 return status_; |
| 379 } |
| 380 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE { |
| 381 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 382 |
| 383 data_available_callback_ = sink_callback; |
| 384 } |
| 385 |
| 386 // PostTask targets from |ByteStreamWriterImpl::Write| and |
| 387 // |ByteStreamWriterImpl::Close|. |
| 388 // Receive data from our peer. |
| 389 // static because it may be called after the object it is targeting |
| 390 // has been destroyed. It may not access |*target| |
| 391 // if |*object_lifetime_flag| is false. |
| 392 static void TransferData( |
| 393 scoped_refptr<ByteStreamLifetimeFlag> object_lifetime_flag, |
| 394 ByteStreamReaderImpl<StatusType>* target, |
| 395 scoped_ptr<ByteStreamContentVector> transfer_buffer) { |
| 396 // If our target is no longer alive, do nothing. |
| 397 if (!object_lifetime_flag->is_alive) return; |
| 398 |
| 399 target->TransferDataInternal(transfer_buffer.Pass()); |
| 400 } |
| 401 static void TransferDataAndClose( |
| 402 scoped_refptr<ByteStreamLifetimeFlag> object_lifetime_flag, |
| 403 ByteStreamReaderImpl<StatusType>* target, |
| 404 scoped_ptr<ByteStreamContentVector> transfer_buffer, |
| 405 StatusType status) { |
| 406 // If our target is no longer alive, do nothing. |
| 407 if (!object_lifetime_flag->is_alive) return; |
| 408 |
| 409 target->TransferDataAndCloseInternal(transfer_buffer.Pass(), status); |
| 410 } |
| 411 |
| 412 private: |
| 413 void AppendBuffer(scoped_ptr<ByteStreamContentVector> buffer) { |
| 414 if (!buffer) |
| 415 return; |
| 416 |
| 417 available_contents_.insert(available_contents_.end(), |
| 418 buffer->begin(), |
| 419 buffer->end()); |
| 420 } |
| 421 |
| 422 void MaybeInvokeCallback(bool was_empty, bool source_complete) { |
| 423 // Callback on transition from empty to non-empty, or |
| 424 // source complete. |
| 425 if (((was_empty && !available_contents_.empty()) || |
| 426 source_complete) && |
| 427 !data_available_callback_.is_null()) |
| 428 data_available_callback_.Run(); |
| 429 } |
| 430 |
| 431 // Called from TransferData.* once object existence has been validated. |
| 432 void TransferDataInternal( |
| 433 scoped_ptr<ByteStreamContentVector> transfer_buffer) { |
| 434 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 435 |
| 436 bool was_empty = available_contents_.empty(); |
| 437 |
| 438 AppendBuffer(transfer_buffer.Pass()); |
| 439 |
| 440 MaybeInvokeCallback(was_empty, false /* source_complete */); |
| 441 } |
| 442 void TransferDataAndCloseInternal( |
| 443 scoped_ptr<ByteStreamContentVector> transfer_buffer, StatusType status) { |
| 444 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 445 |
| 446 bool was_empty = available_contents_.empty(); |
| 447 |
| 448 AppendBuffer(transfer_buffer.Pass()); |
| 449 |
| 450 received_status_ = true; |
| 451 status_ = status; |
| 452 |
| 453 MaybeInvokeCallback(was_empty, true /* source_complete */); |
| 454 } |
| 455 |
| 456 void MaybeUpdateInput(); |
| 457 |
| 458 const size_t total_buffer_size_; |
| 459 |
| 460 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; |
| 461 |
| 462 // True while this object is alive. |
| 463 scoped_refptr<ByteStreamLifetimeFlag> my_lifetime_flag_; |
| 464 |
| 465 ByteStreamContentVector available_contents_; |
| 466 |
| 467 bool received_status_; |
| 468 StatusType status_; |
| 469 |
| 470 base::Closure data_available_callback_; |
| 471 |
| 472 // Time of last point at which data in stream transitioned from full |
| 473 // to non-full. Nulled when a callback is sent. |
| 474 base::Time last_non_full_time_; |
| 475 |
| 476 // ** Peer information |
| 477 |
| 478 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; |
| 479 |
| 480 // How much has been removed from this class that we haven't told |
| 481 // the input about yet. |
| 482 size_t unreported_consumed_bytes_; |
| 483 |
| 484 // Only valid to access on peer_task_runner_. |
| 485 scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag_; |
| 486 |
| 487 // Only valid to access on peer_task_runner_ if |
| 488 // |*peer_lifetime_flag_ == true| |
| 489 ByteStreamWriterImpl<StatusType>* peer_; |
| 490 }; |
| 491 |
| 492 template <typename StatusType> |
| 493 bool ByteStreamWriterImpl<StatusType>::Write( |
| 494 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { |
| 495 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 496 |
| 497 input_contents_.push_back(std::make_pair(buffer, byte_count)); |
| 498 input_contents_size_ += byte_count; |
| 499 |
| 500 // Arbitrarily, we buffer to a third of the total size before sending. |
| 501 if (input_contents_size_ > total_buffer_size_ / |
| 502 ByteStreamWriter<StatusType>::kFractionBufferBeforeSending) { |
| 503 scoped_ptr<ByteStreamContentVector> transfer_buffer( |
| 504 new ByteStreamContentVector); |
| 505 DrainInputBuffer(&transfer_buffer); |
| 506 |
| 507 peer_task_runner_->PostTask( |
| 508 FROM_HERE, base::Bind( |
| 509 &ByteStreamReaderImpl<StatusType>::TransferData, |
| 510 peer_lifetime_flag_, |
| 511 peer_, |
| 512 base::Passed(&transfer_buffer))); |
| 513 } |
| 514 |
| 515 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); |
| 516 } |
| 517 |
| 518 template <typename StatusType> |
| 519 void ByteStreamWriterImpl<StatusType>::Close(StatusType status) { |
| 520 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 521 |
| 522 scoped_ptr<ByteStreamContentVector> transfer_buffer( |
| 523 new ByteStreamContentVector); |
| 524 DrainInputBuffer(&transfer_buffer); |
| 525 |
| 526 peer_task_runner_->PostTask( |
| 527 FROM_HERE, base::Bind( |
| 528 &ByteStreamReaderImpl<StatusType>::TransferDataAndClose, |
| 529 peer_lifetime_flag_, |
| 530 peer_, |
| 531 base::Passed(&transfer_buffer), |
| 532 status)); |
| 533 } |
| 534 |
| 535 // Decide whether or not to send the input a window update. |
| 536 // Currently we do that whenever we've got unreported consumption |
| 537 // greater than 1/3 of total size. |
| 538 template <typename StatusType> |
| 539 void ByteStreamReaderImpl<StatusType>::MaybeUpdateInput() { |
| 540 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 541 |
| 542 if (unreported_consumed_bytes_ <= |
| 543 total_buffer_size_ / |
| 544 ByteStreamReader<StatusType>::kFractionReadBeforeWindowUpdate) |
| 545 return; |
| 546 |
| 547 peer_task_runner_->PostTask( |
| 548 FROM_HERE, base::Bind( |
| 549 &ByteStreamWriterImpl<StatusType>::UpdateWindow, |
| 550 peer_lifetime_flag_, |
| 551 peer_, |
| 552 unreported_consumed_bytes_)); |
| 553 unreported_consumed_bytes_ = 0; |
| 554 } |
| 555 |
| 556 template <typename StatusType> |
| 557 void CreateByteStream( |
188 scoped_refptr<base::SequencedTaskRunner> input_task_runner, | 558 scoped_refptr<base::SequencedTaskRunner> input_task_runner, |
189 scoped_refptr<base::SequencedTaskRunner> output_task_runner, | 559 scoped_refptr<base::SequencedTaskRunner> output_task_runner, |
190 size_t buffer_size, | 560 size_t buffer_size, |
191 scoped_ptr<ByteStreamWriter>* input, | 561 scoped_ptr<ByteStreamWriter<StatusType> >* input, |
192 scoped_ptr<ByteStreamReader>* output); | 562 scoped_ptr<ByteStreamReader<StatusType> >* output) { |
| 563 scoped_refptr<ByteStreamLifetimeFlag> input_flag( |
| 564 new ByteStreamLifetimeFlag()); |
| 565 scoped_refptr<ByteStreamLifetimeFlag> output_flag( |
| 566 new ByteStreamLifetimeFlag()); |
| 567 |
| 568 ByteStreamWriterImpl<StatusType>* in = new ByteStreamWriterImpl<StatusType>( |
| 569 input_task_runner, input_flag, buffer_size); |
| 570 ByteStreamReaderImpl<StatusType>* out = new ByteStreamReaderImpl<StatusType>( |
| 571 output_task_runner, output_flag, buffer_size); |
| 572 |
| 573 in->SetPeer(out, output_task_runner, output_flag); |
| 574 out->SetPeer(in, input_task_runner, input_flag); |
| 575 input->reset(in); |
| 576 output->reset(out); |
| 577 } |
193 | 578 |
194 } // namespace content | 579 } // namespace content |
195 | 580 |
196 #endif // CONTENT_BROWSER_BYTE_STREAM_H_ | 581 #endif // CONTENT_BROWSER_BYTE_STREAM_H_ |
OLD | NEW |