Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(952)

Side by Side Diff: content/browser/byte_stream.cc

Issue 18284005: Make ByteStream independent from DownloadInterruptReason (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: reupload with the change made for rdsmith's comment Created 7 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/browser/byte_stream.h" 5 #include "content/browser/byte_stream.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h" 8 #include "base/location.h"
9 #include "base/memory/ref_counted.h" 9 #include "base/memory/ref_counted.h"
10 #include "base/memory/weak_ptr.h" 10 #include "base/memory/weak_ptr.h"
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
48 virtual ~ByteStreamWriterImpl(); 48 virtual ~ByteStreamWriterImpl();
49 49
50 // Must be called before any operations are performed. 50 // Must be called before any operations are performed.
51 void SetPeer(ByteStreamReaderImpl* peer, 51 void SetPeer(ByteStreamReaderImpl* peer,
52 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 52 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
53 scoped_refptr<LifetimeFlag> peer_lifetime_flag); 53 scoped_refptr<LifetimeFlag> peer_lifetime_flag);
54 54
55 // Overridden from ByteStreamWriter. 55 // Overridden from ByteStreamWriter.
56 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, 56 virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
57 size_t byte_count) OVERRIDE; 57 size_t byte_count) OVERRIDE;
58 virtual void Close(DownloadInterruptReason status) OVERRIDE; 58 virtual void Close(int status) OVERRIDE;
59 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; 59 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
60 60
61 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. 61 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
62 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, 62 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
63 ByteStreamWriterImpl* target, 63 ByteStreamWriterImpl* target,
64 size_t bytes_consumed); 64 size_t bytes_consumed);
65 65
66 private: 66 private:
67 // Called from UpdateWindow when object existence has been validated. 67 // Called from UpdateWindow when object existence has been validated.
68 void UpdateWindowInternal(size_t bytes_consumed); 68 void UpdateWindowInternal(size_t bytes_consumed);
69 69
70 void PostToPeer(bool complete, DownloadInterruptReason status); 70 void PostToPeer(bool complete, int status);
71 71
72 const size_t total_buffer_size_; 72 const size_t total_buffer_size_;
73 73
74 // All data objects in this class are only valid to access on 74 // All data objects in this class are only valid to access on
75 // this task runner except as otherwise noted. 75 // this task runner except as otherwise noted.
76 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; 76 scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
77 77
78 // True while this object is alive. 78 // True while this object is alive.
79 scoped_refptr<LifetimeFlag> my_lifetime_flag_; 79 scoped_refptr<LifetimeFlag> my_lifetime_flag_;
80 80
(...skipping 25 matching lines...) Expand all
106 virtual ~ByteStreamReaderImpl(); 106 virtual ~ByteStreamReaderImpl();
107 107
108 // Must be called before any operations are performed. 108 // Must be called before any operations are performed.
109 void SetPeer(ByteStreamWriterImpl* peer, 109 void SetPeer(ByteStreamWriterImpl* peer,
110 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 110 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
111 scoped_refptr<LifetimeFlag> peer_lifetime_flag); 111 scoped_refptr<LifetimeFlag> peer_lifetime_flag);
112 112
113 // Overridden from ByteStreamReader. 113 // Overridden from ByteStreamReader.
114 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, 114 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
115 size_t* length) OVERRIDE; 115 size_t* length) OVERRIDE;
116 virtual DownloadInterruptReason GetStatus() const OVERRIDE; 116 virtual int GetStatus() const OVERRIDE;
117 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; 117 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
118 118
119 // PostTask target from |ByteStreamWriterImpl::MaybePostToPeer| and 119 // PostTask target from |ByteStreamWriterImpl::MaybePostToPeer| and
120 // |ByteStreamWriterImpl::Close|. 120 // |ByteStreamWriterImpl::Close|.
121 // Receive data from our peer. 121 // Receive data from our peer.
122 // static because it may be called after the object it is targeting 122 // static because it may be called after the object it is targeting
123 // has been destroyed. It may not access |*target| 123 // has been destroyed. It may not access |*target|
124 // if |*object_lifetime_flag| is false. 124 // if |*object_lifetime_flag| is false.
125 static void TransferData( 125 static void TransferData(
126 scoped_refptr<LifetimeFlag> object_lifetime_flag, 126 scoped_refptr<LifetimeFlag> object_lifetime_flag,
127 ByteStreamReaderImpl* target, 127 ByteStreamReaderImpl* target,
128 scoped_ptr<ContentVector> transfer_buffer, 128 scoped_ptr<ContentVector> transfer_buffer,
129 size_t transfer_buffer_bytes, 129 size_t transfer_buffer_bytes,
130 bool source_complete, 130 bool source_complete,
131 DownloadInterruptReason status); 131 int status);
132 132
133 private: 133 private:
134 // Called from TransferData once object existence has been validated. 134 // Called from TransferData once object existence has been validated.
135 void TransferDataInternal( 135 void TransferDataInternal(
136 scoped_ptr<ContentVector> transfer_buffer, 136 scoped_ptr<ContentVector> transfer_buffer,
137 size_t transfer_buffer_bytes, 137 size_t transfer_buffer_bytes,
138 bool source_complete, 138 bool source_complete,
139 DownloadInterruptReason status); 139 int status);
140 140
141 void MaybeUpdateInput(); 141 void MaybeUpdateInput();
142 142
143 const size_t total_buffer_size_; 143 const size_t total_buffer_size_;
144 144
145 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; 145 scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
146 146
147 // True while this object is alive. 147 // True while this object is alive.
148 scoped_refptr<LifetimeFlag> my_lifetime_flag_; 148 scoped_refptr<LifetimeFlag> my_lifetime_flag_;
149 149
150 ContentVector available_contents_; 150 ContentVector available_contents_;
151 151
152 bool received_status_; 152 bool received_status_;
153 DownloadInterruptReason status_; 153 int status_;
154 154
155 base::Closure data_available_callback_; 155 base::Closure data_available_callback_;
156 156
157 // Time of last point at which data in stream transitioned from full 157 // Time of last point at which data in stream transitioned from full
158 // to non-full. Nulled when a callback is sent. 158 // to non-full. Nulled when a callback is sent.
159 base::Time last_non_full_time_; 159 base::Time last_non_full_time_;
160 160
161 // ** Peer information 161 // ** Peer information
162 162
163 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; 163 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
203 203
204 bool ByteStreamWriterImpl::Write( 204 bool ByteStreamWriterImpl::Write(
205 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { 205 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
206 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 206 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
207 207
208 input_contents_.push_back(std::make_pair(buffer, byte_count)); 208 input_contents_.push_back(std::make_pair(buffer, byte_count));
209 input_contents_size_ += byte_count; 209 input_contents_size_ += byte_count;
210 210
211 // Arbitrarily, we buffer to a third of the total size before sending. 211 // Arbitrarily, we buffer to a third of the total size before sending.
212 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) 212 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
213 PostToPeer(false, DOWNLOAD_INTERRUPT_REASON_NONE); 213 PostToPeer(false, 0 /* status */);
214 214
215 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); 215 return (input_contents_size_ + output_size_used_ <= total_buffer_size_);
216 } 216 }
217 217
218 void ByteStreamWriterImpl::Close( 218 void ByteStreamWriterImpl::Close(int status) {
219 DownloadInterruptReason status) {
220 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 219 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
221 PostToPeer(true, status); 220 PostToPeer(true, status);
222 } 221 }
223 222
224 void ByteStreamWriterImpl::RegisterCallback( 223 void ByteStreamWriterImpl::RegisterCallback(
225 const base::Closure& source_callback) { 224 const base::Closure& source_callback) {
226 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 225 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
227 space_available_callback_ = source_callback; 226 space_available_callback_ = source_callback;
228 } 227 }
229 228
(...skipping 15 matching lines...) Expand all
245 // Callback if we were above the limit and we're now <= to it. 244 // Callback if we were above the limit and we're now <= to it.
246 size_t total_known_size_used = 245 size_t total_known_size_used =
247 input_contents_size_ + output_size_used_; 246 input_contents_size_ + output_size_used_;
248 247
249 if (total_known_size_used <= total_buffer_size_ && 248 if (total_known_size_used <= total_buffer_size_ &&
250 (total_known_size_used + bytes_consumed > total_buffer_size_) && 249 (total_known_size_used + bytes_consumed > total_buffer_size_) &&
251 !space_available_callback_.is_null()) 250 !space_available_callback_.is_null())
252 space_available_callback_.Run(); 251 space_available_callback_.Run();
253 } 252 }
254 253
255 void ByteStreamWriterImpl::PostToPeer( 254 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
256 bool complete, DownloadInterruptReason status) {
257 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 255 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
258 // Valid contexts in which to call. 256 // Valid contexts in which to call.
259 DCHECK(complete || 0 != input_contents_size_); 257 DCHECK(complete || 0 != input_contents_size_);
260 258
261 scoped_ptr<ContentVector> transfer_buffer(new ContentVector); 259 scoped_ptr<ContentVector> transfer_buffer(new ContentVector);
262 size_t buffer_size = 0; 260 size_t buffer_size = 0;
263 if (0 != input_contents_size_) { 261 if (0 != input_contents_size_) {
264 transfer_buffer.reset(new ContentVector); 262 transfer_buffer.reset(new ContentVector);
265 transfer_buffer->swap(input_contents_); 263 transfer_buffer->swap(input_contents_);
266 buffer_size = input_contents_size_; 264 buffer_size = input_contents_size_;
(...skipping 12 matching lines...) Expand all
279 } 277 }
280 278
281 ByteStreamReaderImpl::ByteStreamReaderImpl( 279 ByteStreamReaderImpl::ByteStreamReaderImpl(
282 scoped_refptr<base::SequencedTaskRunner> task_runner, 280 scoped_refptr<base::SequencedTaskRunner> task_runner,
283 scoped_refptr<LifetimeFlag> lifetime_flag, 281 scoped_refptr<LifetimeFlag> lifetime_flag,
284 size_t buffer_size) 282 size_t buffer_size)
285 : total_buffer_size_(buffer_size), 283 : total_buffer_size_(buffer_size),
286 my_task_runner_(task_runner), 284 my_task_runner_(task_runner),
287 my_lifetime_flag_(lifetime_flag), 285 my_lifetime_flag_(lifetime_flag),
288 received_status_(false), 286 received_status_(false),
289 status_(DOWNLOAD_INTERRUPT_REASON_NONE), 287 status_(0),
290 unreported_consumed_bytes_(0), 288 unreported_consumed_bytes_(0),
291 peer_(NULL) { 289 peer_(NULL) {
292 DCHECK(my_lifetime_flag_.get()); 290 DCHECK(my_lifetime_flag_.get());
293 my_lifetime_flag_->is_alive = true; 291 my_lifetime_flag_->is_alive = true;
294 } 292 }
295 293
296 ByteStreamReaderImpl::~ByteStreamReaderImpl() { 294 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
297 my_lifetime_flag_->is_alive = false; 295 my_lifetime_flag_->is_alive = false;
298 } 296 }
299 297
(...skipping 19 matching lines...) Expand all
319 317
320 MaybeUpdateInput(); 318 MaybeUpdateInput();
321 return STREAM_HAS_DATA; 319 return STREAM_HAS_DATA;
322 } 320 }
323 if (received_status_) { 321 if (received_status_) {
324 return STREAM_COMPLETE; 322 return STREAM_COMPLETE;
325 } 323 }
326 return STREAM_EMPTY; 324 return STREAM_EMPTY;
327 } 325 }
328 326
329 DownloadInterruptReason ByteStreamReaderImpl::GetStatus() const { 327 int ByteStreamReaderImpl::GetStatus() const {
330 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 328 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
331 DCHECK(received_status_); 329 DCHECK(received_status_);
332 return status_; 330 return status_;
333 } 331 }
334 332
335 void ByteStreamReaderImpl::RegisterCallback( 333 void ByteStreamReaderImpl::RegisterCallback(
336 const base::Closure& sink_callback) { 334 const base::Closure& sink_callback) {
337 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 335 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
338 336
339 data_available_callback_ = sink_callback; 337 data_available_callback_ = sink_callback;
340 } 338 }
341 339
342 // static 340 // static
343 void ByteStreamReaderImpl::TransferData( 341 void ByteStreamReaderImpl::TransferData(
344 scoped_refptr<LifetimeFlag> object_lifetime_flag, 342 scoped_refptr<LifetimeFlag> object_lifetime_flag,
345 ByteStreamReaderImpl* target, 343 ByteStreamReaderImpl* target,
346 scoped_ptr<ContentVector> transfer_buffer, 344 scoped_ptr<ContentVector> transfer_buffer,
347 size_t buffer_size, 345 size_t buffer_size,
348 bool source_complete, 346 bool source_complete,
349 DownloadInterruptReason status) { 347 int status) {
350 // If our target is no longer alive, do nothing. 348 // If our target is no longer alive, do nothing.
351 if (!object_lifetime_flag->is_alive) return; 349 if (!object_lifetime_flag->is_alive) return;
352 350
353 target->TransferDataInternal( 351 target->TransferDataInternal(
354 transfer_buffer.Pass(), buffer_size, source_complete, status); 352 transfer_buffer.Pass(), buffer_size, source_complete, status);
355 } 353 }
356 354
357 void ByteStreamReaderImpl::TransferDataInternal( 355 void ByteStreamReaderImpl::TransferDataInternal(
358 scoped_ptr<ContentVector> transfer_buffer, 356 scoped_ptr<ContentVector> transfer_buffer,
359 size_t buffer_size, 357 size_t buffer_size,
360 bool source_complete, 358 bool source_complete,
361 DownloadInterruptReason status) { 359 int status) {
362 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 360 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
363 361
364 bool was_empty = available_contents_.empty(); 362 bool was_empty = available_contents_.empty();
365 363
366 if (transfer_buffer) { 364 if (transfer_buffer) {
367 available_contents_.insert(available_contents_.end(), 365 available_contents_.insert(available_contents_.end(),
368 transfer_buffer->begin(), 366 transfer_buffer->begin(),
369 transfer_buffer->end()); 367 transfer_buffer->end());
370 } 368 }
371 369
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
425 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( 423 ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
426 output_task_runner, output_flag, buffer_size); 424 output_task_runner, output_flag, buffer_size);
427 425
428 in->SetPeer(out, output_task_runner, output_flag); 426 in->SetPeer(out, output_task_runner, output_flag);
429 out->SetPeer(in, input_task_runner, input_flag); 427 out->SetPeer(in, input_task_runner, input_flag);
430 input->reset(in); 428 input->reset(in);
431 output->reset(out); 429 output->reset(out);
432 } 430 }
433 431
434 } // namespace content 432 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698