Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(459)

Side by Side Diff: content/browser/download/byte_stream.cc

Issue 10244001: Creation of ByteStream class. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Finished and polished rewrite. Created 8 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/browser/download/byte_stream.h"
6
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/memory/weak_ptr.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/sequenced_task_runner.h"
12
13 namespace {
14
15 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>,size_t> >
16 ContentVector;
17
18 class ByteStreamOutputImpl;
19
20 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
21 // cleared in an object destructor and accessed to check for object
22 // existence. We can't use weak pointers because they're tightly tied to
23 // threads rather than task runners.
24 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
25 public:
26 LifetimeFlag() : is_alive_(true) { }
27 bool is_alive_;
28 protected:
29 friend class base::RefCountedThreadSafe<LifetimeFlag>;
30 virtual ~LifetimeFlag() { }
31 };
32
33 // For both ByteStreamInputImpl and ByteStreamOutputImpl, Construction and
34 // SetPeer may happen anywhere; all other operations on each class must
35 // happen in the context of their SequencedTaskRunner.
36 class ByteStreamInputImpl : public content::ByteStreamInput {
37 public:
38 ByteStreamInputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
39 scoped_refptr<LifetimeFlag> lifetime_flag,
40 size_t buffer_size);
41 virtual ~ByteStreamInputImpl();
42
43 // Must be called before any operations are performed.
44 void SetPeer(ByteStreamOutputImpl* peer,
45 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
46 scoped_refptr<LifetimeFlag> peer_lifetime_flag);
47
48 // Overridden from ByteStreamInput.
49 virtual bool AddData(scoped_refptr<net::IOBuffer> buffer,
50 size_t byte_count) OVERRIDE;
51 virtual void SourceComplete(content::DownloadInterruptReason status) OVERRIDE;
52 virtual void RegisterCallback(base::Closure source_callback) OVERRIDE;
53
54 // PostTask target from |ByteStreamOutputImpl::MaybeUpdateInput|.
55 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
56 ByteStreamInputImpl* target,
57 size_t bytes_consumed);
58
59 private:
60 void MaybePostToPeer();
61
62 const size_t total_buffer_size_;
63
64 // All data objects in this class are only valid to access on
65 // this task runner except as otherwise noted.
66 scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
67
68 // True while this object is alive.
69 scoped_refptr<LifetimeFlag> my_lifetime_flag_;
70
71 base::Closure space_available_callback_;
72 ContentVector input_contents_;
73 size_t input_contents_size_;
74
75 // Time of last point at which data in stream transitioned from zero
76 // to non-zero. Nulled when a callback is sent.
77 base::Time last_non_empty_time_;
78
79 // ** Peer information.
80
81 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
82
83 // How much we've sent to the output that for flow control purposes we
84 // must assume hasn't been read yet.
85 size_t output_size_used_;
86
87 // Only valid to access on peer_task_runner_.
88 scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
89
90 // Only valid to access on peer_task_runner_ if
91 // |*peer_lifetime_flag_ == true|
92 ByteStreamOutputImpl* peer_;
93 };
94
95 class ByteStreamOutputImpl : public content::ByteStreamOutput {
96 public:
97 ByteStreamOutputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
98 scoped_refptr<LifetimeFlag> lifetime_flag,
99 size_t buffer_size);
100 virtual ~ByteStreamOutputImpl();
101
102 // Must be called before any operations are performed.
103 void SetPeer(ByteStreamInputImpl* peer,
104 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
105 scoped_refptr<LifetimeFlag> peer_lifetime_flag);
106
107 // Overridden from ByteStreamOutput.
108 virtual StreamState GetData(scoped_refptr<net::IOBuffer>* data,
109 size_t* length) OVERRIDE;
110 virtual content::DownloadInterruptReason GetSourceResult() const OVERRIDE;
111 virtual void RegisterCallback(base::Closure sink_callback) OVERRIDE;
112 virtual size_t NumSourceCallbacks() const OVERRIDE;
113 virtual size_t NumSinkCallbacks() const OVERRIDE;
114 virtual size_t BytesRead() const OVERRIDE;
115 virtual size_t BuffersRead() const OVERRIDE;
116 virtual base::TimeDelta TotalSourceTriggerWaitTime() const OVERRIDE;
117 virtual base::TimeDelta TotalSinkTriggerWaitTime() const OVERRIDE;
118
119 // PostTask target from |ByteStreamInputImpl::MaybePostToPeer| and
120 // |ByteStreamInputImpl::SourceComplete|.
121 // Receive data from our peer.
122 // static because it may be called after the object it is targeting
123 // has been destroyed. It may not access |*target|
124 // if |*object_lifetime_flag| is false.
125 static void TransferData(
126 scoped_refptr<LifetimeFlag> object_lifetime_flag,
127 ByteStreamOutputImpl* target,
128 scoped_ptr<ContentVector> xfer_buffer,
129 size_t xfer_buffer_bytes,
130 bool source_complete,
131 content::DownloadInterruptReason status,
132 base::TimeDelta additional_sink_trigger_wait_time);
133
134 private:
135 void MaybeUpdateInput();
136
137 const size_t total_buffer_size_;
138
139 scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
140
141 // True while this object is alive.
142 scoped_refptr<LifetimeFlag> my_lifetime_flag_;
143
144 ContentVector available_contents_;
145 size_t available_contents_size_;
146
147 bool received_status_;
148 content::DownloadInterruptReason status_;
149
150 base::Closure data_available_callback_;
151
152 // Time of last point at which data in stream transitioned from full
153 // to non-full. Nulled when a callback is sent.
154 base::Time last_non_full_time_;
155
156 // ** Peer information
157
158 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
159
160 // How much has been removed from this class that we haven't told
161 // the input about yet.
162 size_t unreported_consumed_bytes_;
163
164 // Only valid to access on peer_task_runner_.
165 scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
166
167 // Only valid to access on peer_task_runner_ if
168 // |*peer_lifetime_flag_ == true|
169 ByteStreamInputImpl* peer_;
170
171 // ** Stream statistics.
172 size_t bytes_read_;
173 size_t buffers_read_;
174 size_t sink_callbacks_triggered_;
175 size_t source_callbacks_triggered_;
176 base::TimeDelta total_sink_trigger_wait_time_;
177 base::TimeDelta total_source_trigger_wait_time_;
178 };
179
180 ByteStreamInputImpl::ByteStreamInputImpl(
181 scoped_refptr<base::SequencedTaskRunner> task_runner,
182 scoped_refptr<LifetimeFlag> lifetime_flag,
183 size_t buffer_size)
184 : total_buffer_size_(buffer_size),
185 my_task_runner_(task_runner),
186 my_lifetime_flag_(lifetime_flag),
187 input_contents_size_(0),
188 output_size_used_(0),
189 peer_(NULL) {
190 DCHECK(my_lifetime_flag_.get());
191 my_lifetime_flag_->is_alive_ = true;
192 }
193
194 ByteStreamInputImpl::~ByteStreamInputImpl() {
195 my_lifetime_flag_->is_alive_ = false;
196 }
197
198 void ByteStreamInputImpl::SetPeer(
199 ByteStreamOutputImpl* peer,
200 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
201 scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
202 peer_ = peer;
203 peer_task_runner_ = peer_task_runner;
204 peer_lifetime_flag_ = peer_lifetime_flag;
205 }
206
207 bool ByteStreamInputImpl::AddData(
208 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
209 if (input_contents_size_ == 0 && byte_count != 0)
210 last_non_empty_time_ = base::Time::Now();
211
212 input_contents_.push_back(std::make_pair(buffer, byte_count));
213 input_contents_size_ += byte_count;
214
215 MaybePostToPeer();
216
217 return (input_contents_size_ + output_size_used_ <= total_buffer_size_);
218 }
219
220 void ByteStreamInputImpl::SourceComplete(
221 content::DownloadInterruptReason status) {
222 // Make sure to flush everything we have with the source notification.
223 scoped_ptr<ContentVector> xfer_buffer;
224 size_t buffer_size = 0;
225 base::TimeDelta additional_source_wait_time; // Constructs to 0.
226 if (0 != input_contents_size_) {
227 xfer_buffer.reset(new ContentVector);
228 xfer_buffer->swap(input_contents_);
229 buffer_size = input_contents_size_;
230 output_size_used_ += input_contents_size_;
231 input_contents_size_ = 0;
232 if (!last_non_empty_time_.is_null()) {
233 additional_source_wait_time = base::Time::Now() - last_non_empty_time_;
234 last_non_empty_time_ = base::Time();
235 }
236 }
237 peer_task_runner_->PostTask(
238 FROM_HERE, base::Bind(
239 &ByteStreamOutputImpl::TransferData,
240 peer_lifetime_flag_,
241 peer_,
242 base::Passed(xfer_buffer.Pass()),
243 buffer_size,
244 true /* Source complete. */,
245 status,
246 additional_source_wait_time));
247 }
248
249 void ByteStreamInputImpl::RegisterCallback(
250 base::Closure source_callback) {
251 space_available_callback_ = source_callback;
252 }
253
254 // static
255 void ByteStreamInputImpl::UpdateWindow(
256 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamInputImpl* target,
257 size_t bytes_consumed) {
258
259 // If the target object isn't alive anymore, we do nothing.
260 if (!lifetime_flag->is_alive_) return;
261
262 DCHECK_GE(target->output_size_used_, bytes_consumed);
263 target->output_size_used_ -= bytes_consumed;
264
265 if (!target->space_available_callback_.is_null())
266 target->space_available_callback_.Run();
267 }
268
269 // Decide whether or not we've bufferred enough for a transfer.
270 // For right now "enough" will be "anything".
271 void ByteStreamInputImpl::MaybePostToPeer() {
272 // Arbitrarily, we buffer to a third of the total size before sending.
273 if (input_contents_size_ > total_buffer_size_ / 3) {
274 scoped_ptr<ContentVector> xfer_buffer(new ContentVector);
275 xfer_buffer->swap(input_contents_);
276 size_t buffer_size = input_contents_size_;
277 output_size_used_ += input_contents_size_;
278 input_contents_size_ = 0;
279
280 base::TimeDelta additional_source_wait_time; // Constructs to 0.
281 if (!last_non_empty_time_.is_null()) {
282 additional_source_wait_time = base::Time::Now() - last_non_empty_time_;
283 last_non_empty_time_ = base::Time();
284 }
285
286 peer_task_runner_->PostTask(
287 FROM_HERE, base::Bind(
288 &ByteStreamOutputImpl::TransferData,
289 peer_lifetime_flag_,
290 peer_,
291 base::Passed(xfer_buffer.Pass()),
292 buffer_size,
293 false /* Source not complete. */,
294 content::DOWNLOAD_INTERRUPT_REASON_NONE,
295 additional_source_wait_time));
296 }
297 }
298
299 ByteStreamOutputImpl::ByteStreamOutputImpl(
300 scoped_refptr<base::SequencedTaskRunner> task_runner,
301 scoped_refptr<LifetimeFlag> lifetime_flag,
302 size_t buffer_size)
303 : total_buffer_size_(buffer_size),
304 my_task_runner_(task_runner),
305 my_lifetime_flag_(lifetime_flag),
306 available_contents_size_(0),
307 received_status_(false),
308 status_(content::DOWNLOAD_INTERRUPT_REASON_NONE),
309 unreported_consumed_bytes_(0),
310 peer_(NULL),
311 bytes_read_(0),
312 buffers_read_(0),
313 sink_callbacks_triggered_(0),
314 source_callbacks_triggered_(0) {
315 DCHECK(my_lifetime_flag_.get());
316 my_lifetime_flag_->is_alive_ = true;
317 }
318
319 ByteStreamOutputImpl::~ByteStreamOutputImpl() {
320 my_lifetime_flag_->is_alive_ = false;
321 }
322
323 void ByteStreamOutputImpl::SetPeer(
324 ByteStreamInputImpl* peer,
325 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
326 scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
327 peer_ = peer;
328 peer_task_runner_ = peer_task_runner;
329 peer_lifetime_flag_ = peer_lifetime_flag;
330 }
331
332 ByteStreamOutputImpl::StreamState
333 ByteStreamOutputImpl::GetData(scoped_refptr<net::IOBuffer>* data,
334 size_t* length) {
335 if (available_contents_.size()) {
336 *data = available_contents_.front().first;
337 *length = available_contents_.front().second;
338 available_contents_.pop_front();
339 DCHECK_GE(available_contents_size_, *length);
340 available_contents_size_ -= *length;
341 unreported_consumed_bytes_ += *length;
342 bytes_read_ += *length;
343 buffers_read_ ++;
344 if (available_contents_size_ <= total_buffer_size_ &&
345 available_contents_size_ + *length > total_buffer_size_) {
346 last_non_full_time_ = base::Time::Now();
347 }
348
349 MaybeUpdateInput();
350 return STREAM_HAS_DATA;
351 }
352 if (received_status_) {
353 return STREAM_COMPLETE;
354 }
355 return STREAM_EMPTY;
356 }
357
358 content::DownloadInterruptReason
359 ByteStreamOutputImpl::GetSourceResult() const {
360 DCHECK(received_status_);
361 return status_;
362 }
363
364 void ByteStreamOutputImpl::RegisterCallback(base::Closure sink_callback) {
365 data_available_callback_ = sink_callback;
366 }
367
368 size_t ByteStreamOutputImpl::NumSourceCallbacks() const {
369 return source_callbacks_triggered_;
370 }
371
372 size_t ByteStreamOutputImpl::NumSinkCallbacks() const {
373 return sink_callbacks_triggered_;
374 }
375
376 size_t ByteStreamOutputImpl::BytesRead() const {
377 return bytes_read_;
378 }
379
380 size_t ByteStreamOutputImpl::BuffersRead() const {
381 return buffers_read_;
382 }
383
384 base::TimeDelta ByteStreamOutputImpl::TotalSourceTriggerWaitTime() const {
385 return total_source_trigger_wait_time_;
386 }
387
388 base::TimeDelta ByteStreamOutputImpl::TotalSinkTriggerWaitTime() const {
389 return total_sink_trigger_wait_time_;
390 }
391
392 // static
393 void ByteStreamOutputImpl::TransferData(
394 scoped_refptr<LifetimeFlag> object_lifetime_flag,
395 ByteStreamOutputImpl* target,
396 scoped_ptr<ContentVector> xfer_buffer,
397 size_t buffer_size,
398 bool source_complete,
399 content::DownloadInterruptReason status,
400 base::TimeDelta additional_sink_trigger_wait_time) {
401 // If our target is no longer alive, do nothing.
402 if (!object_lifetime_flag->is_alive_) return;
403
404 if (xfer_buffer.get()) {
405 target->available_contents_.insert(target->available_contents_.end(),
406 xfer_buffer->begin(),
407 xfer_buffer->end());
408 target->available_contents_size_ += buffer_size;
409 }
410
411 if (source_complete) {
412 target->received_status_ = true;
413 target->status_ = status;
414 }
415
416 target->sink_callbacks_triggered_++;
417 target->total_sink_trigger_wait_time_ += additional_sink_trigger_wait_time;
418
419 if (!target->data_available_callback_.is_null())
420 target->data_available_callback_.Run();
421 }
422
423 // Decide whether or not to send the input a window update.
424 // Currently we do that whenever we've got unreported consumption
425 // greater than 1/3 of total size.
426 void ByteStreamOutputImpl::MaybeUpdateInput() {
427 if (unreported_consumed_bytes_ > total_buffer_size_ / 3) {
428 source_callbacks_triggered_++;
429 if (!last_non_full_time_.is_null()) {
430 total_source_trigger_wait_time_ +=
431 base::Time::Now() - last_non_full_time_;
432 last_non_full_time_ = base::Time();
433 }
434 peer_task_runner_->PostTask(
435 FROM_HERE, base::Bind(
436 &ByteStreamInputImpl::UpdateWindow,
437 peer_lifetime_flag_,
438 peer_,
439 unreported_consumed_bytes_));
440 unreported_consumed_bytes_ = 0;
441 }
442 }
443
444 } // namespace
445
446 namespace content {
447
448 ByteStreamOutput::~ByteStreamOutput() { }
449
450 ByteStreamInput::~ByteStreamInput() { }
451
452 void CreateByteStream(
453 scoped_ptr<ByteStreamInput>* input,
454 scoped_ptr<ByteStreamOutput>* output,
455 scoped_refptr<base::SequencedTaskRunner> input_task_runner,
456 scoped_refptr<base::SequencedTaskRunner> output_task_runner,
457 size_t buffer_size) {
458 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
459 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
460
461 ByteStreamInputImpl* in = new ByteStreamInputImpl(
462 input_task_runner, input_flag, buffer_size);
463 ByteStreamOutputImpl* out = new ByteStreamOutputImpl(
464 output_task_runner, output_flag, buffer_size);
465
466 in->SetPeer(out, output_task_runner, output_flag);
467 out->SetPeer(in, input_task_runner, input_flag);
468 input->reset(in);
469 output->reset(out);
470 return;
471 }
472
473 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698