Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(37)

Side by Side Diff: content/browser/download/byte_stream.cc

Issue 10244001: Creation of ByteStream class. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fixed bug with zero length writes. Created 8 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/browser/download/byte_stream.h"
6
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/memory/weak_ptr.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/sequenced_task_runner.h"
12
13 namespace {
14
15 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
16 ContentVector;
17
18 // The fraction of the buffer that must be ready to send on the input
19 // before we ship data to the output.
20 static const int kFractionBufferBeforeSending = 3;
21
22 // The fraction of the buffer that must have been consumed on the output
23 // before we update the input window.
24 static const int kFractionReadBeforeWindowUpdate = 3;
25
26 class ByteStreamOutputImpl;
27
28 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
29 // cleared in an object destructor and accessed to check for object
30 // existence. We can't use weak pointers because they're tightly tied to
31 // threads rather than task runners.
32 // TODO(rdsmith): A better solution would be extending weak pointers
33 // to support SequencedTaskRunners.
34 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
35 public:
36 LifetimeFlag() : is_alive_(true) { }
37 bool is_alive_;
38
39 protected:
40 friend class base::RefCountedThreadSafe<LifetimeFlag>;
41 virtual ~LifetimeFlag() { }
42
43 private:
44 DISALLOW_COPY_AND_ASSIGN(LifetimeFlag);
45 };
46
47 // For both ByteStreamInputImpl and ByteStreamOutputImpl, Construction and
48 // SetPeer may happen anywhere; all other operations on each class must
49 // happen in the context of their SequencedTaskRunner.
benjhayden 2012/05/18 14:21:06 Is there any way to DCHECK this? I find that kind
Randy Smith (Not in Mondays) 2012/05/18 21:55:15 Agreed. Done.
50 class ByteStreamInputImpl : public content::ByteStreamInput {
51 public:
52 ByteStreamInputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
53 scoped_refptr<LifetimeFlag> lifetime_flag,
54 size_t buffer_size);
55 virtual ~ByteStreamInputImpl();
56
57 // Must be called before any operations are performed.
58 void SetPeer(ByteStreamOutputImpl* peer,
59 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
60 scoped_refptr<LifetimeFlag> peer_lifetime_flag);
61
62 // Overridden from ByteStreamInput.
63 virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
64 size_t byte_count) OVERRIDE;
65 virtual void Close(content::DownloadInterruptReason status) OVERRIDE;
66 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
67
68 // PostTask target from |ByteStreamOutputImpl::MaybeUpdateInput|.
69 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
70 ByteStreamInputImpl* target,
71 size_t bytes_consumed);
72
73 private:
74 // Called from UpdateWindow when object existence has been validated.
75 void UpdateWindowInternal(size_t bytes_consumed);
76
77 void PostToPeer(bool complete, content::DownloadInterruptReason status);
78
79 const size_t total_buffer_size_;
80
81 // All data objects in this class are only valid to access on
82 // this task runner except as otherwise noted.
83 scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
84
85 // True while this object is alive.
86 scoped_refptr<LifetimeFlag> my_lifetime_flag_;
87
88 base::Closure space_available_callback_;
89 ContentVector input_contents_;
90 size_t input_contents_size_;
91
92 // ** Peer information.
93
94 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
95
96 // How much we've sent to the output that for flow control purposes we
97 // must assume hasn't been read yet.
98 size_t output_size_used_;
99
100 // Only valid to access on peer_task_runner_.
101 scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
102
103 // Only valid to access on peer_task_runner_ if
104 // |*peer_lifetime_flag_ == true|
105 ByteStreamOutputImpl* peer_;
106 };
107
108 class ByteStreamOutputImpl : public content::ByteStreamOutput {
109 public:
110 ByteStreamOutputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
111 scoped_refptr<LifetimeFlag> lifetime_flag,
112 size_t buffer_size);
113 virtual ~ByteStreamOutputImpl();
114
115 // Must be called before any operations are performed.
116 void SetPeer(ByteStreamInputImpl* peer,
117 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
118 scoped_refptr<LifetimeFlag> peer_lifetime_flag);
119
120 // Overridden from ByteStreamOutput.
121 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
122 size_t* length) OVERRIDE;
123 virtual content::DownloadInterruptReason GetStatus() const OVERRIDE;
124 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
125
126 // PostTask target from |ByteStreamInputImpl::MaybePostToPeer| and
127 // |ByteStreamInputImpl::Close|.
128 // Receive data from our peer.
129 // static because it may be called after the object it is targeting
130 // has been destroyed. It may not access |*target|
131 // if |*object_lifetime_flag| is false.
132 static void TransferData(
133 scoped_refptr<LifetimeFlag> object_lifetime_flag,
134 ByteStreamOutputImpl* target,
135 scoped_ptr<ContentVector> transfer_buffer,
136 size_t transfer_buffer_bytes,
137 bool source_complete,
138 content::DownloadInterruptReason status);
139
140 private:
141 // Called from TransferData once object existence has been validated.
142 void TransferDataInternal(
143 scoped_ptr<ContentVector> transfer_buffer,
144 size_t transfer_buffer_bytes,
145 bool source_complete,
146 content::DownloadInterruptReason status);
147
148 void MaybeUpdateInput();
149
150 const size_t total_buffer_size_;
151
152 scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
153
154 // True while this object is alive.
155 scoped_refptr<LifetimeFlag> my_lifetime_flag_;
156
157 ContentVector available_contents_;
158 size_t available_contents_size_;
159
160 bool received_status_;
161 content::DownloadInterruptReason status_;
162
163 base::Closure data_available_callback_;
164
165 // Time of last point at which data in stream transitioned from full
166 // to non-full. Nulled when a callback is sent.
167 base::Time last_non_full_time_;
168
169 // ** Peer information
170
171 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
172
173 // How much has been removed from this class that we haven't told
174 // the input about yet.
175 size_t unreported_consumed_bytes_;
176
177 // Only valid to access on peer_task_runner_.
178 scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
179
180 // Only valid to access on peer_task_runner_ if
181 // |*peer_lifetime_flag_ == true|
182 ByteStreamInputImpl* peer_;
183 };
184
185 ByteStreamInputImpl::ByteStreamInputImpl(
186 scoped_refptr<base::SequencedTaskRunner> task_runner,
187 scoped_refptr<LifetimeFlag> lifetime_flag,
188 size_t buffer_size)
189 : total_buffer_size_(buffer_size),
190 my_task_runner_(task_runner),
191 my_lifetime_flag_(lifetime_flag),
192 input_contents_size_(0),
193 output_size_used_(0),
194 peer_(NULL) {
195 DCHECK(my_lifetime_flag_.get());
196 my_lifetime_flag_->is_alive_ = true;
197 }
198
199 ByteStreamInputImpl::~ByteStreamInputImpl() {
200 my_lifetime_flag_->is_alive_ = false;
201 }
202
203 void ByteStreamInputImpl::SetPeer(
204 ByteStreamOutputImpl* peer,
205 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
206 scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
207 peer_ = peer;
208 peer_task_runner_ = peer_task_runner;
209 peer_lifetime_flag_ = peer_lifetime_flag;
210 }
211
212 bool ByteStreamInputImpl::Write(
213 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
214 input_contents_.push_back(std::make_pair(buffer, byte_count));
215 input_contents_size_ += byte_count;
216
217 // Arbitrarily, we buffer to a third of the total size before sending.
218 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
219 PostToPeer(false, content::DOWNLOAD_INTERRUPT_REASON_NONE);
220
221 return (input_contents_size_ + output_size_used_ <= total_buffer_size_);
222 }
223
224 void ByteStreamInputImpl::Close(
225 content::DownloadInterruptReason status) {
226 PostToPeer(true, status);
227 }
228
229 void ByteStreamInputImpl::RegisterCallback(
230 const base::Closure& source_callback) {
231 space_available_callback_ = source_callback;
232 }
233
234 // static
235 void ByteStreamInputImpl::UpdateWindow(
236 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamInputImpl* target,
237 size_t bytes_consumed) {
238 // If the target object isn't alive anymore, we do nothing.
239 if (!lifetime_flag->is_alive_) return;
240
241 target->UpdateWindowInternal(bytes_consumed);
242 }
243
244 void ByteStreamInputImpl::UpdateWindowInternal(size_t bytes_consumed) {
245 DCHECK_GE(output_size_used_, bytes_consumed);
246 output_size_used_ -= bytes_consumed;
247
248 // Callback if we were above the limit and we're now <= to it.
249 size_t total_known_size_used =
250 input_contents_size_ + output_size_used_;
251
252 if (total_known_size_used <= total_buffer_size_ &&
253 (total_known_size_used + bytes_consumed > total_buffer_size_) &&
254 !space_available_callback_.is_null())
255 space_available_callback_.Run();
256 }
257
258 void ByteStreamInputImpl::PostToPeer(
259 bool complete, content::DownloadInterruptReason status) {
260 // Valid contexts in which to call.
261 DCHECK(complete || 0 != input_contents_size_);
262
263 scoped_ptr<ContentVector> transfer_buffer(new ContentVector);
264 size_t buffer_size = 0;
265 if (0 != input_contents_size_) {
266 transfer_buffer.reset(new ContentVector);
267 transfer_buffer->swap(input_contents_);
268 buffer_size = input_contents_size_;
269 output_size_used_ += input_contents_size_;
270 input_contents_size_ = 0;
271 }
272 peer_task_runner_->PostTask(
273 FROM_HERE, base::Bind(
274 &ByteStreamOutputImpl::TransferData,
275 peer_lifetime_flag_,
276 peer_,
277 base::Passed(transfer_buffer.Pass()),
278 buffer_size,
279 complete,
280 status));
281 }
282
283 ByteStreamOutputImpl::ByteStreamOutputImpl(
284 scoped_refptr<base::SequencedTaskRunner> task_runner,
285 scoped_refptr<LifetimeFlag> lifetime_flag,
286 size_t buffer_size)
287 : total_buffer_size_(buffer_size),
288 my_task_runner_(task_runner),
289 my_lifetime_flag_(lifetime_flag),
290 available_contents_size_(0),
291 received_status_(false),
292 status_(content::DOWNLOAD_INTERRUPT_REASON_NONE),
293 unreported_consumed_bytes_(0),
294 peer_(NULL) {
295 DCHECK(my_lifetime_flag_.get());
296 my_lifetime_flag_->is_alive_ = true;
297 }
298
299 ByteStreamOutputImpl::~ByteStreamOutputImpl() {
300 my_lifetime_flag_->is_alive_ = false;
301 }
302
303 void ByteStreamOutputImpl::SetPeer(
304 ByteStreamInputImpl* peer,
305 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
306 scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
307 peer_ = peer;
308 peer_task_runner_ = peer_task_runner;
309 peer_lifetime_flag_ = peer_lifetime_flag;
310 }
311
312 ByteStreamOutputImpl::StreamState
313 ByteStreamOutputImpl::Read(scoped_refptr<net::IOBuffer>* data,
314 size_t* length) {
315 if (available_contents_.size()) {
316 *data = available_contents_.front().first;
317 *length = available_contents_.front().second;
318 available_contents_.pop_front();
319 DCHECK_GE(available_contents_size_, *length);
benjhayden 2012/05/18 14:21:06 Did you mean to delete |available_contents_size_|,
Randy Smith (Not in Mondays) 2012/05/18 21:55:15 Good point--deleted.
320 available_contents_size_ -= *length;
321 unreported_consumed_bytes_ += *length;
322
323 MaybeUpdateInput();
324 return STREAM_HAS_DATA;
325 }
326 if (received_status_) {
327 return STREAM_COMPLETE;
328 }
329 return STREAM_EMPTY;
330 }
331
332 content::DownloadInterruptReason
333 ByteStreamOutputImpl::GetStatus() const {
334 DCHECK(received_status_);
335 return status_;
336 }
337
338 void ByteStreamOutputImpl::RegisterCallback(
339 const base::Closure& sink_callback) {
340 data_available_callback_ = sink_callback;
341 }
342
343 // static
344 void ByteStreamOutputImpl::TransferData(
345 scoped_refptr<LifetimeFlag> object_lifetime_flag,
346 ByteStreamOutputImpl* target,
347 scoped_ptr<ContentVector> transfer_buffer,
348 size_t buffer_size,
349 bool source_complete,
350 content::DownloadInterruptReason status) {
351 // If our target is no longer alive, do nothing.
352 if (!object_lifetime_flag->is_alive_) return;
353
354 target->TransferDataInternal(
355 transfer_buffer.Pass(), buffer_size, source_complete, status);
356 }
357
358 void ByteStreamOutputImpl::TransferDataInternal(
359 scoped_ptr<ContentVector> transfer_buffer,
360 size_t buffer_size,
361 bool source_complete,
362 content::DownloadInterruptReason status) {
363 bool was_empty = available_contents_.empty();
364
365 if (transfer_buffer.get()) {
366 available_contents_.insert(available_contents_.end(),
367 transfer_buffer->begin(),
368 transfer_buffer->end());
369 available_contents_size_ += buffer_size;
370 }
371
372 if (source_complete) {
373 received_status_ = true;
374 status_ = status;
375 }
376
377 // Callback on transition from empty to non-empty, or
378 // source complete.
379 if (((was_empty && !available_contents_.empty()) ||
380 source_complete) &&
381 !data_available_callback_.is_null())
382 data_available_callback_.Run();
383 }
384
385 // Decide whether or not to send the input a window update.
386 // Currently we do that whenever we've got unreported consumption
387 // greater than 1/3 of total size.
388 void ByteStreamOutputImpl::MaybeUpdateInput() {
389 if (unreported_consumed_bytes_ <=
390 total_buffer_size_ / kFractionReadBeforeWindowUpdate)
391 return;
392
393 peer_task_runner_->PostTask(
394 FROM_HERE, base::Bind(
395 &ByteStreamInputImpl::UpdateWindow,
396 peer_lifetime_flag_,
397 peer_,
398 unreported_consumed_bytes_));
399 unreported_consumed_bytes_ = 0;
400 }
401
402 } // namespace
403
404 namespace content {
405
406 ByteStreamOutput::~ByteStreamOutput() { }
407
408 ByteStreamInput::~ByteStreamInput() { }
409
410 void CreateByteStream(
411 scoped_refptr<base::SequencedTaskRunner> input_task_runner,
412 scoped_refptr<base::SequencedTaskRunner> output_task_runner,
413 size_t buffer_size,
414 scoped_ptr<ByteStreamInput>* input,
415 scoped_ptr<ByteStreamOutput>* output) {
416 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
417 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
418
419 ByteStreamInputImpl* in = new ByteStreamInputImpl(
420 input_task_runner, input_flag, buffer_size);
421 ByteStreamOutputImpl* out = new ByteStreamOutputImpl(
422 output_task_runner, output_flag, buffer_size);
423
424 in->SetPeer(out, output_task_runner, output_flag);
425 out->SetPeer(in, input_task_runner, input_flag);
426 input->reset(in);
427 output->reset(out);
428 }
429
430 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698