OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "content/browser/download/byte_stream.h" | |
6 | |
7 #include "base/bind.h" | |
8 #include "base/location.h" | |
9 #include "base/memory/weak_ptr.h" | |
10 #include "base/memory/ref_counted.h" | |
11 #include "base/sequenced_task_runner.h" | |
12 | |
13 namespace { | |
14 | |
15 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> > | |
16 ContentVector; | |
17 | |
18 // The fraction of the buffer that must be ready to send on the input | |
19 // before we ship data to the output. | |
20 static const int kFractionBufferBeforeSending = 3; | |
21 | |
22 // The fraction of the buffer that must have been consumed on the output | |
23 // before we update the input window. | |
24 static const int kFractionReadBeforeWindowUpdate = 3; | |
25 | |
26 class ByteStreamOutputImpl; | |
27 | |
28 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be | |
29 // cleared in an object destructor and accessed to check for object | |
30 // existence. We can't use weak pointers because they're tightly tied to | |
31 // threads rather than task runners. | |
32 // TODO(rdsmith): A better solution would be extending weak pointers | |
33 // to support SequencedTaskRunners. | |
34 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> { | |
35 public: | |
36 LifetimeFlag() : is_alive_(true) { } | |
37 bool is_alive_; | |
38 | |
39 protected: | |
40 friend class base::RefCountedThreadSafe<LifetimeFlag>; | |
41 virtual ~LifetimeFlag() { } | |
42 | |
43 private: | |
44 DISALLOW_COPY_AND_ASSIGN(LifetimeFlag); | |
45 }; | |
46 | |
47 // For both ByteStreamInputImpl and ByteStreamOutputImpl, Construction and | |
48 // SetPeer may happen anywhere; all other operations on each class must | |
49 // happen in the context of their SequencedTaskRunner. | |
50 class ByteStreamInputImpl : public content::ByteStreamInput { | |
51 public: | |
52 ByteStreamInputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, | |
53 scoped_refptr<LifetimeFlag> lifetime_flag, | |
54 size_t buffer_size); | |
55 virtual ~ByteStreamInputImpl(); | |
56 | |
57 // Must be called before any operations are performed. | |
58 void SetPeer(ByteStreamOutputImpl* peer, | |
59 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
60 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | |
61 | |
62 // Overridden from ByteStreamInput. | |
63 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, | |
64 size_t byte_count) OVERRIDE; | |
65 virtual void Close(content::DownloadInterruptReason status) OVERRIDE; | |
66 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; | |
67 | |
68 // PostTask target from |ByteStreamOutputImpl::MaybeUpdateInput|. | |
69 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, | |
70 ByteStreamInputImpl* target, | |
71 size_t bytes_consumed); | |
72 | |
73 private: | |
74 // Called from UpdateWindow when object existence has been validated. | |
75 void UpdateWindowMethod(size_t bytes_consumed); | |
76 | |
77 void PostToPeer(bool complete, content::DownloadInterruptReason status); | |
78 | |
79 const size_t total_buffer_size_; | |
80 | |
81 // All data objects in this class are only valid to access on | |
82 // this task runner except as otherwise noted. | |
83 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | |
84 | |
85 // True while this object is alive. | |
86 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | |
87 | |
88 base::Closure space_available_callback_; | |
89 ContentVector input_contents_; | |
90 size_t input_contents_size_; | |
91 | |
92 // ** Peer information. | |
93 | |
94 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | |
95 | |
96 // How much we've sent to the output that for flow control purposes we | |
97 // must assume hasn't been read yet. | |
98 size_t output_size_used_; | |
99 | |
100 // Only valid to access on peer_task_runner_. | |
101 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; | |
102 | |
103 // Only valid to access on peer_task_runner_ if | |
104 // |*peer_lifetime_flag_ == true| | |
105 ByteStreamOutputImpl* peer_; | |
106 }; | |
107 | |
108 class ByteStreamOutputImpl : public content::ByteStreamOutput { | |
109 public: | |
110 ByteStreamOutputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, | |
111 scoped_refptr<LifetimeFlag> lifetime_flag, | |
112 size_t buffer_size); | |
113 virtual ~ByteStreamOutputImpl(); | |
114 | |
115 // Must be called before any operations are performed. | |
116 void SetPeer(ByteStreamInputImpl* peer, | |
117 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
118 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | |
119 | |
120 // Overridden from ByteStreamOutput. | |
121 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, | |
122 size_t* length) OVERRIDE; | |
123 virtual content::DownloadInterruptReason GetStatus() const OVERRIDE; | |
124 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; | |
125 | |
126 // PostTask target from |ByteStreamInputImpl::MaybePostToPeer| and | |
127 // |ByteStreamInputImpl::Close|. | |
128 // Receive data from our peer. | |
129 // static because it may be called after the object it is targeting | |
130 // has been destroyed. It may not access |*target| | |
131 // if |*object_lifetime_flag| is false. | |
132 static void TransferData( | |
133 scoped_refptr<LifetimeFlag> object_lifetime_flag, | |
134 ByteStreamOutputImpl* target, | |
135 scoped_ptr<ContentVector> xfer_buffer, | |
136 size_t xfer_buffer_bytes, | |
137 bool source_complete, | |
138 content::DownloadInterruptReason status); | |
139 | |
140 private: | |
141 // Called from TransferData once object existence has been validated. | |
142 void TransferDataMethod( | |
benjhayden
2012/05/16 21:11:04
Sounds like you're transferring something called a
Randy Smith (Not in Mondays)
2012/05/17 18:24:32
Done.
| |
143 scoped_ptr<ContentVector> xfer_buffer, | |
144 size_t xfer_buffer_bytes, | |
145 bool source_complete, | |
146 content::DownloadInterruptReason status); | |
147 | |
148 void MaybeUpdateInput(); | |
149 | |
150 const size_t total_buffer_size_; | |
151 | |
152 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | |
153 | |
154 // True while this object is alive. | |
155 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | |
156 | |
157 ContentVector available_contents_; | |
158 size_t available_contents_size_; | |
159 | |
160 bool received_status_; | |
161 content::DownloadInterruptReason status_; | |
162 | |
163 base::Closure data_available_callback_; | |
164 | |
165 // Time of last point at which data in stream transitioned from full | |
166 // to non-full. Nulled when a callback is sent. | |
167 base::Time last_non_full_time_; | |
168 | |
169 // ** Peer information | |
170 | |
171 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | |
172 | |
173 // How much has been removed from this class that we haven't told | |
174 // the input about yet. | |
175 size_t unreported_consumed_bytes_; | |
176 | |
177 // Only valid to access on peer_task_runner_. | |
178 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; | |
179 | |
180 // Only valid to access on peer_task_runner_ if | |
181 // |*peer_lifetime_flag_ == true| | |
182 ByteStreamInputImpl* peer_; | |
183 }; | |
184 | |
185 ByteStreamInputImpl::ByteStreamInputImpl( | |
186 scoped_refptr<base::SequencedTaskRunner> task_runner, | |
187 scoped_refptr<LifetimeFlag> lifetime_flag, | |
188 size_t buffer_size) | |
189 : total_buffer_size_(buffer_size), | |
190 my_task_runner_(task_runner), | |
191 my_lifetime_flag_(lifetime_flag), | |
192 input_contents_size_(0), | |
193 output_size_used_(0), | |
194 peer_(NULL) { | |
195 DCHECK(my_lifetime_flag_.get()); | |
196 my_lifetime_flag_->is_alive_ = true; | |
197 } | |
198 | |
199 ByteStreamInputImpl::~ByteStreamInputImpl() { | |
200 my_lifetime_flag_->is_alive_ = false; | |
201 } | |
202 | |
203 void ByteStreamInputImpl::SetPeer( | |
204 ByteStreamOutputImpl* peer, | |
205 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
206 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { | |
207 peer_ = peer; | |
208 peer_task_runner_ = peer_task_runner; | |
209 peer_lifetime_flag_ = peer_lifetime_flag; | |
210 } | |
211 | |
212 bool ByteStreamInputImpl::Write( | |
213 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { | |
214 input_contents_.push_back(std::make_pair(buffer, byte_count)); | |
215 input_contents_size_ += byte_count; | |
216 | |
217 // Arbitrarily, we buffer to a third of the total size before sending. | |
218 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) | |
219 PostToPeer(false, content::DOWNLOAD_INTERRUPT_REASON_NONE); | |
220 | |
221 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); | |
222 } | |
223 | |
224 void ByteStreamInputImpl::Close( | |
225 content::DownloadInterruptReason status) { | |
226 PostToPeer(true, status); | |
227 } | |
228 | |
229 void ByteStreamInputImpl::RegisterCallback( | |
230 const base::Closure& source_callback) { | |
231 space_available_callback_ = source_callback; | |
232 } | |
233 | |
234 // static | |
235 void ByteStreamInputImpl::UpdateWindow( | |
236 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamInputImpl* target, | |
237 size_t bytes_consumed) { | |
238 // If the target object isn't alive anymore, we do nothing. | |
239 if (!lifetime_flag->is_alive_) return; | |
240 | |
241 target->UpdateWindowMethod(bytes_consumed); | |
242 } | |
243 | |
244 void ByteStreamInputImpl::UpdateWindowMethod(size_t bytes_consumed) { | |
245 DCHECK_GE(output_size_used_, bytes_consumed); | |
246 output_size_used_ -= bytes_consumed; | |
247 | |
248 // Callback if we were above the limit and we're now <= to it. | |
249 size_t total_known_size_used = | |
250 input_contents_size_ + output_size_used_; | |
251 | |
252 if (total_known_size_used <= total_buffer_size_ && | |
253 (total_known_size_used + bytes_consumed > total_buffer_size_) && | |
254 !space_available_callback_.is_null()) | |
255 space_available_callback_.Run(); | |
256 } | |
257 | |
258 void ByteStreamInputImpl::PostToPeer( | |
259 bool complete, content::DownloadInterruptReason status) { | |
260 // Valid contexts in which to call. | |
261 DCHECK(complete || 0 != input_contents_size_); | |
262 | |
263 scoped_ptr<ContentVector> xfer_buffer(new ContentVector); | |
264 size_t buffer_size = 0; | |
265 if (0 != input_contents_size_) { | |
266 xfer_buffer.reset(new ContentVector); | |
267 xfer_buffer->swap(input_contents_); | |
268 buffer_size = input_contents_size_; | |
269 output_size_used_ += input_contents_size_; | |
270 input_contents_size_ = 0; | |
271 } | |
272 peer_task_runner_->PostTask( | |
273 FROM_HERE, base::Bind( | |
274 &ByteStreamOutputImpl::TransferData, | |
275 peer_lifetime_flag_, | |
276 peer_, | |
277 base::Passed(xfer_buffer.Pass()), | |
278 buffer_size, | |
279 complete, | |
280 status)); | |
281 } | |
282 | |
283 ByteStreamOutputImpl::ByteStreamOutputImpl( | |
284 scoped_refptr<base::SequencedTaskRunner> task_runner, | |
285 scoped_refptr<LifetimeFlag> lifetime_flag, | |
286 size_t buffer_size) | |
287 : total_buffer_size_(buffer_size), | |
288 my_task_runner_(task_runner), | |
289 my_lifetime_flag_(lifetime_flag), | |
290 available_contents_size_(0), | |
291 received_status_(false), | |
292 status_(content::DOWNLOAD_INTERRUPT_REASON_NONE), | |
293 unreported_consumed_bytes_(0), | |
294 peer_(NULL) { | |
295 DCHECK(my_lifetime_flag_.get()); | |
296 my_lifetime_flag_->is_alive_ = true; | |
297 } | |
298 | |
299 ByteStreamOutputImpl::~ByteStreamOutputImpl() { | |
300 my_lifetime_flag_->is_alive_ = false; | |
301 } | |
302 | |
303 void ByteStreamOutputImpl::SetPeer( | |
304 ByteStreamInputImpl* peer, | |
305 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
306 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { | |
307 peer_ = peer; | |
308 peer_task_runner_ = peer_task_runner; | |
309 peer_lifetime_flag_ = peer_lifetime_flag; | |
310 } | |
311 | |
312 ByteStreamOutputImpl::StreamState | |
313 ByteStreamOutputImpl::Read(scoped_refptr<net::IOBuffer>* data, | |
314 size_t* length) { | |
315 if (available_contents_.size()) { | |
316 *data = available_contents_.front().first; | |
317 *length = available_contents_.front().second; | |
318 available_contents_.pop_front(); | |
319 DCHECK_GE(available_contents_size_, *length); | |
320 available_contents_size_ -= *length; | |
321 unreported_consumed_bytes_ += *length; | |
322 | |
323 MaybeUpdateInput(); | |
324 return STREAM_HAS_DATA; | |
325 } | |
326 if (received_status_) { | |
327 return STREAM_COMPLETE; | |
328 } | |
329 return STREAM_EMPTY; | |
330 } | |
331 | |
332 content::DownloadInterruptReason | |
333 ByteStreamOutputImpl::GetStatus() const { | |
334 DCHECK(received_status_); | |
335 return status_; | |
336 } | |
337 | |
338 void ByteStreamOutputImpl::RegisterCallback( | |
339 const base::Closure& sink_callback) { | |
340 data_available_callback_ = sink_callback; | |
341 } | |
342 | |
343 // static | |
344 void ByteStreamOutputImpl::TransferData( | |
345 scoped_refptr<LifetimeFlag> object_lifetime_flag, | |
346 ByteStreamOutputImpl* target, | |
347 scoped_ptr<ContentVector> xfer_buffer, | |
348 size_t buffer_size, | |
349 bool source_complete, | |
350 content::DownloadInterruptReason status) { | |
351 // If our target is no longer alive, do nothing. | |
352 if (!object_lifetime_flag->is_alive_) return; | |
353 | |
354 target->TransferDataMethod( | |
355 xfer_buffer.Pass(), buffer_size, source_complete, status); | |
356 } | |
357 | |
358 void ByteStreamOutputImpl::TransferDataMethod( | |
359 scoped_ptr<ContentVector> xfer_buffer, | |
benjhayden
2012/05/16 21:11:04
Want to spell out "transfer_buffer"?
Randy Smith (Not in Mondays)
2012/05/17 18:24:32
Done.
| |
360 size_t buffer_size, | |
361 bool source_complete, | |
362 content::DownloadInterruptReason status) { | |
363 if (xfer_buffer.get()) { | |
364 available_contents_.insert(available_contents_.end(), | |
365 xfer_buffer->begin(), | |
366 xfer_buffer->end()); | |
367 available_contents_size_ += buffer_size; | |
368 } | |
369 | |
370 if (source_complete) { | |
371 received_status_ = true; | |
372 status_ = status; | |
373 } | |
374 | |
375 // Callback if we didn't use to have data and we do now. | |
376 if (available_contents_size_ == buffer_size && | |
benjhayden
2012/05/16 21:11:04
bool was_empty = available_contents_.empty();
...
Randy Smith (Not in Mondays)
2012/05/17 18:24:32
Done.
| |
377 available_contents_size_ > 0 && | |
378 !data_available_callback_.is_null()) | |
379 data_available_callback_.Run(); | |
380 } | |
381 | |
382 // Decide whether or not to send the input a window update. | |
383 // Currently we do that whenever we've got unreported consumption | |
384 // greater than 1/3 of total size. | |
385 void ByteStreamOutputImpl::MaybeUpdateInput() { | |
386 if (unreported_consumed_bytes_ <= | |
387 total_buffer_size_ / kFractionReadBeforeWindowUpdate) | |
388 return; | |
389 | |
390 peer_task_runner_->PostTask( | |
391 FROM_HERE, base::Bind( | |
392 &ByteStreamInputImpl::UpdateWindow, | |
393 peer_lifetime_flag_, | |
394 peer_, | |
395 unreported_consumed_bytes_)); | |
396 unreported_consumed_bytes_ = 0; | |
397 } | |
398 | |
399 } // namespace | |
400 | |
401 namespace content { | |
402 | |
403 ByteStreamOutput::~ByteStreamOutput() { } | |
404 | |
405 ByteStreamInput::~ByteStreamInput() { } | |
406 | |
407 void CreateByteStream( | |
408 scoped_refptr<base::SequencedTaskRunner> input_task_runner, | |
409 scoped_refptr<base::SequencedTaskRunner> output_task_runner, | |
410 size_t buffer_size, | |
411 scoped_ptr<ByteStreamInput>* input, | |
412 scoped_ptr<ByteStreamOutput>* output) { | |
413 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag()); | |
414 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag()); | |
415 | |
416 ByteStreamInputImpl* in = new ByteStreamInputImpl( | |
417 input_task_runner, input_flag, buffer_size); | |
418 ByteStreamOutputImpl* out = new ByteStreamOutputImpl( | |
419 output_task_runner, output_flag, buffer_size); | |
420 | |
421 in->SetPeer(out, output_task_runner, output_flag); | |
422 out->SetPeer(in, input_task_runner, input_flag); | |
423 input->reset(in); | |
424 output->reset(out); | |
425 } | |
426 | |
427 } // namespace content | |
OLD | NEW |