OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #ifndef CONTENT_BROWSER_BYTE_STREAM_H_ | 5 #ifndef CONTENT_BROWSER_BYTE_STREAM_H_ |
6 #define CONTENT_BROWSER_BYTE_STREAM_H_ | 6 #define CONTENT_BROWSER_BYTE_STREAM_H_ |
7 | 7 |
8 #include <deque> | 8 #include <deque> |
9 #include <set> | 9 #include <set> |
10 #include <utility> | 10 #include <utility> |
11 | 11 |
12 #include "base/bind.h" | |
12 #include "base/callback.h" | 13 #include "base/callback.h" |
14 #include "base/location.h" | |
13 #include "base/memory/ref_counted.h" | 15 #include "base/memory/ref_counted.h" |
14 #include "base/synchronization/lock.h" | 16 #include "base/sequenced_task_runner.h" |
15 #include "content/public/browser/download_interrupt_reasons.h" | |
16 #include "net/base/io_buffer.h" | 17 #include "net/base/io_buffer.h" |
17 | 18 |
18 namespace base { | 19 namespace base { |
19 class SequencedTaskRunner; | 20 class SequencedTaskRunner; |
20 } | 21 } |
21 | 22 |
22 namespace content { | 23 namespace content { |
23 | 24 |
24 // A byte stream is a pipe to transfer bytes between a source and a | 25 // A byte stream is a pipe to transfer bytes between a source and a |
25 // sink, which may be on different threads. It is intended to be the | 26 // sink, which may be on different threads. It is intended to be the |
26 // only connection between source and sink; they need have no | 27 // only connection between source and sink; they need have no |
27 // direct awareness of each other aside from the byte stream. The source and | 28 // direct awareness of each other aside from the byte stream. The source and |
28 // the sink have different interfaces to a byte stream, |ByteStreamWriter| | 29 // the sink have different interfaces to a byte stream, |ByteStreamWriter| |
29 // and |ByteStreamReader|. A pair of connected interfaces is generated by | 30 // and |ByteStreamReader|. A pair of connected interfaces is generated by |
30 // calling |CreateByteStream|. | 31 // calling |CreateByteStream|. |
31 // | 32 // |
32 // The source adds bytes to the bytestream via |ByteStreamWriter::Write| | 33 // The source adds bytes to the bytestream via |ByteStreamWriter::Write| |
33 // and the sink retrieves bytes already written via |ByteStreamReader::Read|. | 34 // and the sink retrieves bytes already written via |ByteStreamReader::Read|. |
34 // | 35 // |
35 // When the source has no more data to add, it will call | 36 // When the source has no more data to add, it will call |
36 // |ByteStreamWriter::Close| to indicate that. Errors at the source | 37 // |ByteStreamWriter::Close| to indicate that. Errors at the source |
37 // are indicated to the sink via a non-DOWNLOAD_INTERRUPT_REASON_NONE code. | 38 // are indicated to the sink via the GetStatus() method. |
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
Suggestion: Change "Errors" to "Operation status";
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
Done.
| |
38 // | 39 // |
39 // Normally the source is not managed after the relationship is setup; | 40 // Normally the source is not managed after the relationship is setup; |
40 // it is expected to provide data and then close itself. If an error | 41 // it is expected to provide data and then close itself. If an error |
41 // occurs on the sink, it is not signalled to the source via this | 42 // occurs on the sink, it is not signalled to the source via this |
42 // mechanism; instead, the source will write data until it exausts the | 43 // mechanism; instead, the source will write data until it exausts the |
43 // available space. If the source needs to be aware of errors occuring | 44 // available space. If the source needs to be aware of errors occuring |
44 // on the sink, this must be signalled in some other fashion (usually | 45 // on the sink, this must be signalled in some other fashion (usually |
45 // through whatever controller setup the relationship). | 46 // through whatever controller setup the relationship). |
46 // | 47 // |
47 // Callback lifetime management: No lifetime management is done in this | 48 // Callback lifetime management: No lifetime management is done in this |
48 // class to prevent registered callbacks from being called after any | 49 // class to prevent registered callbacks from being called after any |
49 // objects to which they may refer have been destroyed. It is the | 50 // objects to which they may refer have been destroyed. It is the |
50 // responsibility of the callers to avoid use-after-free references. | 51 // responsibility of the callers to avoid use-after-free references. |
51 // This may be done by any of several mechanisms, including weak | 52 // This may be done by any of several mechanisms, including weak |
52 // pointers, scoped_refptr references, or calling the registration | 53 // pointers, scoped_refptr references, or calling the registration |
53 // function with a null callback from a destructor. To enable the null | 54 // function with a null callback from a destructor. To enable the null |
54 // callback strategy, callbacks will not be stored between retrieval and | 55 // callback strategy, callbacks will not be stored between retrieval and |
55 // evaluation, so setting a null callback will guarantee that the | 56 // evaluation, so setting a null callback will guarantee that the |
56 // previous callback will not be executed after setting. | 57 // previous callback will not be executed after setting. |
57 // | 58 // |
58 // Class methods are virtual to allow mocking for tests; these classes | 59 // Class methods are virtual to allow mocking for tests; these classes |
59 // aren't intended to be base classes for other classes. | 60 // aren't intended to be base classes for other classes. |
60 // | 61 // |
61 // Sample usage (note that this does not show callback usage): | 62 // Sample usage (note that this does not show callback usage): |
62 // | 63 // |
63 // void OriginatingClass::Initialize() { | 64 // void OriginatingClass::Initialize() { |
64 // // Create a stream for sending bytes from IO->FILE threads. | 65 // // Create a stream for sending bytes from IO->FILE threads. |
65 // scoped_ptr<ByteStreamWriter> writer; | 66 // scoped_ptr<ByteStreamWriter<status type> > writer; |
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
Could you change this to StatusType? My eye keeps
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
Done.
| |
66 // scoped_ptr<ByteStreamReader> reader; | 67 // scoped_ptr<ByteStreamReader<status type> > reader; |
67 // CreateByteStream( | 68 // CreateByteStream<status type>( |
68 // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO), | 69 // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO), |
69 // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE), | 70 // BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE), |
70 // kStreamBufferSize /* e.g. 10240. */, | 71 // kStreamBufferSize /* e.g. 10240. */, |
71 // &writer, | 72 // &writer, |
72 // &reader); // Presumed passed to FILE thread for reading. | 73 // &reader); // Presumed passed to FILE thread for reading. |
73 // | 74 // |
74 // // Setup callback for writing. | 75 // // Setup callback for writing. |
75 // writer->RegisterCallback(base::Bind(&SpaceAvailable, this)); | 76 // writer->RegisterCallback(base::Bind(&SpaceAvailable, this)); |
76 // | 77 // |
77 // // Do initial round of writing. | 78 // // Do initial round of writing. |
(...skipping 28 matching lines...) Expand all Loading... | |
106 // void ReceivingClass::DataAvailable() { | 107 // void ReceivingClass::DataAvailable() { |
107 // scoped_refptr<net::IOBuffer> data; | 108 // scoped_refptr<net::IOBuffer> data; |
108 // size_t length = 0; | 109 // size_t length = 0; |
109 // | 110 // |
110 // while (ByteStreamReader::STREAM_HAS_DATA == | 111 // while (ByteStreamReader::STREAM_HAS_DATA == |
111 // (state = reader->Read(&data, &length))) { | 112 // (state = reader->Read(&data, &length))) { |
112 // // Process |data|. | 113 // // Process |data|. |
113 // } | 114 // } |
114 // | 115 // |
115 // if (ByteStreamReader::STREAM_COMPLETE == state) { | 116 // if (ByteStreamReader::STREAM_COMPLETE == state) { |
116 // DownloadInterruptReason status = reader->GetStatus(); | 117 // <status type> status = reader->GetStatus(); |
117 // // Process error or successful completion in |status|. | 118 // // Process error or successful completion in |status|. |
118 // } | 119 // } |
119 // | 120 // |
120 // // if |state| is STREAM_EMPTY, we're done for now; we'll be called | 121 // // if |state| is STREAM_EMPTY, we're done for now; we'll be called |
121 // // again when there's more data. | 122 // // again when there's more data. |
122 // } | 123 // } |
123 class CONTENT_EXPORT ByteStreamWriter { | 124 template <typename StatusType> |
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
We need to document the requirements on StatusType
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
Added around L38
| |
124 public: | 125 class ByteStreamWriter { |
126 public: | |
125 // Inverse of the fraction of the stream buffer that must be full before | 127 // Inverse of the fraction of the stream buffer that must be full before |
126 // a notification is sent to paired Reader that there's more data. | 128 // a notification is sent to paired Reader that there's more data. |
127 static const int kFractionBufferBeforeSending; | 129 static const int kFractionBufferBeforeSending = 3; |
128 | 130 |
129 virtual ~ByteStreamWriter() = 0; | 131 virtual ~ByteStreamWriter() { }; |
130 | 132 |
131 // Always adds the data passed into the ByteStream. Returns true | 133 // Always adds the data passed into the ByteStream. Returns true |
132 // if more data may be added without exceeding the class limit | 134 // if more data may be added without exceeding the class limit |
133 // on data. Takes ownership of |buffer|. | 135 // on data. Takes ownership of |buffer|. |
134 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, | 136 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, |
135 size_t byte_count) = 0; | 137 size_t byte_count) = 0; |
136 | 138 |
137 // Signal that all data that is going to be sent, has been sent, | 139 // Signal that all data that is going to be sent, has been sent, |
138 // and provide a status. |DOWNLOAD_INTERRUPT_REASON_NONE| should be | 140 // and provide a status. |
139 // passed for successful completion. | 141 virtual void Close(StatusType status) = 0; |
140 virtual void Close(DownloadInterruptReason status) = 0; | |
141 | 142 |
142 // Register a callback to be called when the stream transitions from | 143 // Register a callback to be called when the stream transitions from |
143 // full to having space available. The callback will always be | 144 // full to having space available. The callback will always be |
144 // called on the task runner associated with the ByteStreamWriter. | 145 // called on the task runner associated with the ByteStreamWriter. |
145 // This callback will only be called if a call to Write has previously | 146 // This callback will only be called if a call to Write has previously |
146 // returned false (i.e. the ByteStream has been filled). | 147 // returned false (i.e. the ByteStream has been filled). |
147 // Multiple calls to this function are supported, though note that it | 148 // Multiple calls to this function are supported, though note that it |
148 // is the callers responsibility to handle races with space becoming | 149 // is the callers responsibility to handle races with space becoming |
149 // available (i.e. in the case of that race either of the before | 150 // available (i.e. in the case of that race either of the before |
150 // or after callbacks may be called). | 151 // or after callbacks may be called). |
151 // The callback will not be called after ByteStreamWriter destruction. | 152 // The callback will not be called after ByteStreamWriter destruction. |
152 virtual void RegisterCallback(const base::Closure& source_callback) = 0; | 153 virtual void RegisterCallback(const base::Closure& source_callback) = 0; |
153 }; | 154 }; |
154 | 155 |
155 class CONTENT_EXPORT ByteStreamReader { | 156 template <typename StatusType> |
157 class ByteStreamReader { | |
156 public: | 158 public: |
157 // Inverse of the fraction of the stream buffer that must be empty before | 159 // Inverse of the fraction of the stream buffer that must be empty before |
158 // a notification is send to paired Writer that there's more room. | 160 // a notification is send to paired Writer that there's more room. |
159 static const int kFractionReadBeforeWindowUpdate; | 161 static const int kFractionReadBeforeWindowUpdate = 3; |
160 | 162 |
161 enum StreamState { STREAM_EMPTY, STREAM_HAS_DATA, STREAM_COMPLETE }; | 163 enum StreamState { STREAM_EMPTY, STREAM_HAS_DATA, STREAM_COMPLETE }; |
162 | 164 |
163 virtual ~ByteStreamReader() = 0; | 165 virtual ~ByteStreamReader() { }; |
164 | 166 |
165 // Returns STREAM_EMPTY if there is no data on the ByteStream and | 167 // Returns STREAM_EMPTY if there is no data on the ByteStream and |
166 // Close() has not been called, and STREAM_COMPLETE if there | 168 // Close() has not been called, and STREAM_COMPLETE if there |
167 // is no data on the ByteStream and Close() has been called. | 169 // is no data on the ByteStream and Close() has been called. |
168 // If there is data on the ByteStream, returns STREAM_HAS_DATA | 170 // If there is data on the ByteStream, returns STREAM_HAS_DATA |
169 // and fills in |*data| with a pointer to the data, and |*length| | 171 // and fills in |*data| with a pointer to the data, and |*length| |
170 // with its length. | 172 // with its length. |
171 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, | 173 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, |
172 size_t* length) = 0; | 174 size_t* length) = 0; |
173 | 175 |
174 // Only valid to call if Read() has returned STREAM_COMPLETE. | 176 // Only valid to call if Read() has returned STREAM_COMPLETE. |
175 virtual DownloadInterruptReason GetStatus() const = 0; | 177 virtual StatusType GetStatus() const = 0; |
176 | 178 |
177 // Register a callback to be called when data is added or the source | 179 // Register a callback to be called when data is added or the source |
178 // completes. The callback will be always be called on the owning | 180 // completes. The callback will be always be called on the owning |
179 // task runner. Multiple calls to this function are supported, | 181 // task runner. Multiple calls to this function are supported, |
180 // though note that it is the callers responsibility to handle races | 182 // though note that it is the callers responsibility to handle races |
181 // with data becoming available (i.e. in the case of that race | 183 // with data becoming available (i.e. in the case of that race |
182 // either of the before or after callbacks may be called). | 184 // either of the before or after callbacks may be called). |
183 // The callback will not be called after ByteStreamReader destruction. | 185 // The callback will not be called after ByteStreamReader destruction. |
184 virtual void RegisterCallback(const base::Closure& sink_callback) = 0; | 186 virtual void RegisterCallback(const base::Closure& sink_callback) = 0; |
185 }; | 187 }; |
186 | 188 |
187 CONTENT_EXPORT void CreateByteStream( | 189 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> > |
Randy Smith (Not in Mondays)
2013/07/30 17:43:59
I'm surprised that removing CONTENT_EXPORT works;
tyoshino (SeeGerritForStatus)
2013/07/30 18:55:59
As we have implementation in .h file (to instantia
| |
190 ByteStreamContentVector; | |
191 | |
192 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be | |
193 // cleared in an object destructor and accessed to check for object | |
194 // existence. We can't use weak pointers because they're tightly tied to | |
195 // threads rather than task runners. | |
196 // TODO(rdsmith): A better solution would be extending weak pointers | |
197 // to support SequencedTaskRunners. | |
198 struct ByteStreamLifetimeFlag | |
199 : public base::RefCountedThreadSafe<ByteStreamLifetimeFlag> { | |
200 public: | |
201 ByteStreamLifetimeFlag() : is_alive(true) { } | |
202 bool is_alive; | |
203 | |
204 protected: | |
205 friend class base::RefCountedThreadSafe<ByteStreamLifetimeFlag>; | |
206 virtual ~ByteStreamLifetimeFlag() { } | |
207 | |
208 private: | |
209 DISALLOW_COPY_AND_ASSIGN(ByteStreamLifetimeFlag); | |
210 }; | |
211 | |
212 template <typename StatusType> | |
213 class ByteStreamReaderImpl; | |
214 | |
215 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and | |
216 // SetPeer may happen anywhere; all other operations on each class must | |
217 // happen in the context of their SequencedTaskRunner. | |
218 template <typename StatusType> | |
219 class ByteStreamWriterImpl : public ByteStreamWriter<StatusType> { | |
220 public: | |
221 ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, | |
222 scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, | |
223 size_t buffer_size) | |
224 : total_buffer_size_(buffer_size), | |
225 my_task_runner_(task_runner), | |
226 my_lifetime_flag_(lifetime_flag), | |
227 input_contents_size_(0), | |
228 output_size_used_(0), | |
229 peer_(NULL) { | |
230 DCHECK(my_lifetime_flag_.get()); | |
231 my_lifetime_flag_->is_alive = true; | |
232 } | |
233 | |
234 virtual ~ByteStreamWriterImpl() { | |
235 my_lifetime_flag_->is_alive = false; | |
236 } | |
237 | |
238 // Must be called before any operations are performed. | |
239 void SetPeer(ByteStreamReaderImpl<StatusType>* peer, | |
240 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
241 scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag) { | |
242 peer_ = peer; | |
243 peer_task_runner_ = peer_task_runner; | |
244 peer_lifetime_flag_ = peer_lifetime_flag; | |
245 } | |
246 | |
247 // Overridden from ByteStreamWriter. | |
248 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, | |
249 size_t byte_count) OVERRIDE; | |
250 virtual void Close(StatusType status) OVERRIDE; | |
251 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE { | |
252 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
253 space_available_callback_ = source_callback; | |
254 } | |
255 | |
256 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. | |
257 static void UpdateWindow(scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, | |
258 ByteStreamWriterImpl<StatusType>* target, | |
259 size_t bytes_consumed) { | |
260 // If the target object isn't alive anymore, we do nothing. | |
261 if (!lifetime_flag->is_alive) return; | |
262 | |
263 target->UpdateWindowInternal(bytes_consumed); | |
264 } | |
265 | |
266 private: | |
267 // Called from UpdateWindow when object existence has been validated. | |
268 void UpdateWindowInternal(size_t bytes_consumed) { | |
269 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
270 DCHECK_GE(output_size_used_, bytes_consumed); | |
271 output_size_used_ -= bytes_consumed; | |
272 | |
273 // Callback if we were above the limit and we're now <= to it. | |
274 size_t total_known_size_used = | |
275 input_contents_size_ + output_size_used_; | |
276 | |
277 if (total_known_size_used <= total_buffer_size_ && | |
278 (total_known_size_used + bytes_consumed > total_buffer_size_) && | |
279 !space_available_callback_.is_null()) | |
280 space_available_callback_.Run(); | |
281 } | |
282 | |
283 void DrainInputBuffer(scoped_ptr<ByteStreamContentVector>* buffer) { | |
284 if (0 == input_contents_size_) | |
285 return; | |
286 | |
287 buffer->reset(new ByteStreamContentVector); | |
288 (*buffer)->swap(input_contents_); | |
289 output_size_used_ += input_contents_size_; | |
290 input_contents_size_ = 0; | |
291 } | |
292 | |
293 const size_t total_buffer_size_; | |
294 | |
295 // All data objects in this class are only valid to access on | |
296 // this task runner except as otherwise noted. | |
297 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | |
298 | |
299 // True while this object is alive. | |
300 scoped_refptr<ByteStreamLifetimeFlag> my_lifetime_flag_; | |
301 | |
302 base::Closure space_available_callback_; | |
303 ByteStreamContentVector input_contents_; | |
304 size_t input_contents_size_; | |
305 | |
306 // ** Peer information. | |
307 | |
308 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | |
309 | |
310 // How much we've sent to the output that for flow control purposes we | |
311 // must assume hasn't been read yet. | |
312 size_t output_size_used_; | |
313 | |
314 // Only valid to access on peer_task_runner_. | |
315 scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag_; | |
316 | |
317 // Only valid to access on peer_task_runner_ if | |
318 // |*peer_lifetime_flag_ == true| | |
319 ByteStreamReaderImpl<StatusType>* peer_; | |
320 }; | |
321 | |
322 template <typename StatusType> | |
323 class ByteStreamReaderImpl : public ByteStreamReader<StatusType> { | |
324 public: | |
325 ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, | |
326 scoped_refptr<ByteStreamLifetimeFlag> lifetime_flag, | |
327 size_t buffer_size) | |
328 : total_buffer_size_(buffer_size), | |
329 my_task_runner_(task_runner), | |
330 my_lifetime_flag_(lifetime_flag), | |
331 received_status_(false), | |
332 unreported_consumed_bytes_(0), | |
333 peer_(NULL) { | |
334 DCHECK(my_lifetime_flag_.get()); | |
335 my_lifetime_flag_->is_alive = true; | |
336 } | |
337 | |
338 virtual ~ByteStreamReaderImpl() { | |
339 my_lifetime_flag_->is_alive = false; | |
340 } | |
341 | |
342 // Must be called before any operations are performed. | |
343 void SetPeer(ByteStreamWriterImpl<StatusType>* peer, | |
344 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
345 scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag) { | |
346 peer_ = peer; | |
347 peer_task_runner_ = peer_task_runner; | |
348 peer_lifetime_flag_ = peer_lifetime_flag; | |
349 } | |
350 | |
351 // Overridden from ByteStreamReader. | |
352 virtual typename ByteStreamReader<StatusType>::StreamState Read( | |
353 scoped_refptr<net::IOBuffer>* data, | |
354 size_t* length) OVERRIDE { | |
355 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
356 | |
357 if (available_contents_.size()) { | |
358 *data = available_contents_.front().first; | |
359 *length = available_contents_.front().second; | |
360 available_contents_.pop_front(); | |
361 unreported_consumed_bytes_ += *length; | |
362 | |
363 MaybeUpdateInput(); | |
364 return ByteStreamReader<StatusType>::STREAM_HAS_DATA; | |
365 } | |
366 if (received_status_) { | |
367 return ByteStreamReader<StatusType>::STREAM_COMPLETE; | |
368 } | |
369 return ByteStreamReader<StatusType>::STREAM_EMPTY; | |
370 } | |
371 virtual StatusType GetStatus() const OVERRIDE { | |
372 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
373 DCHECK(received_status_); | |
374 return status_; | |
375 } | |
376 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE { | |
377 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
378 | |
379 data_available_callback_ = sink_callback; | |
380 } | |
381 | |
382 // PostTask targets from |ByteStreamWriterImpl::Write| and | |
383 // |ByteStreamWriterImpl::Close|. | |
384 // Receive data from our peer. | |
385 // static because it may be called after the object it is targeting | |
386 // has been destroyed. It may not access |*target| | |
387 // if |*object_lifetime_flag| is false. | |
388 static void TransferData( | |
389 scoped_refptr<ByteStreamLifetimeFlag> object_lifetime_flag, | |
390 ByteStreamReaderImpl<StatusType>* target, | |
391 scoped_ptr<ByteStreamContentVector> transfer_buffer) { | |
392 // If our target is no longer alive, do nothing. | |
393 if (!object_lifetime_flag->is_alive) return; | |
394 | |
395 target->TransferDataInternal(transfer_buffer.Pass()); | |
396 } | |
397 static void TransferDataAndClose( | |
398 scoped_refptr<ByteStreamLifetimeFlag> object_lifetime_flag, | |
399 ByteStreamReaderImpl<StatusType>* target, | |
400 scoped_ptr<ByteStreamContentVector> transfer_buffer, | |
401 StatusType status) { | |
402 // If our target is no longer alive, do nothing. | |
403 if (!object_lifetime_flag->is_alive) return; | |
404 | |
405 target->TransferDataAndCloseInternal(transfer_buffer.Pass(), status); | |
406 } | |
407 | |
408 private: | |
409 void AppendBuffer(scoped_ptr<ByteStreamContentVector> buffer) { | |
410 if (!buffer) | |
411 return; | |
412 | |
413 available_contents_.insert(available_contents_.end(), | |
414 buffer->begin(), | |
415 buffer->end()); | |
416 } | |
417 | |
418 void MaybeInvokeCallback(bool was_empty, bool source_complete) { | |
419 // Callback on transition from empty to non-empty, or | |
420 // source complete. | |
421 if (((was_empty && !available_contents_.empty()) || | |
422 source_complete) && | |
423 !data_available_callback_.is_null()) | |
424 data_available_callback_.Run(); | |
425 } | |
426 | |
427 // Called from TransferData.* once object existence has been validated. | |
428 void TransferDataInternal( | |
429 scoped_ptr<ByteStreamContentVector> transfer_buffer) { | |
430 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
431 | |
432 bool was_empty = available_contents_.empty(); | |
433 | |
434 AppendBuffer(transfer_buffer.Pass()); | |
435 | |
436 MaybeInvokeCallback(was_empty, false /* source_complete */); | |
437 } | |
438 void TransferDataAndCloseInternal( | |
439 scoped_ptr<ByteStreamContentVector> transfer_buffer, StatusType status) { | |
440 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
441 | |
442 bool was_empty = available_contents_.empty(); | |
443 | |
444 AppendBuffer(transfer_buffer.Pass()); | |
445 | |
446 received_status_ = true; | |
447 status_ = status; | |
448 | |
449 MaybeInvokeCallback(was_empty, true /* source_complete */); | |
450 } | |
451 | |
452 void MaybeUpdateInput(); | |
453 | |
454 const size_t total_buffer_size_; | |
455 | |
456 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | |
457 | |
458 // True while this object is alive. | |
459 scoped_refptr<ByteStreamLifetimeFlag> my_lifetime_flag_; | |
460 | |
461 ByteStreamContentVector available_contents_; | |
462 | |
463 bool received_status_; | |
464 StatusType status_; | |
465 | |
466 base::Closure data_available_callback_; | |
467 | |
468 // Time of last point at which data in stream transitioned from full | |
469 // to non-full. Nulled when a callback is sent. | |
470 base::Time last_non_full_time_; | |
471 | |
472 // ** Peer information | |
473 | |
474 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | |
475 | |
476 // How much has been removed from this class that we haven't told | |
477 // the input about yet. | |
478 size_t unreported_consumed_bytes_; | |
479 | |
480 // Only valid to access on peer_task_runner_. | |
481 scoped_refptr<ByteStreamLifetimeFlag> peer_lifetime_flag_; | |
482 | |
483 // Only valid to access on peer_task_runner_ if | |
484 // |*peer_lifetime_flag_ == true| | |
485 ByteStreamWriterImpl<StatusType>* peer_; | |
486 }; | |
487 | |
488 template <typename StatusType> | |
489 bool ByteStreamWriterImpl<StatusType>::Write( | |
490 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { | |
491 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
492 | |
493 input_contents_.push_back(std::make_pair(buffer, byte_count)); | |
494 input_contents_size_ += byte_count; | |
495 | |
496 // Arbitrarily, we buffer to a third of the total size before sending. | |
497 if (input_contents_size_ > total_buffer_size_ / | |
498 ByteStreamWriter<StatusType>::kFractionBufferBeforeSending) { | |
499 scoped_ptr<ByteStreamContentVector> transfer_buffer( | |
500 new ByteStreamContentVector); | |
501 DrainInputBuffer(&transfer_buffer); | |
502 | |
503 peer_task_runner_->PostTask( | |
504 FROM_HERE, base::Bind( | |
505 &ByteStreamReaderImpl<StatusType>::TransferData, | |
506 peer_lifetime_flag_, | |
507 peer_, | |
508 base::Passed(&transfer_buffer))); | |
509 } | |
510 | |
511 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); | |
512 } | |
513 | |
514 template <typename StatusType> | |
515 void ByteStreamWriterImpl<StatusType>::Close(StatusType status) { | |
516 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
517 | |
518 scoped_ptr<ByteStreamContentVector> transfer_buffer( | |
519 new ByteStreamContentVector); | |
520 DrainInputBuffer(&transfer_buffer); | |
521 | |
522 peer_task_runner_->PostTask( | |
523 FROM_HERE, base::Bind( | |
524 &ByteStreamReaderImpl<StatusType>::TransferDataAndClose, | |
525 peer_lifetime_flag_, | |
526 peer_, | |
527 base::Passed(&transfer_buffer), | |
528 status)); | |
529 } | |
530 | |
531 // Decide whether or not to send the input a window update. | |
532 // Currently we do that whenever we've got unreported consumption | |
533 // greater than 1/3 of total size. | |
534 template <typename StatusType> | |
535 void ByteStreamReaderImpl<StatusType>::MaybeUpdateInput() { | |
536 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
537 | |
538 if (unreported_consumed_bytes_ <= | |
539 total_buffer_size_ / | |
540 ByteStreamReader<StatusType>::kFractionReadBeforeWindowUpdate) | |
541 return; | |
542 | |
543 peer_task_runner_->PostTask( | |
544 FROM_HERE, base::Bind( | |
545 &ByteStreamWriterImpl<StatusType>::UpdateWindow, | |
546 peer_lifetime_flag_, | |
547 peer_, | |
548 unreported_consumed_bytes_)); | |
549 unreported_consumed_bytes_ = 0; | |
550 } | |
551 | |
552 template <typename StatusType> | |
553 void CreateByteStream( | |
188 scoped_refptr<base::SequencedTaskRunner> input_task_runner, | 554 scoped_refptr<base::SequencedTaskRunner> input_task_runner, |
189 scoped_refptr<base::SequencedTaskRunner> output_task_runner, | 555 scoped_refptr<base::SequencedTaskRunner> output_task_runner, |
190 size_t buffer_size, | 556 size_t buffer_size, |
191 scoped_ptr<ByteStreamWriter>* input, | 557 scoped_ptr<ByteStreamWriter<StatusType> >* input, |
192 scoped_ptr<ByteStreamReader>* output); | 558 scoped_ptr<ByteStreamReader<StatusType> >* output) { |
559 scoped_refptr<ByteStreamLifetimeFlag> input_flag( | |
560 new ByteStreamLifetimeFlag()); | |
561 scoped_refptr<ByteStreamLifetimeFlag> output_flag( | |
562 new ByteStreamLifetimeFlag()); | |
563 | |
564 ByteStreamWriterImpl<StatusType>* in = new ByteStreamWriterImpl<StatusType>( | |
565 input_task_runner, input_flag, buffer_size); | |
566 ByteStreamReaderImpl<StatusType>* out = new ByteStreamReaderImpl<StatusType>( | |
567 output_task_runner, output_flag, buffer_size); | |
568 | |
569 in->SetPeer(out, output_task_runner, output_flag); | |
570 out->SetPeer(in, input_task_runner, input_flag); | |
571 input->reset(in); | |
572 output->reset(out); | |
573 } | |
193 | 574 |
194 } // namespace content | 575 } // namespace content |
195 | 576 |
196 #endif // CONTENT_BROWSER_BYTE_STREAM_H_ | 577 #endif // CONTENT_BROWSER_BYTE_STREAM_H_ |
OLD | NEW |