OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "content/browser/download/byte_stream.h" | |
6 | |
7 #include "base/bind.h" | |
8 #include "base/location.h" | |
9 #include "base/memory/weak_ptr.h" | |
10 #include "base/memory/ref_counted.h" | |
11 #include "base/sequenced_task_runner.h" | |
12 | |
13 namespace { | |
14 | |
15 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>,size_t> > | |
benjhayden
2012/05/16 14:26:25
Space after comma.
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
16 ContentVector; | |
17 | |
18 class ByteStreamOutputImpl; | |
19 | |
20 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be | |
21 // cleared in an object destructor and accessed to check for object | |
22 // existence. We can't use weak pointers because they're tightly tied to | |
23 // threads rather than task runners. | |
benjhayden
2012/05/16 14:26:25
TODO: TaskRunnerWeakPointer?
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
24 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> { | |
25 public: | |
26 LifetimeFlag() : is_alive_(true) { } | |
27 bool is_alive_; | |
benjhayden
2012/05/16 14:26:25
blank line between sections?
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
28 protected: | |
29 friend class base::RefCountedThreadSafe<LifetimeFlag>; | |
30 virtual ~LifetimeFlag() { } | |
31 }; | |
benjhayden
2012/05/16 14:26:25
DISALLOW_COPY_AND_ASSIGN?
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
32 | |
33 // For both ByteStreamInputImpl and ByteStreamOutputImpl, Construction and | |
34 // SetPeer may happen anywhere; all other operations on each class must | |
35 // happen in the context of their SequencedTaskRunner. | |
36 class ByteStreamInputImpl : public content::ByteStreamInput { | |
37 public: | |
38 ByteStreamInputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, | |
39 scoped_refptr<LifetimeFlag> lifetime_flag, | |
40 size_t buffer_size); | |
41 virtual ~ByteStreamInputImpl(); | |
42 | |
43 // Must be called before any operations are performed. | |
44 void SetPeer(ByteStreamOutputImpl* peer, | |
45 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
46 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | |
47 | |
48 // Overridden from ByteStreamInput. | |
49 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, | |
50 size_t byte_count) OVERRIDE; | |
51 virtual void Close(content::DownloadInterruptReason status) OVERRIDE; | |
52 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; | |
53 | |
54 // PostTask target from |ByteStreamOutputImpl::MaybeUpdateInput|. | |
55 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, | |
56 ByteStreamInputImpl* target, | |
57 size_t bytes_consumed); | |
58 | |
59 private: | |
60 void MaybePostToPeer(); | |
61 | |
62 const size_t total_buffer_size_; | |
63 | |
64 // All data objects in this class are only valid to access on | |
65 // this task runner except as otherwise noted. | |
66 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | |
67 | |
68 // True while this object is alive. | |
69 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | |
70 | |
71 base::Closure space_available_callback_; | |
72 ContentVector input_contents_; | |
73 size_t input_contents_size_; | |
74 | |
75 // ** Peer information. | |
76 | |
77 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | |
78 | |
79 // How much we've sent to the output that for flow control purposes we | |
80 // must assume hasn't been read yet. | |
81 size_t output_size_used_; | |
82 | |
83 // Only valid to access on peer_task_runner_. | |
84 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; | |
85 | |
86 // Only valid to access on peer_task_runner_ if | |
87 // |*peer_lifetime_flag_ == true| | |
88 ByteStreamOutputImpl* peer_; | |
89 }; | |
90 | |
91 class ByteStreamOutputImpl : public content::ByteStreamOutput { | |
92 public: | |
93 ByteStreamOutputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, | |
94 scoped_refptr<LifetimeFlag> lifetime_flag, | |
95 size_t buffer_size); | |
96 virtual ~ByteStreamOutputImpl(); | |
97 | |
98 // Must be called before any operations are performed. | |
99 void SetPeer(ByteStreamInputImpl* peer, | |
100 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
101 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | |
102 | |
103 // Overridden from ByteStreamOutput. | |
104 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, | |
105 size_t* length) OVERRIDE; | |
106 virtual content::DownloadInterruptReason GetStatus() const OVERRIDE; | |
107 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; | |
108 | |
109 // PostTask target from |ByteStreamInputImpl::MaybePostToPeer| and | |
110 // |ByteStreamInputImpl::Close|. | |
111 // Receive data from our peer. | |
112 // static because it may be called after the object it is targeting | |
113 // has been destroyed. It may not access |*target| | |
114 // if |*object_lifetime_flag| is false. | |
115 static void TransferData( | |
116 scoped_refptr<LifetimeFlag> object_lifetime_flag, | |
117 ByteStreamOutputImpl* target, | |
118 scoped_ptr<ContentVector> xfer_buffer, | |
119 size_t xfer_buffer_bytes, | |
120 bool source_complete, | |
121 content::DownloadInterruptReason status); | |
122 | |
123 private: | |
124 void MaybeUpdateInput(); | |
125 | |
126 const size_t total_buffer_size_; | |
127 | |
128 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | |
129 | |
130 // True while this object is alive. | |
131 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | |
132 | |
133 ContentVector available_contents_; | |
134 size_t available_contents_size_; | |
135 | |
136 bool received_status_; | |
137 content::DownloadInterruptReason status_; | |
138 | |
139 base::Closure data_available_callback_; | |
140 | |
141 // Time of last point at which data in stream transitioned from full | |
142 // to non-full. Nulled when a callback is sent. | |
143 base::Time last_non_full_time_; | |
144 | |
145 // ** Peer information | |
146 | |
147 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | |
148 | |
149 // How much has been removed from this class that we haven't told | |
150 // the input about yet. | |
151 size_t unreported_consumed_bytes_; | |
152 | |
153 // Only valid to access on peer_task_runner_. | |
154 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; | |
155 | |
156 // Only valid to access on peer_task_runner_ if | |
157 // |*peer_lifetime_flag_ == true| | |
158 ByteStreamInputImpl* peer_; | |
159 }; | |
160 | |
161 ByteStreamInputImpl::ByteStreamInputImpl( | |
162 scoped_refptr<base::SequencedTaskRunner> task_runner, | |
163 scoped_refptr<LifetimeFlag> lifetime_flag, | |
164 size_t buffer_size) | |
165 : total_buffer_size_(buffer_size), | |
166 my_task_runner_(task_runner), | |
167 my_lifetime_flag_(lifetime_flag), | |
168 input_contents_size_(0), | |
169 output_size_used_(0), | |
170 peer_(NULL) { | |
171 DCHECK(my_lifetime_flag_.get()); | |
172 my_lifetime_flag_->is_alive_ = true; | |
173 } | |
174 | |
175 ByteStreamInputImpl::~ByteStreamInputImpl() { | |
176 my_lifetime_flag_->is_alive_ = false; | |
177 } | |
178 | |
179 void ByteStreamInputImpl::SetPeer( | |
180 ByteStreamOutputImpl* peer, | |
181 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
182 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { | |
183 peer_ = peer; | |
184 peer_task_runner_ = peer_task_runner; | |
185 peer_lifetime_flag_ = peer_lifetime_flag; | |
186 } | |
187 | |
188 bool ByteStreamInputImpl::Write( | |
189 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { | |
190 input_contents_.push_back(std::make_pair(buffer, byte_count)); | |
191 input_contents_size_ += byte_count; | |
192 | |
193 MaybePostToPeer(); | |
194 | |
195 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); | |
196 } | |
197 | |
198 void ByteStreamInputImpl::Close( | |
benjhayden
2012/05/16 19:17:21
This seems to duplicate some of MaybePostToPeer().
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
199 content::DownloadInterruptReason status) { | |
200 // Make sure to flush everything we have with the source notification. | |
201 scoped_ptr<ContentVector> xfer_buffer; | |
202 size_t buffer_size = 0; | |
203 if (0 != input_contents_size_) { | |
204 xfer_buffer.reset(new ContentVector); | |
205 xfer_buffer->swap(input_contents_); | |
206 buffer_size = input_contents_size_; | |
207 output_size_used_ += input_contents_size_; | |
208 input_contents_size_ = 0; | |
209 } | |
210 peer_task_runner_->PostTask( | |
211 FROM_HERE, base::Bind( | |
212 &ByteStreamOutputImpl::TransferData, | |
213 peer_lifetime_flag_, | |
214 peer_, | |
215 base::Passed(xfer_buffer.Pass()), | |
216 buffer_size, | |
217 true /* Source complete. */, | |
218 status)); | |
219 } | |
220 | |
221 void ByteStreamInputImpl::RegisterCallback( | |
222 const base::Closure& source_callback) { | |
223 space_available_callback_ = source_callback; | |
224 } | |
225 | |
226 // static | |
227 void ByteStreamInputImpl::UpdateWindow( | |
228 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamInputImpl* target, | |
229 size_t bytes_consumed) { | |
230 | |
benjhayden
2012/05/16 15:41:13
No blank lines at beginning/ends of blocks.
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
231 // If the target object isn't alive anymore, we do nothing. | |
232 if (!lifetime_flag->is_alive_) return; | |
233 | |
234 DCHECK_GE(target->output_size_used_, bytes_consumed); | |
benjhayden
2012/05/16 15:41:13
Want to put the rest of this function in a method
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
235 target->output_size_used_ -= bytes_consumed; | |
236 | |
237 // Callback if we were above the limit and we're now <= to it. | |
238 size_t total_known_size_used = | |
239 target->input_contents_size_ + target->output_size_used_; | |
240 | |
241 if (total_known_size_used <= target->total_buffer_size_ && | |
242 (total_known_size_used + bytes_consumed > target->total_buffer_size_) && | |
243 !target->space_available_callback_.is_null()) | |
244 target->space_available_callback_.Run(); | |
245 } | |
246 | |
247 // Decide whether or not we've bufferred enough for a transfer. | |
248 // For right now "enough" will be "anything". | |
249 void ByteStreamInputImpl::MaybePostToPeer() { | |
250 // Arbitrarily, we buffer to a third of the total size before sending. | |
benjhayden
2012/05/16 15:41:13
Does this contradict L248?
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Yep; oops. Fixed.
| |
251 if (input_contents_size_ > total_buffer_size_ / 3) { | |
benjhayden
2012/05/16 15:41:13
Invert and early-return?
benjhayden
2012/05/16 15:41:13
Use a kConstant instead of 3?
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Obsolete because of refactor done for another comm
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done. I'm a bit hesitant because if I change this
| |
252 scoped_ptr<ContentVector> xfer_buffer(new ContentVector); | |
253 xfer_buffer->swap(input_contents_); | |
254 size_t buffer_size = input_contents_size_; | |
255 output_size_used_ += input_contents_size_; | |
256 input_contents_size_ = 0; | |
257 | |
258 peer_task_runner_->PostTask( | |
259 FROM_HERE, base::Bind( | |
260 &ByteStreamOutputImpl::TransferData, | |
261 peer_lifetime_flag_, | |
262 peer_, | |
263 base::Passed(xfer_buffer.Pass()), | |
264 buffer_size, | |
265 false /* Source not complete. */, | |
266 content::DOWNLOAD_INTERRUPT_REASON_NONE)); | |
267 } | |
268 } | |
269 | |
270 ByteStreamOutputImpl::ByteStreamOutputImpl( | |
271 scoped_refptr<base::SequencedTaskRunner> task_runner, | |
272 scoped_refptr<LifetimeFlag> lifetime_flag, | |
273 size_t buffer_size) | |
274 : total_buffer_size_(buffer_size), | |
275 my_task_runner_(task_runner), | |
276 my_lifetime_flag_(lifetime_flag), | |
277 available_contents_size_(0), | |
278 received_status_(false), | |
279 status_(content::DOWNLOAD_INTERRUPT_REASON_NONE), | |
280 unreported_consumed_bytes_(0), | |
281 peer_(NULL) { | |
282 DCHECK(my_lifetime_flag_.get()); | |
283 my_lifetime_flag_->is_alive_ = true; | |
284 } | |
285 | |
286 ByteStreamOutputImpl::~ByteStreamOutputImpl() { | |
287 my_lifetime_flag_->is_alive_ = false; | |
288 } | |
289 | |
290 void ByteStreamOutputImpl::SetPeer( | |
291 ByteStreamInputImpl* peer, | |
292 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
293 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { | |
294 peer_ = peer; | |
295 peer_task_runner_ = peer_task_runner; | |
296 peer_lifetime_flag_ = peer_lifetime_flag; | |
297 } | |
298 | |
299 ByteStreamOutputImpl::StreamState | |
300 ByteStreamOutputImpl::Read(scoped_refptr<net::IOBuffer>* data, | |
301 size_t* length) { | |
302 if (available_contents_.size()) { | |
303 *data = available_contents_.front().first; | |
304 *length = available_contents_.front().second; | |
305 available_contents_.pop_front(); | |
306 DCHECK_GE(available_contents_size_, *length); | |
307 available_contents_size_ -= *length; | |
308 unreported_consumed_bytes_ += *length; | |
309 | |
310 MaybeUpdateInput(); | |
311 return STREAM_HAS_DATA; | |
312 } | |
313 if (received_status_) { | |
314 return STREAM_COMPLETE; | |
315 } | |
316 return STREAM_EMPTY; | |
317 } | |
318 | |
319 content::DownloadInterruptReason | |
320 ByteStreamOutputImpl::GetStatus() const { | |
321 DCHECK(received_status_); | |
322 return status_; | |
323 } | |
324 | |
325 void ByteStreamOutputImpl::RegisterCallback( | |
326 const base::Closure& sink_callback) { | |
327 data_available_callback_ = sink_callback; | |
328 } | |
329 | |
330 // static | |
331 void ByteStreamOutputImpl::TransferData( | |
332 scoped_refptr<LifetimeFlag> object_lifetime_flag, | |
333 ByteStreamOutputImpl* target, | |
334 scoped_ptr<ContentVector> xfer_buffer, | |
335 size_t buffer_size, | |
336 bool source_complete, | |
337 content::DownloadInterruptReason status) { | |
338 // If our target is no longer alive, do nothing. | |
339 if (!object_lifetime_flag->is_alive_) return; | |
340 | |
341 if (xfer_buffer.get()) { | |
342 target->available_contents_.insert(target->available_contents_.end(), | |
343 xfer_buffer->begin(), | |
344 xfer_buffer->end()); | |
345 target->available_contents_size_ += buffer_size; | |
346 } | |
347 | |
348 if (source_complete) { | |
349 target->received_status_ = true; | |
350 target->status_ = status; | |
351 } | |
352 | |
353 // Callback if we didn't use to have data and we do now. | |
354 if (target->available_contents_size_ == buffer_size && | |
benjhayden
2012/05/16 15:41:13
Looks like you can get rid of available_contents_s
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
How do you figure? I agree available_contents_siz
| |
355 target->available_contents_size_ > 0 && | |
356 !target->data_available_callback_.is_null()) | |
357 target->data_available_callback_.Run(); | |
358 } | |
359 | |
360 // Decide whether or not to send the input a window update. | |
361 // Currently we do that whenever we've got unreported consumption | |
362 // greater than 1/3 of total size. | |
363 void ByteStreamOutputImpl::MaybeUpdateInput() { | |
364 if (unreported_consumed_bytes_ > total_buffer_size_ / 3) { | |
benjhayden
2012/05/16 19:17:21
Invert and early-return?
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
365 peer_task_runner_->PostTask( | |
366 FROM_HERE, base::Bind( | |
367 &ByteStreamInputImpl::UpdateWindow, | |
368 peer_lifetime_flag_, | |
369 peer_, | |
370 unreported_consumed_bytes_)); | |
371 unreported_consumed_bytes_ = 0; | |
372 } | |
373 } | |
374 | |
375 } // namespace | |
376 | |
377 namespace content { | |
378 | |
379 ByteStreamOutput::~ByteStreamOutput() { } | |
380 | |
381 ByteStreamInput::~ByteStreamInput() { } | |
382 | |
383 void CreateByteStream( | |
384 scoped_ptr<ByteStreamInput>* input, | |
385 scoped_ptr<ByteStreamOutput>* output, | |
386 scoped_refptr<base::SequencedTaskRunner> input_task_runner, | |
387 scoped_refptr<base::SequencedTaskRunner> output_task_runner, | |
388 size_t buffer_size) { | |
389 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag()); | |
390 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag()); | |
391 | |
392 ByteStreamInputImpl* in = new ByteStreamInputImpl( | |
393 input_task_runner, input_flag, buffer_size); | |
394 ByteStreamOutputImpl* out = new ByteStreamOutputImpl( | |
395 output_task_runner, output_flag, buffer_size); | |
396 | |
397 in->SetPeer(out, output_task_runner, output_flag); | |
398 out->SetPeer(in, input_task_runner, input_flag); | |
399 input->reset(in); | |
400 output->reset(out); | |
401 return; | |
benjhayden
2012/05/16 15:41:13
Feeling philosophical?
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
You know, it took me a good fifteen seconds to fig
| |
402 } | |
403 | |
404 } // namespace content | |
OLD | NEW |