Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(267)

Side by Side Diff: content/browser/download/byte_stream.cc

Issue 10392111: Use ByteStream in downloads system to decouple source and sink. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Sync'd to LKGR. Created 8 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/browser/download/byte_stream.h" 5 #include "content/browser/download/byte_stream.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h" 8 #include "base/location.h"
9 #include "base/memory/weak_ptr.h" 9 #include "base/memory/weak_ptr.h"
10 #include "base/memory/ref_counted.h" 10 #include "base/memory/ref_counted.h"
11 #include "base/sequenced_task_runner.h" 11 #include "base/sequenced_task_runner.h"
12 12
13 namespace { 13 namespace {
14 14
15 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> > 15 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
16 ContentVector; 16 ContentVector;
17 17
18 // The fraction of the buffer that must be ready to send on the input 18 // The fraction of the buffer that must be ready to send on the input
19 // before we ship data to the output. 19 // before we ship data to the output.
20 static const int kFractionBufferBeforeSending = 3; 20 static const int kFractionBufferBeforeSending = 3;
21 21
22 // The fraction of the buffer that must have been consumed on the output 22 // The fraction of the buffer that must have been consumed on the output
23 // before we update the input window. 23 // before we update the input window.
24 static const int kFractionReadBeforeWindowUpdate = 3; 24 static const int kFractionReadBeforeWindowUpdate = 3;
25 25
26 class ByteStreamOutputImpl; 26 class ByteStreamReaderImpl;
27 27
28 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be 28 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
29 // cleared in an object destructor and accessed to check for object 29 // cleared in an object destructor and accessed to check for object
30 // existence. We can't use weak pointers because they're tightly tied to 30 // existence. We can't use weak pointers because they're tightly tied to
31 // threads rather than task runners. 31 // threads rather than task runners.
32 // TODO(rdsmith): A better solution would be extending weak pointers 32 // TODO(rdsmith): A better solution would be extending weak pointers
33 // to support SequencedTaskRunners. 33 // to support SequencedTaskRunners.
34 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> { 34 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
35 public: 35 public:
36 LifetimeFlag() : is_alive_(true) { } 36 LifetimeFlag() : is_alive_(true) { }
37 bool is_alive_; 37 bool is_alive_;
38 38
39 protected: 39 protected:
40 friend class base::RefCountedThreadSafe<LifetimeFlag>; 40 friend class base::RefCountedThreadSafe<LifetimeFlag>;
41 virtual ~LifetimeFlag() { } 41 virtual ~LifetimeFlag() { }
42 42
43 private: 43 private:
44 DISALLOW_COPY_AND_ASSIGN(LifetimeFlag); 44 DISALLOW_COPY_AND_ASSIGN(LifetimeFlag);
45 }; 45 };
46 46
47 // For both ByteStreamInputImpl and ByteStreamOutputImpl, Construction and 47 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
48 // SetPeer may happen anywhere; all other operations on each class must 48 // SetPeer may happen anywhere; all other operations on each class must
49 // happen in the context of their SequencedTaskRunner. 49 // happen in the context of their SequencedTaskRunner.
50 class ByteStreamInputImpl : public content::ByteStreamInput { 50 class ByteStreamWriterImpl : public content::ByteStreamWriter {
51 public: 51 public:
52 ByteStreamInputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, 52 ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
53 scoped_refptr<LifetimeFlag> lifetime_flag, 53 scoped_refptr<LifetimeFlag> lifetime_flag,
54 size_t buffer_size); 54 size_t buffer_size);
55 virtual ~ByteStreamInputImpl(); 55 virtual ~ByteStreamWriterImpl();
56 56
57 // Must be called before any operations are performed. 57 // Must be called before any operations are performed.
58 void SetPeer(ByteStreamOutputImpl* peer, 58 void SetPeer(ByteStreamReaderImpl* peer,
59 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 59 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
60 scoped_refptr<LifetimeFlag> peer_lifetime_flag); 60 scoped_refptr<LifetimeFlag> peer_lifetime_flag);
61 61
62 // Overridden from ByteStreamInput. 62 // Overridden from ByteStreamWriter.
63 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, 63 virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
64 size_t byte_count) OVERRIDE; 64 size_t byte_count) OVERRIDE;
65 virtual void Close(content::DownloadInterruptReason status) OVERRIDE; 65 virtual void Close(content::DownloadInterruptReason status) OVERRIDE;
66 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; 66 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
67 67
68 // PostTask target from |ByteStreamOutputImpl::MaybeUpdateInput|. 68 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
69 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, 69 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
70 ByteStreamInputImpl* target, 70 ByteStreamWriterImpl* target,
71 size_t bytes_consumed); 71 size_t bytes_consumed);
72 72
73 private: 73 private:
74 // Called from UpdateWindow when object existence has been validated. 74 // Called from UpdateWindow when object existence has been validated.
75 void UpdateWindowInternal(size_t bytes_consumed); 75 void UpdateWindowInternal(size_t bytes_consumed);
76 76
77 void PostToPeer(bool complete, content::DownloadInterruptReason status); 77 void PostToPeer(bool complete, content::DownloadInterruptReason status);
78 78
79 const size_t total_buffer_size_; 79 const size_t total_buffer_size_;
80 80
(...skipping 14 matching lines...) Expand all
95 95
96 // How much we've sent to the output that for flow control purposes we 96 // How much we've sent to the output that for flow control purposes we
97 // must assume hasn't been read yet. 97 // must assume hasn't been read yet.
98 size_t output_size_used_; 98 size_t output_size_used_;
99 99
100 // Only valid to access on peer_task_runner_. 100 // Only valid to access on peer_task_runner_.
101 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; 101 scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
102 102
103 // Only valid to access on peer_task_runner_ if 103 // Only valid to access on peer_task_runner_ if
104 // |*peer_lifetime_flag_ == true| 104 // |*peer_lifetime_flag_ == true|
105 ByteStreamOutputImpl* peer_; 105 ByteStreamReaderImpl* peer_;
106 }; 106 };
107 107
108 class ByteStreamOutputImpl : public content::ByteStreamOutput { 108 class ByteStreamReaderImpl : public content::ByteStreamReader {
109 public: 109 public:
110 ByteStreamOutputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, 110 ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
111 scoped_refptr<LifetimeFlag> lifetime_flag, 111 scoped_refptr<LifetimeFlag> lifetime_flag,
112 size_t buffer_size); 112 size_t buffer_size);
113 virtual ~ByteStreamOutputImpl(); 113 virtual ~ByteStreamReaderImpl();
114 114
115 // Must be called before any operations are performed. 115 // Must be called before any operations are performed.
116 void SetPeer(ByteStreamInputImpl* peer, 116 void SetPeer(ByteStreamWriterImpl* peer,
117 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 117 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
118 scoped_refptr<LifetimeFlag> peer_lifetime_flag); 118 scoped_refptr<LifetimeFlag> peer_lifetime_flag);
119 119
120 // Overridden from ByteStreamOutput. 120 // Overridden from ByteStreamReader.
121 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, 121 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
122 size_t* length) OVERRIDE; 122 size_t* length) OVERRIDE;
123 virtual content::DownloadInterruptReason GetStatus() const OVERRIDE; 123 virtual content::DownloadInterruptReason GetStatus() const OVERRIDE;
124 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; 124 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
125 125
126 // PostTask target from |ByteStreamInputImpl::MaybePostToPeer| and 126 // PostTask target from |ByteStreamWriterImpl::MaybePostToPeer| and
127 // |ByteStreamInputImpl::Close|. 127 // |ByteStreamWriterImpl::Close|.
128 // Receive data from our peer. 128 // Receive data from our peer.
129 // static because it may be called after the object it is targeting 129 // static because it may be called after the object it is targeting
130 // has been destroyed. It may not access |*target| 130 // has been destroyed. It may not access |*target|
131 // if |*object_lifetime_flag| is false. 131 // if |*object_lifetime_flag| is false.
132 static void TransferData( 132 static void TransferData(
133 scoped_refptr<LifetimeFlag> object_lifetime_flag, 133 scoped_refptr<LifetimeFlag> object_lifetime_flag,
134 ByteStreamOutputImpl* target, 134 ByteStreamReaderImpl* target,
135 scoped_ptr<ContentVector> transfer_buffer, 135 scoped_ptr<ContentVector> transfer_buffer,
136 size_t transfer_buffer_bytes, 136 size_t transfer_buffer_bytes,
137 bool source_complete, 137 bool source_complete,
138 content::DownloadInterruptReason status); 138 content::DownloadInterruptReason status);
139 139
140 private: 140 private:
141 // Called from TransferData once object existence has been validated. 141 // Called from TransferData once object existence has been validated.
142 void TransferDataInternal( 142 void TransferDataInternal(
143 scoped_ptr<ContentVector> transfer_buffer, 143 scoped_ptr<ContentVector> transfer_buffer,
144 size_t transfer_buffer_bytes, 144 size_t transfer_buffer_bytes,
(...skipping 26 matching lines...) Expand all
171 171
172 // How much has been removed from this class that we haven't told 172 // How much has been removed from this class that we haven't told
173 // the input about yet. 173 // the input about yet.
174 size_t unreported_consumed_bytes_; 174 size_t unreported_consumed_bytes_;
175 175
176 // Only valid to access on peer_task_runner_. 176 // Only valid to access on peer_task_runner_.
177 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; 177 scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
178 178
179 // Only valid to access on peer_task_runner_ if 179 // Only valid to access on peer_task_runner_ if
180 // |*peer_lifetime_flag_ == true| 180 // |*peer_lifetime_flag_ == true|
181 ByteStreamInputImpl* peer_; 181 ByteStreamWriterImpl* peer_;
182 }; 182 };
183 183
184 ByteStreamInputImpl::ByteStreamInputImpl( 184 ByteStreamWriterImpl::ByteStreamWriterImpl(
185 scoped_refptr<base::SequencedTaskRunner> task_runner, 185 scoped_refptr<base::SequencedTaskRunner> task_runner,
186 scoped_refptr<LifetimeFlag> lifetime_flag, 186 scoped_refptr<LifetimeFlag> lifetime_flag,
187 size_t buffer_size) 187 size_t buffer_size)
188 : total_buffer_size_(buffer_size), 188 : total_buffer_size_(buffer_size),
189 my_task_runner_(task_runner), 189 my_task_runner_(task_runner),
190 my_lifetime_flag_(lifetime_flag), 190 my_lifetime_flag_(lifetime_flag),
191 input_contents_size_(0), 191 input_contents_size_(0),
192 output_size_used_(0), 192 output_size_used_(0),
193 peer_(NULL) { 193 peer_(NULL) {
194 DCHECK(my_lifetime_flag_.get()); 194 DCHECK(my_lifetime_flag_.get());
195 my_lifetime_flag_->is_alive_ = true; 195 my_lifetime_flag_->is_alive_ = true;
196 } 196 }
197 197
198 ByteStreamInputImpl::~ByteStreamInputImpl() { 198 ByteStreamWriterImpl::~ByteStreamWriterImpl() {
199 my_lifetime_flag_->is_alive_ = false; 199 my_lifetime_flag_->is_alive_ = false;
200 } 200 }
201 201
202 void ByteStreamInputImpl::SetPeer( 202 void ByteStreamWriterImpl::SetPeer(
203 ByteStreamOutputImpl* peer, 203 ByteStreamReaderImpl* peer,
204 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 204 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
205 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { 205 scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
206 peer_ = peer; 206 peer_ = peer;
207 peer_task_runner_ = peer_task_runner; 207 peer_task_runner_ = peer_task_runner;
208 peer_lifetime_flag_ = peer_lifetime_flag; 208 peer_lifetime_flag_ = peer_lifetime_flag;
209 } 209 }
210 210
211 bool ByteStreamInputImpl::Write( 211 bool ByteStreamWriterImpl::Write(
212 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { 212 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
213 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 213 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
214 214
215 input_contents_.push_back(std::make_pair(buffer, byte_count)); 215 input_contents_.push_back(std::make_pair(buffer, byte_count));
216 input_contents_size_ += byte_count; 216 input_contents_size_ += byte_count;
217 217
218 // Arbitrarily, we buffer to a third of the total size before sending. 218 // Arbitrarily, we buffer to a third of the total size before sending.
219 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) 219 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
220 PostToPeer(false, content::DOWNLOAD_INTERRUPT_REASON_NONE); 220 PostToPeer(false, content::DOWNLOAD_INTERRUPT_REASON_NONE);
221 221
222 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); 222 return (input_contents_size_ + output_size_used_ <= total_buffer_size_);
223 } 223 }
224 224
225 void ByteStreamInputImpl::Close( 225 void ByteStreamWriterImpl::Close(
226 content::DownloadInterruptReason status) { 226 content::DownloadInterruptReason status) {
227 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 227 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
228 PostToPeer(true, status); 228 PostToPeer(true, status);
229 } 229 }
230 230
231 void ByteStreamInputImpl::RegisterCallback( 231 void ByteStreamWriterImpl::RegisterCallback(
232 const base::Closure& source_callback) { 232 const base::Closure& source_callback) {
233 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 233 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
234 space_available_callback_ = source_callback; 234 space_available_callback_ = source_callback;
235 } 235 }
236 236
237 // static 237 // static
238 void ByteStreamInputImpl::UpdateWindow( 238 void ByteStreamWriterImpl::UpdateWindow(
239 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamInputImpl* target, 239 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target,
240 size_t bytes_consumed) { 240 size_t bytes_consumed) {
241 // If the target object isn't alive anymore, we do nothing. 241 // If the target object isn't alive anymore, we do nothing.
242 if (!lifetime_flag->is_alive_) return; 242 if (!lifetime_flag->is_alive_) return;
243 243
244 target->UpdateWindowInternal(bytes_consumed); 244 target->UpdateWindowInternal(bytes_consumed);
245 } 245 }
246 246
247 void ByteStreamInputImpl::UpdateWindowInternal(size_t bytes_consumed) { 247 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
248 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 248 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
249 DCHECK_GE(output_size_used_, bytes_consumed); 249 DCHECK_GE(output_size_used_, bytes_consumed);
250 output_size_used_ -= bytes_consumed; 250 output_size_used_ -= bytes_consumed;
251 251
252 // Callback if we were above the limit and we're now <= to it. 252 // Callback if we were above the limit and we're now <= to it.
253 size_t total_known_size_used = 253 size_t total_known_size_used =
254 input_contents_size_ + output_size_used_; 254 input_contents_size_ + output_size_used_;
255 255
256 if (total_known_size_used <= total_buffer_size_ && 256 if (total_known_size_used <= total_buffer_size_ &&
257 (total_known_size_used + bytes_consumed > total_buffer_size_) && 257 (total_known_size_used + bytes_consumed > total_buffer_size_) &&
258 !space_available_callback_.is_null()) 258 !space_available_callback_.is_null())
259 space_available_callback_.Run(); 259 space_available_callback_.Run();
260 } 260 }
261 261
262 void ByteStreamInputImpl::PostToPeer( 262 void ByteStreamWriterImpl::PostToPeer(
263 bool complete, content::DownloadInterruptReason status) { 263 bool complete, content::DownloadInterruptReason status) {
264 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 264 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
265 // Valid contexts in which to call. 265 // Valid contexts in which to call.
266 DCHECK(complete || 0 != input_contents_size_); 266 DCHECK(complete || 0 != input_contents_size_);
267 267
268 scoped_ptr<ContentVector> transfer_buffer(new ContentVector); 268 scoped_ptr<ContentVector> transfer_buffer(new ContentVector);
269 size_t buffer_size = 0; 269 size_t buffer_size = 0;
270 if (0 != input_contents_size_) { 270 if (0 != input_contents_size_) {
271 transfer_buffer.reset(new ContentVector); 271 transfer_buffer.reset(new ContentVector);
272 transfer_buffer->swap(input_contents_); 272 transfer_buffer->swap(input_contents_);
273 buffer_size = input_contents_size_; 273 buffer_size = input_contents_size_;
274 output_size_used_ += input_contents_size_; 274 output_size_used_ += input_contents_size_;
275 input_contents_size_ = 0; 275 input_contents_size_ = 0;
276 } 276 }
277 peer_task_runner_->PostTask( 277 peer_task_runner_->PostTask(
278 FROM_HERE, base::Bind( 278 FROM_HERE, base::Bind(
279 &ByteStreamOutputImpl::TransferData, 279 &ByteStreamReaderImpl::TransferData,
280 peer_lifetime_flag_, 280 peer_lifetime_flag_,
281 peer_, 281 peer_,
282 base::Passed(transfer_buffer.Pass()), 282 base::Passed(transfer_buffer.Pass()),
283 buffer_size, 283 buffer_size,
284 complete, 284 complete,
285 status)); 285 status));
286 } 286 }
287 287
288 ByteStreamOutputImpl::ByteStreamOutputImpl( 288 ByteStreamReaderImpl::ByteStreamReaderImpl(
289 scoped_refptr<base::SequencedTaskRunner> task_runner, 289 scoped_refptr<base::SequencedTaskRunner> task_runner,
290 scoped_refptr<LifetimeFlag> lifetime_flag, 290 scoped_refptr<LifetimeFlag> lifetime_flag,
291 size_t buffer_size) 291 size_t buffer_size)
292 : total_buffer_size_(buffer_size), 292 : total_buffer_size_(buffer_size),
293 my_task_runner_(task_runner), 293 my_task_runner_(task_runner),
294 my_lifetime_flag_(lifetime_flag), 294 my_lifetime_flag_(lifetime_flag),
295 received_status_(false), 295 received_status_(false),
296 status_(content::DOWNLOAD_INTERRUPT_REASON_NONE), 296 status_(content::DOWNLOAD_INTERRUPT_REASON_NONE),
297 unreported_consumed_bytes_(0), 297 unreported_consumed_bytes_(0),
298 peer_(NULL) { 298 peer_(NULL) {
299 DCHECK(my_lifetime_flag_.get()); 299 DCHECK(my_lifetime_flag_.get());
300 my_lifetime_flag_->is_alive_ = true; 300 my_lifetime_flag_->is_alive_ = true;
301 } 301 }
302 302
303 ByteStreamOutputImpl::~ByteStreamOutputImpl() { 303 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
304 my_lifetime_flag_->is_alive_ = false; 304 my_lifetime_flag_->is_alive_ = false;
305 } 305 }
306 306
307 void ByteStreamOutputImpl::SetPeer( 307 void ByteStreamReaderImpl::SetPeer(
308 ByteStreamInputImpl* peer, 308 ByteStreamWriterImpl* peer,
309 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 309 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
310 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { 310 scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
311 peer_ = peer; 311 peer_ = peer;
312 peer_task_runner_ = peer_task_runner; 312 peer_task_runner_ = peer_task_runner;
313 peer_lifetime_flag_ = peer_lifetime_flag; 313 peer_lifetime_flag_ = peer_lifetime_flag;
314 } 314 }
315 315
316 ByteStreamOutputImpl::StreamState 316 ByteStreamReaderImpl::StreamState
317 ByteStreamOutputImpl::Read(scoped_refptr<net::IOBuffer>* data, 317 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
318 size_t* length) { 318 size_t* length) {
319 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 319 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
320 320
321 if (available_contents_.size()) { 321 if (available_contents_.size()) {
322 *data = available_contents_.front().first; 322 *data = available_contents_.front().first;
323 *length = available_contents_.front().second; 323 *length = available_contents_.front().second;
324 available_contents_.pop_front(); 324 available_contents_.pop_front();
325 unreported_consumed_bytes_ += *length; 325 unreported_consumed_bytes_ += *length;
326 326
327 MaybeUpdateInput(); 327 MaybeUpdateInput();
328 return STREAM_HAS_DATA; 328 return STREAM_HAS_DATA;
329 } 329 }
330 if (received_status_) { 330 if (received_status_) {
331 return STREAM_COMPLETE; 331 return STREAM_COMPLETE;
332 } 332 }
333 return STREAM_EMPTY; 333 return STREAM_EMPTY;
334 } 334 }
335 335
336 content::DownloadInterruptReason 336 content::DownloadInterruptReason
337 ByteStreamOutputImpl::GetStatus() const { 337 ByteStreamReaderImpl::GetStatus() const {
338 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 338 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
339 DCHECK(received_status_); 339 DCHECK(received_status_);
340 return status_; 340 return status_;
341 } 341 }
342 342
343 void ByteStreamOutputImpl::RegisterCallback( 343 void ByteStreamReaderImpl::RegisterCallback(
344 const base::Closure& sink_callback) { 344 const base::Closure& sink_callback) {
345 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 345 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
346 346
347 data_available_callback_ = sink_callback; 347 data_available_callback_ = sink_callback;
348 } 348 }
349 349
350 // static 350 // static
351 void ByteStreamOutputImpl::TransferData( 351 void ByteStreamReaderImpl::TransferData(
352 scoped_refptr<LifetimeFlag> object_lifetime_flag, 352 scoped_refptr<LifetimeFlag> object_lifetime_flag,
353 ByteStreamOutputImpl* target, 353 ByteStreamReaderImpl* target,
354 scoped_ptr<ContentVector> transfer_buffer, 354 scoped_ptr<ContentVector> transfer_buffer,
355 size_t buffer_size, 355 size_t buffer_size,
356 bool source_complete, 356 bool source_complete,
357 content::DownloadInterruptReason status) { 357 content::DownloadInterruptReason status) {
358 // If our target is no longer alive, do nothing. 358 // If our target is no longer alive, do nothing.
359 if (!object_lifetime_flag->is_alive_) return; 359 if (!object_lifetime_flag->is_alive_) return;
360 360
361 target->TransferDataInternal( 361 target->TransferDataInternal(
362 transfer_buffer.Pass(), buffer_size, source_complete, status); 362 transfer_buffer.Pass(), buffer_size, source_complete, status);
363 } 363 }
364 364
365 void ByteStreamOutputImpl::TransferDataInternal( 365 void ByteStreamReaderImpl::TransferDataInternal(
366 scoped_ptr<ContentVector> transfer_buffer, 366 scoped_ptr<ContentVector> transfer_buffer,
367 size_t buffer_size, 367 size_t buffer_size,
368 bool source_complete, 368 bool source_complete,
369 content::DownloadInterruptReason status) { 369 content::DownloadInterruptReason status) {
370 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 370 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
371 371
372 bool was_empty = available_contents_.empty(); 372 bool was_empty = available_contents_.empty();
373 373
374 if (transfer_buffer.get()) { 374 if (transfer_buffer.get()) {
375 available_contents_.insert(available_contents_.end(), 375 available_contents_.insert(available_contents_.end(),
(...skipping 10 matching lines...) Expand all
386 // source complete. 386 // source complete.
387 if (((was_empty && !available_contents_.empty()) || 387 if (((was_empty && !available_contents_.empty()) ||
388 source_complete) && 388 source_complete) &&
389 !data_available_callback_.is_null()) 389 !data_available_callback_.is_null())
390 data_available_callback_.Run(); 390 data_available_callback_.Run();
391 } 391 }
392 392
393 // Decide whether or not to send the input a window update. 393 // Decide whether or not to send the input a window update.
394 // Currently we do that whenever we've got unreported consumption 394 // Currently we do that whenever we've got unreported consumption
395 // greater than 1/3 of total size. 395 // greater than 1/3 of total size.
396 void ByteStreamOutputImpl::MaybeUpdateInput() { 396 void ByteStreamReaderImpl::MaybeUpdateInput() {
397 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 397 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
398 398
399 if (unreported_consumed_bytes_ <= 399 if (unreported_consumed_bytes_ <=
400 total_buffer_size_ / kFractionReadBeforeWindowUpdate) 400 total_buffer_size_ / kFractionReadBeforeWindowUpdate)
401 return; 401 return;
402 402
403 peer_task_runner_->PostTask( 403 peer_task_runner_->PostTask(
404 FROM_HERE, base::Bind( 404 FROM_HERE, base::Bind(
405 &ByteStreamInputImpl::UpdateWindow, 405 &ByteStreamWriterImpl::UpdateWindow,
406 peer_lifetime_flag_, 406 peer_lifetime_flag_,
407 peer_, 407 peer_,
408 unreported_consumed_bytes_)); 408 unreported_consumed_bytes_));
409 unreported_consumed_bytes_ = 0; 409 unreported_consumed_bytes_ = 0;
410 } 410 }
411 411
412 } // namespace 412 } // namespace
413 413
414 namespace content { 414 namespace content {
415 415
416 ByteStreamOutput::~ByteStreamOutput() { } 416 ByteStreamReader::~ByteStreamReader() { }
417 417
418 ByteStreamInput::~ByteStreamInput() { } 418 ByteStreamWriter::~ByteStreamWriter() { }
419 419
420 void CreateByteStream( 420 void CreateByteStream(
421 scoped_refptr<base::SequencedTaskRunner> input_task_runner, 421 scoped_refptr<base::SequencedTaskRunner> input_task_runner,
422 scoped_refptr<base::SequencedTaskRunner> output_task_runner, 422 scoped_refptr<base::SequencedTaskRunner> output_task_runner,
423 size_t buffer_size, 423 size_t buffer_size,
424 scoped_ptr<ByteStreamInput>* input, 424 scoped_ptr<ByteStreamWriter>* input,
425 scoped_ptr<ByteStreamOutput>* output) { 425 scoped_ptr<ByteStreamReader>* output) {
426 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag()); 426 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
427 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag()); 427 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
428 428
429 ByteStreamInputImpl* in = new ByteStreamInputImpl( 429 ByteStreamWriterImpl* in = new ByteStreamWriterImpl(
430 input_task_runner, input_flag, buffer_size); 430 input_task_runner, input_flag, buffer_size);
431 ByteStreamOutputImpl* out = new ByteStreamOutputImpl( 431 ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
432 output_task_runner, output_flag, buffer_size); 432 output_task_runner, output_flag, buffer_size);
433 433
434 in->SetPeer(out, output_task_runner, output_flag); 434 in->SetPeer(out, output_task_runner, output_flag);
435 out->SetPeer(in, input_task_runner, input_flag); 435 out->SetPeer(in, input_task_runner, input_flag);
436 input->reset(in); 436 input->reset(in);
437 output->reset(out); 437 output->reset(out);
438 } 438 }
439 439
440 } // namespace content 440 } // namespace content
OLDNEW
« no previous file with comments | « content/browser/download/byte_stream.h ('k') | content/browser/download/byte_stream_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698