Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(485)

Side by Side Diff: content/browser/download/byte_stream.cc

Issue 10244001: Creation of ByteStream class. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Incorporated comments and fixed a few callback problems. Created 8 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/browser/download/byte_stream.h"
6
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/memory/weak_ptr.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/sequenced_task_runner.h"
12
13 namespace {
14
15 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>,size_t> >
benjhayden 2012/05/16 14:26:25 Space after comma.
Randy Smith (Not in Mondays) 2012/05/16 20:30:15 Done.
16 ContentVector;
17
18 class ByteStreamOutputImpl;
19
20 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
21 // cleared in an object destructor and accessed to check for object
22 // existence. We can't use weak pointers because they're tightly tied to
23 // threads rather than task runners.
benjhayden 2012/05/16 14:26:25 TODO: TaskRunnerWeakPointer?
Randy Smith (Not in Mondays) 2012/05/16 20:30:15 Done.
24 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
25 public:
26 LifetimeFlag() : is_alive_(true) { }
27 bool is_alive_;
benjhayden 2012/05/16 14:26:25 blank line between sections?
Randy Smith (Not in Mondays) 2012/05/16 20:30:15 Done.
28 protected:
29 friend class base::RefCountedThreadSafe<LifetimeFlag>;
30 virtual ~LifetimeFlag() { }
31 };
benjhayden 2012/05/16 14:26:25 DISALLOW_COPY_AND_ASSIGN?
Randy Smith (Not in Mondays) 2012/05/16 20:30:15 Done.
32
33 // For both ByteStreamInputImpl and ByteStreamOutputImpl, Construction and
34 // SetPeer may happen anywhere; all other operations on each class must
35 // happen in the context of their SequencedTaskRunner.
36 class ByteStreamInputImpl : public content::ByteStreamInput {
37 public:
38 ByteStreamInputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
39 scoped_refptr<LifetimeFlag> lifetime_flag,
40 size_t buffer_size);
41 virtual ~ByteStreamInputImpl();
42
43 // Must be called before any operations are performed.
44 void SetPeer(ByteStreamOutputImpl* peer,
45 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
46 scoped_refptr<LifetimeFlag> peer_lifetime_flag);
47
48 // Overridden from ByteStreamInput.
49 virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
50 size_t byte_count) OVERRIDE;
51 virtual void Close(content::DownloadInterruptReason status) OVERRIDE;
52 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
53
54 // PostTask target from |ByteStreamOutputImpl::MaybeUpdateInput|.
55 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
56 ByteStreamInputImpl* target,
57 size_t bytes_consumed);
58
59 private:
60 void MaybePostToPeer();
61
62 const size_t total_buffer_size_;
63
64 // All data objects in this class are only valid to access on
65 // this task runner except as otherwise noted.
66 scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
67
68 // True while this object is alive.
69 scoped_refptr<LifetimeFlag> my_lifetime_flag_;
70
71 base::Closure space_available_callback_;
72 ContentVector input_contents_;
73 size_t input_contents_size_;
74
75 // ** Peer information.
76
77 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
78
79 // How much we've sent to the output that for flow control purposes we
80 // must assume hasn't been read yet.
81 size_t output_size_used_;
82
83 // Only valid to access on peer_task_runner_.
84 scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
85
86 // Only valid to access on peer_task_runner_ if
87 // |*peer_lifetime_flag_ == true|
88 ByteStreamOutputImpl* peer_;
89 };
90
91 class ByteStreamOutputImpl : public content::ByteStreamOutput {
92 public:
93 ByteStreamOutputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
94 scoped_refptr<LifetimeFlag> lifetime_flag,
95 size_t buffer_size);
96 virtual ~ByteStreamOutputImpl();
97
98 // Must be called before any operations are performed.
99 void SetPeer(ByteStreamInputImpl* peer,
100 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
101 scoped_refptr<LifetimeFlag> peer_lifetime_flag);
102
103 // Overridden from ByteStreamOutput.
104 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
105 size_t* length) OVERRIDE;
106 virtual content::DownloadInterruptReason GetStatus() const OVERRIDE;
107 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
108
109 // PostTask target from |ByteStreamInputImpl::MaybePostToPeer| and
110 // |ByteStreamInputImpl::Close|.
111 // Receive data from our peer.
112 // static because it may be called after the object it is targeting
113 // has been destroyed. It may not access |*target|
114 // if |*object_lifetime_flag| is false.
115 static void TransferData(
116 scoped_refptr<LifetimeFlag> object_lifetime_flag,
117 ByteStreamOutputImpl* target,
118 scoped_ptr<ContentVector> xfer_buffer,
119 size_t xfer_buffer_bytes,
120 bool source_complete,
121 content::DownloadInterruptReason status);
122
123 private:
124 void MaybeUpdateInput();
125
126 const size_t total_buffer_size_;
127
128 scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
129
130 // True while this object is alive.
131 scoped_refptr<LifetimeFlag> my_lifetime_flag_;
132
133 ContentVector available_contents_;
134 size_t available_contents_size_;
135
136 bool received_status_;
137 content::DownloadInterruptReason status_;
138
139 base::Closure data_available_callback_;
140
141 // Time of last point at which data in stream transitioned from full
142 // to non-full. Nulled when a callback is sent.
143 base::Time last_non_full_time_;
144
145 // ** Peer information
146
147 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
148
149 // How much has been removed from this class that we haven't told
150 // the input about yet.
151 size_t unreported_consumed_bytes_;
152
153 // Only valid to access on peer_task_runner_.
154 scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
155
156 // Only valid to access on peer_task_runner_ if
157 // |*peer_lifetime_flag_ == true|
158 ByteStreamInputImpl* peer_;
159 };
160
161 ByteStreamInputImpl::ByteStreamInputImpl(
162 scoped_refptr<base::SequencedTaskRunner> task_runner,
163 scoped_refptr<LifetimeFlag> lifetime_flag,
164 size_t buffer_size)
165 : total_buffer_size_(buffer_size),
166 my_task_runner_(task_runner),
167 my_lifetime_flag_(lifetime_flag),
168 input_contents_size_(0),
169 output_size_used_(0),
170 peer_(NULL) {
171 DCHECK(my_lifetime_flag_.get());
172 my_lifetime_flag_->is_alive_ = true;
173 }
174
175 ByteStreamInputImpl::~ByteStreamInputImpl() {
176 my_lifetime_flag_->is_alive_ = false;
177 }
178
179 void ByteStreamInputImpl::SetPeer(
180 ByteStreamOutputImpl* peer,
181 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
182 scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
183 peer_ = peer;
184 peer_task_runner_ = peer_task_runner;
185 peer_lifetime_flag_ = peer_lifetime_flag;
186 }
187
188 bool ByteStreamInputImpl::Write(
189 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
190 input_contents_.push_back(std::make_pair(buffer, byte_count));
191 input_contents_size_ += byte_count;
192
193 MaybePostToPeer();
194
195 return (input_contents_size_ + output_size_used_ <= total_buffer_size_);
196 }
197
198 void ByteStreamInputImpl::Close(
benjhayden 2012/05/16 19:17:21 This seems to duplicate some of MaybePostToPeer().
Randy Smith (Not in Mondays) 2012/05/16 20:30:15 Done.
199 content::DownloadInterruptReason status) {
200 // Make sure to flush everything we have with the source notification.
201 scoped_ptr<ContentVector> xfer_buffer;
202 size_t buffer_size = 0;
203 if (0 != input_contents_size_) {
204 xfer_buffer.reset(new ContentVector);
205 xfer_buffer->swap(input_contents_);
206 buffer_size = input_contents_size_;
207 output_size_used_ += input_contents_size_;
208 input_contents_size_ = 0;
209 }
210 peer_task_runner_->PostTask(
211 FROM_HERE, base::Bind(
212 &ByteStreamOutputImpl::TransferData,
213 peer_lifetime_flag_,
214 peer_,
215 base::Passed(xfer_buffer.Pass()),
216 buffer_size,
217 true /* Source complete. */,
218 status));
219 }
220
221 void ByteStreamInputImpl::RegisterCallback(
222 const base::Closure& source_callback) {
223 space_available_callback_ = source_callback;
224 }
225
226 // static
227 void ByteStreamInputImpl::UpdateWindow(
228 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamInputImpl* target,
229 size_t bytes_consumed) {
230
benjhayden 2012/05/16 15:41:13 No blank lines at beginning/ends of blocks.
Randy Smith (Not in Mondays) 2012/05/16 20:30:15 Done.
231 // If the target object isn't alive anymore, we do nothing.
232 if (!lifetime_flag->is_alive_) return;
233
234 DCHECK_GE(target->output_size_used_, bytes_consumed);
benjhayden 2012/05/16 15:41:13 Want to put the rest of this function in a method
Randy Smith (Not in Mondays) 2012/05/16 20:30:15 Done.
235 target->output_size_used_ -= bytes_consumed;
236
237 // Callback if we were above the limit and we're now <= to it.
238 size_t total_known_size_used =
239 target->input_contents_size_ + target->output_size_used_;
240
241 if (total_known_size_used <= target->total_buffer_size_ &&
242 (total_known_size_used + bytes_consumed > target->total_buffer_size_) &&
243 !target->space_available_callback_.is_null())
244 target->space_available_callback_.Run();
245 }
246
247 // Decide whether or not we've bufferred enough for a transfer.
248 // For right now "enough" will be "anything".
249 void ByteStreamInputImpl::MaybePostToPeer() {
250 // Arbitrarily, we buffer to a third of the total size before sending.
benjhayden 2012/05/16 15:41:13 Does this contradict L248?
Randy Smith (Not in Mondays) 2012/05/16 20:30:15 Yep; oops. Fixed.
251 if (input_contents_size_ > total_buffer_size_ / 3) {
benjhayden 2012/05/16 15:41:13 Invert and early-return?
benjhayden 2012/05/16 15:41:13 Use a kConstant instead of 3?
Randy Smith (Not in Mondays) 2012/05/16 20:30:15 Obsolete because of refactor done for another comm
Randy Smith (Not in Mondays) 2012/05/16 20:30:15 Done. I'm a bit hesitant because if I change this
252 scoped_ptr<ContentVector> xfer_buffer(new ContentVector);
253 xfer_buffer->swap(input_contents_);
254 size_t buffer_size = input_contents_size_;
255 output_size_used_ += input_contents_size_;
256 input_contents_size_ = 0;
257
258 peer_task_runner_->PostTask(
259 FROM_HERE, base::Bind(
260 &ByteStreamOutputImpl::TransferData,
261 peer_lifetime_flag_,
262 peer_,
263 base::Passed(xfer_buffer.Pass()),
264 buffer_size,
265 false /* Source not complete. */,
266 content::DOWNLOAD_INTERRUPT_REASON_NONE));
267 }
268 }
269
270 ByteStreamOutputImpl::ByteStreamOutputImpl(
271 scoped_refptr<base::SequencedTaskRunner> task_runner,
272 scoped_refptr<LifetimeFlag> lifetime_flag,
273 size_t buffer_size)
274 : total_buffer_size_(buffer_size),
275 my_task_runner_(task_runner),
276 my_lifetime_flag_(lifetime_flag),
277 available_contents_size_(0),
278 received_status_(false),
279 status_(content::DOWNLOAD_INTERRUPT_REASON_NONE),
280 unreported_consumed_bytes_(0),
281 peer_(NULL) {
282 DCHECK(my_lifetime_flag_.get());
283 my_lifetime_flag_->is_alive_ = true;
284 }
285
286 ByteStreamOutputImpl::~ByteStreamOutputImpl() {
287 my_lifetime_flag_->is_alive_ = false;
288 }
289
290 void ByteStreamOutputImpl::SetPeer(
291 ByteStreamInputImpl* peer,
292 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
293 scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
294 peer_ = peer;
295 peer_task_runner_ = peer_task_runner;
296 peer_lifetime_flag_ = peer_lifetime_flag;
297 }
298
299 ByteStreamOutputImpl::StreamState
300 ByteStreamOutputImpl::Read(scoped_refptr<net::IOBuffer>* data,
301 size_t* length) {
302 if (available_contents_.size()) {
303 *data = available_contents_.front().first;
304 *length = available_contents_.front().second;
305 available_contents_.pop_front();
306 DCHECK_GE(available_contents_size_, *length);
307 available_contents_size_ -= *length;
308 unreported_consumed_bytes_ += *length;
309
310 MaybeUpdateInput();
311 return STREAM_HAS_DATA;
312 }
313 if (received_status_) {
314 return STREAM_COMPLETE;
315 }
316 return STREAM_EMPTY;
317 }
318
319 content::DownloadInterruptReason
320 ByteStreamOutputImpl::GetStatus() const {
321 DCHECK(received_status_);
322 return status_;
323 }
324
325 void ByteStreamOutputImpl::RegisterCallback(
326 const base::Closure& sink_callback) {
327 data_available_callback_ = sink_callback;
328 }
329
330 // static
331 void ByteStreamOutputImpl::TransferData(
332 scoped_refptr<LifetimeFlag> object_lifetime_flag,
333 ByteStreamOutputImpl* target,
334 scoped_ptr<ContentVector> xfer_buffer,
335 size_t buffer_size,
336 bool source_complete,
337 content::DownloadInterruptReason status) {
338 // If our target is no longer alive, do nothing.
339 if (!object_lifetime_flag->is_alive_) return;
340
341 if (xfer_buffer.get()) {
342 target->available_contents_.insert(target->available_contents_.end(),
343 xfer_buffer->begin(),
344 xfer_buffer->end());
345 target->available_contents_size_ += buffer_size;
346 }
347
348 if (source_complete) {
349 target->received_status_ = true;
350 target->status_ = status;
351 }
352
353 // Callback if we didn't use to have data and we do now.
354 if (target->available_contents_size_ == buffer_size &&
benjhayden 2012/05/16 15:41:13 Looks like you can get rid of available_contents_s
Randy Smith (Not in Mondays) 2012/05/16 20:30:15 How do you figure? I agree available_contents_siz
355 target->available_contents_size_ > 0 &&
356 !target->data_available_callback_.is_null())
357 target->data_available_callback_.Run();
358 }
359
360 // Decide whether or not to send the input a window update.
361 // Currently we do that whenever we've got unreported consumption
362 // greater than 1/3 of total size.
363 void ByteStreamOutputImpl::MaybeUpdateInput() {
364 if (unreported_consumed_bytes_ > total_buffer_size_ / 3) {
benjhayden 2012/05/16 19:17:21 Invert and early-return?
Randy Smith (Not in Mondays) 2012/05/16 20:30:15 Done.
365 peer_task_runner_->PostTask(
366 FROM_HERE, base::Bind(
367 &ByteStreamInputImpl::UpdateWindow,
368 peer_lifetime_flag_,
369 peer_,
370 unreported_consumed_bytes_));
371 unreported_consumed_bytes_ = 0;
372 }
373 }
374
375 } // namespace
376
377 namespace content {
378
379 ByteStreamOutput::~ByteStreamOutput() { }
380
381 ByteStreamInput::~ByteStreamInput() { }
382
383 void CreateByteStream(
384 scoped_ptr<ByteStreamInput>* input,
385 scoped_ptr<ByteStreamOutput>* output,
386 scoped_refptr<base::SequencedTaskRunner> input_task_runner,
387 scoped_refptr<base::SequencedTaskRunner> output_task_runner,
388 size_t buffer_size) {
389 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
390 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
391
392 ByteStreamInputImpl* in = new ByteStreamInputImpl(
393 input_task_runner, input_flag, buffer_size);
394 ByteStreamOutputImpl* out = new ByteStreamOutputImpl(
395 output_task_runner, output_flag, buffer_size);
396
397 in->SetPeer(out, output_task_runner, output_flag);
398 out->SetPeer(in, input_task_runner, input_flag);
399 input->reset(in);
400 output->reset(out);
401 return;
benjhayden 2012/05/16 15:41:13 Feeling philosophical?
Randy Smith (Not in Mondays) 2012/05/16 20:30:15 You know, it took me a good fifteen seconds to fig
402 }
403
404 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698