| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "content/browser/byte_stream.h" | 5 #include "content/browser/byte_stream.h" |
| 6 | 6 |
| 7 #include "base/bind.h" | 7 #include "base/bind.h" |
| 8 #include "base/location.h" | 8 #include "base/location.h" |
| 9 #include "base/memory/ref_counted.h" | 9 #include "base/memory/ref_counted.h" |
| 10 #include "base/memory/weak_ptr.h" | 10 #include "base/memory/weak_ptr.h" |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 48 virtual ~ByteStreamWriterImpl(); | 48 virtual ~ByteStreamWriterImpl(); |
| 49 | 49 |
| 50 // Must be called before any operations are performed. | 50 // Must be called before any operations are performed. |
| 51 void SetPeer(ByteStreamReaderImpl* peer, | 51 void SetPeer(ByteStreamReaderImpl* peer, |
| 52 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | 52 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
| 53 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | 53 scoped_refptr<LifetimeFlag> peer_lifetime_flag); |
| 54 | 54 |
| 55 // Overridden from ByteStreamWriter. | 55 // Overridden from ByteStreamWriter. |
| 56 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, | 56 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, |
| 57 size_t byte_count) OVERRIDE; | 57 size_t byte_count) OVERRIDE; |
| 58 virtual void Close(DownloadInterruptReason status) OVERRIDE; | 58 virtual void Close(int status) OVERRIDE; |
| 59 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; | 59 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; |
| 60 | 60 |
| 61 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. | 61 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. |
| 62 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, | 62 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, |
| 63 ByteStreamWriterImpl* target, | 63 ByteStreamWriterImpl* target, |
| 64 size_t bytes_consumed); | 64 size_t bytes_consumed); |
| 65 | 65 |
| 66 private: | 66 private: |
| 67 // Called from UpdateWindow when object existence has been validated. | 67 // Called from UpdateWindow when object existence has been validated. |
| 68 void UpdateWindowInternal(size_t bytes_consumed); | 68 void UpdateWindowInternal(size_t bytes_consumed); |
| 69 | 69 |
| 70 void PostToPeer(bool complete, DownloadInterruptReason status); | 70 void PostToPeer(bool complete, int status); |
| 71 | 71 |
| 72 const size_t total_buffer_size_; | 72 const size_t total_buffer_size_; |
| 73 | 73 |
| 74 // All data objects in this class are only valid to access on | 74 // All data objects in this class are only valid to access on |
| 75 // this task runner except as otherwise noted. | 75 // this task runner except as otherwise noted. |
| 76 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | 76 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; |
| 77 | 77 |
| 78 // True while this object is alive. | 78 // True while this object is alive. |
| 79 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | 79 scoped_refptr<LifetimeFlag> my_lifetime_flag_; |
| 80 | 80 |
| (...skipping 25 matching lines...) Expand all Loading... |
| 106 virtual ~ByteStreamReaderImpl(); | 106 virtual ~ByteStreamReaderImpl(); |
| 107 | 107 |
| 108 // Must be called before any operations are performed. | 108 // Must be called before any operations are performed. |
| 109 void SetPeer(ByteStreamWriterImpl* peer, | 109 void SetPeer(ByteStreamWriterImpl* peer, |
| 110 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | 110 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
| 111 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | 111 scoped_refptr<LifetimeFlag> peer_lifetime_flag); |
| 112 | 112 |
| 113 // Overridden from ByteStreamReader. | 113 // Overridden from ByteStreamReader. |
| 114 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, | 114 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, |
| 115 size_t* length) OVERRIDE; | 115 size_t* length) OVERRIDE; |
| 116 virtual DownloadInterruptReason GetStatus() const OVERRIDE; | 116 virtual int GetStatus() const OVERRIDE; |
| 117 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; | 117 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; |
| 118 | 118 |
| 119 // PostTask target from |ByteStreamWriterImpl::MaybePostToPeer| and | 119 // PostTask target from |ByteStreamWriterImpl::MaybePostToPeer| and |
| 120 // |ByteStreamWriterImpl::Close|. | 120 // |ByteStreamWriterImpl::Close|. |
| 121 // Receive data from our peer. | 121 // Receive data from our peer. |
| 122 // static because it may be called after the object it is targeting | 122 // static because it may be called after the object it is targeting |
| 123 // has been destroyed. It may not access |*target| | 123 // has been destroyed. It may not access |*target| |
| 124 // if |*object_lifetime_flag| is false. | 124 // if |*object_lifetime_flag| is false. |
| 125 static void TransferData( | 125 static void TransferData( |
| 126 scoped_refptr<LifetimeFlag> object_lifetime_flag, | 126 scoped_refptr<LifetimeFlag> object_lifetime_flag, |
| 127 ByteStreamReaderImpl* target, | 127 ByteStreamReaderImpl* target, |
| 128 scoped_ptr<ContentVector> transfer_buffer, | 128 scoped_ptr<ContentVector> transfer_buffer, |
| 129 size_t transfer_buffer_bytes, | 129 size_t transfer_buffer_bytes, |
| 130 bool source_complete, | 130 bool source_complete, |
| 131 DownloadInterruptReason status); | 131 int status); |
| 132 | 132 |
| 133 private: | 133 private: |
| 134 // Called from TransferData once object existence has been validated. | 134 // Called from TransferData once object existence has been validated. |
| 135 void TransferDataInternal( | 135 void TransferDataInternal( |
| 136 scoped_ptr<ContentVector> transfer_buffer, | 136 scoped_ptr<ContentVector> transfer_buffer, |
| 137 size_t transfer_buffer_bytes, | 137 size_t transfer_buffer_bytes, |
| 138 bool source_complete, | 138 bool source_complete, |
| 139 DownloadInterruptReason status); | 139 int status); |
| 140 | 140 |
| 141 void MaybeUpdateInput(); | 141 void MaybeUpdateInput(); |
| 142 | 142 |
| 143 const size_t total_buffer_size_; | 143 const size_t total_buffer_size_; |
| 144 | 144 |
| 145 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | 145 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; |
| 146 | 146 |
| 147 // True while this object is alive. | 147 // True while this object is alive. |
| 148 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | 148 scoped_refptr<LifetimeFlag> my_lifetime_flag_; |
| 149 | 149 |
| 150 ContentVector available_contents_; | 150 ContentVector available_contents_; |
| 151 | 151 |
| 152 bool received_status_; | 152 bool received_status_; |
| 153 DownloadInterruptReason status_; | 153 int status_; |
| 154 | 154 |
| 155 base::Closure data_available_callback_; | 155 base::Closure data_available_callback_; |
| 156 | 156 |
| 157 // Time of last point at which data in stream transitioned from full | 157 // Time of last point at which data in stream transitioned from full |
| 158 // to non-full. Nulled when a callback is sent. | 158 // to non-full. Nulled when a callback is sent. |
| 159 base::Time last_non_full_time_; | 159 base::Time last_non_full_time_; |
| 160 | 160 |
| 161 // ** Peer information | 161 // ** Peer information |
| 162 | 162 |
| 163 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | 163 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 203 | 203 |
| 204 bool ByteStreamWriterImpl::Write( | 204 bool ByteStreamWriterImpl::Write( |
| 205 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { | 205 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { |
| 206 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 206 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 207 | 207 |
| 208 input_contents_.push_back(std::make_pair(buffer, byte_count)); | 208 input_contents_.push_back(std::make_pair(buffer, byte_count)); |
| 209 input_contents_size_ += byte_count; | 209 input_contents_size_ += byte_count; |
| 210 | 210 |
| 211 // Arbitrarily, we buffer to a third of the total size before sending. | 211 // Arbitrarily, we buffer to a third of the total size before sending. |
| 212 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) | 212 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) |
| 213 PostToPeer(false, DOWNLOAD_INTERRUPT_REASON_NONE); | 213 PostToPeer(false, 0 /* status */); |
| 214 | 214 |
| 215 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); | 215 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); |
| 216 } | 216 } |
| 217 | 217 |
| 218 void ByteStreamWriterImpl::Close( | 218 void ByteStreamWriterImpl::Close(int status) { |
| 219 DownloadInterruptReason status) { | |
| 220 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 219 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 221 PostToPeer(true, status); | 220 PostToPeer(true, status); |
| 222 } | 221 } |
| 223 | 222 |
| 224 void ByteStreamWriterImpl::RegisterCallback( | 223 void ByteStreamWriterImpl::RegisterCallback( |
| 225 const base::Closure& source_callback) { | 224 const base::Closure& source_callback) { |
| 226 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 225 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 227 space_available_callback_ = source_callback; | 226 space_available_callback_ = source_callback; |
| 228 } | 227 } |
| 229 | 228 |
| (...skipping 15 matching lines...) Expand all Loading... |
| 245 // Callback if we were above the limit and we're now <= to it. | 244 // Callback if we were above the limit and we're now <= to it. |
| 246 size_t total_known_size_used = | 245 size_t total_known_size_used = |
| 247 input_contents_size_ + output_size_used_; | 246 input_contents_size_ + output_size_used_; |
| 248 | 247 |
| 249 if (total_known_size_used <= total_buffer_size_ && | 248 if (total_known_size_used <= total_buffer_size_ && |
| 250 (total_known_size_used + bytes_consumed > total_buffer_size_) && | 249 (total_known_size_used + bytes_consumed > total_buffer_size_) && |
| 251 !space_available_callback_.is_null()) | 250 !space_available_callback_.is_null()) |
| 252 space_available_callback_.Run(); | 251 space_available_callback_.Run(); |
| 253 } | 252 } |
| 254 | 253 |
| 255 void ByteStreamWriterImpl::PostToPeer( | 254 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) { |
| 256 bool complete, DownloadInterruptReason status) { | |
| 257 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 255 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 258 // Valid contexts in which to call. | 256 // Valid contexts in which to call. |
| 259 DCHECK(complete || 0 != input_contents_size_); | 257 DCHECK(complete || 0 != input_contents_size_); |
| 260 | 258 |
| 261 scoped_ptr<ContentVector> transfer_buffer(new ContentVector); | 259 scoped_ptr<ContentVector> transfer_buffer(new ContentVector); |
| 262 size_t buffer_size = 0; | 260 size_t buffer_size = 0; |
| 263 if (0 != input_contents_size_) { | 261 if (0 != input_contents_size_) { |
| 264 transfer_buffer.reset(new ContentVector); | 262 transfer_buffer.reset(new ContentVector); |
| 265 transfer_buffer->swap(input_contents_); | 263 transfer_buffer->swap(input_contents_); |
| 266 buffer_size = input_contents_size_; | 264 buffer_size = input_contents_size_; |
| (...skipping 12 matching lines...) Expand all Loading... |
| 279 } | 277 } |
| 280 | 278 |
| 281 ByteStreamReaderImpl::ByteStreamReaderImpl( | 279 ByteStreamReaderImpl::ByteStreamReaderImpl( |
| 282 scoped_refptr<base::SequencedTaskRunner> task_runner, | 280 scoped_refptr<base::SequencedTaskRunner> task_runner, |
| 283 scoped_refptr<LifetimeFlag> lifetime_flag, | 281 scoped_refptr<LifetimeFlag> lifetime_flag, |
| 284 size_t buffer_size) | 282 size_t buffer_size) |
| 285 : total_buffer_size_(buffer_size), | 283 : total_buffer_size_(buffer_size), |
| 286 my_task_runner_(task_runner), | 284 my_task_runner_(task_runner), |
| 287 my_lifetime_flag_(lifetime_flag), | 285 my_lifetime_flag_(lifetime_flag), |
| 288 received_status_(false), | 286 received_status_(false), |
| 289 status_(DOWNLOAD_INTERRUPT_REASON_NONE), | 287 status_(0), |
| 290 unreported_consumed_bytes_(0), | 288 unreported_consumed_bytes_(0), |
| 291 peer_(NULL) { | 289 peer_(NULL) { |
| 292 DCHECK(my_lifetime_flag_.get()); | 290 DCHECK(my_lifetime_flag_.get()); |
| 293 my_lifetime_flag_->is_alive = true; | 291 my_lifetime_flag_->is_alive = true; |
| 294 } | 292 } |
| 295 | 293 |
| 296 ByteStreamReaderImpl::~ByteStreamReaderImpl() { | 294 ByteStreamReaderImpl::~ByteStreamReaderImpl() { |
| 297 my_lifetime_flag_->is_alive = false; | 295 my_lifetime_flag_->is_alive = false; |
| 298 } | 296 } |
| 299 | 297 |
| (...skipping 19 matching lines...) Expand all Loading... |
| 319 | 317 |
| 320 MaybeUpdateInput(); | 318 MaybeUpdateInput(); |
| 321 return STREAM_HAS_DATA; | 319 return STREAM_HAS_DATA; |
| 322 } | 320 } |
| 323 if (received_status_) { | 321 if (received_status_) { |
| 324 return STREAM_COMPLETE; | 322 return STREAM_COMPLETE; |
| 325 } | 323 } |
| 326 return STREAM_EMPTY; | 324 return STREAM_EMPTY; |
| 327 } | 325 } |
| 328 | 326 |
| 329 DownloadInterruptReason ByteStreamReaderImpl::GetStatus() const { | 327 int ByteStreamReaderImpl::GetStatus() const { |
| 330 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 328 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 331 DCHECK(received_status_); | 329 DCHECK(received_status_); |
| 332 return status_; | 330 return status_; |
| 333 } | 331 } |
| 334 | 332 |
| 335 void ByteStreamReaderImpl::RegisterCallback( | 333 void ByteStreamReaderImpl::RegisterCallback( |
| 336 const base::Closure& sink_callback) { | 334 const base::Closure& sink_callback) { |
| 337 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 335 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 338 | 336 |
| 339 data_available_callback_ = sink_callback; | 337 data_available_callback_ = sink_callback; |
| 340 } | 338 } |
| 341 | 339 |
| 342 // static | 340 // static |
| 343 void ByteStreamReaderImpl::TransferData( | 341 void ByteStreamReaderImpl::TransferData( |
| 344 scoped_refptr<LifetimeFlag> object_lifetime_flag, | 342 scoped_refptr<LifetimeFlag> object_lifetime_flag, |
| 345 ByteStreamReaderImpl* target, | 343 ByteStreamReaderImpl* target, |
| 346 scoped_ptr<ContentVector> transfer_buffer, | 344 scoped_ptr<ContentVector> transfer_buffer, |
| 347 size_t buffer_size, | 345 size_t buffer_size, |
| 348 bool source_complete, | 346 bool source_complete, |
| 349 DownloadInterruptReason status) { | 347 int status) { |
| 350 // If our target is no longer alive, do nothing. | 348 // If our target is no longer alive, do nothing. |
| 351 if (!object_lifetime_flag->is_alive) return; | 349 if (!object_lifetime_flag->is_alive) return; |
| 352 | 350 |
| 353 target->TransferDataInternal( | 351 target->TransferDataInternal( |
| 354 transfer_buffer.Pass(), buffer_size, source_complete, status); | 352 transfer_buffer.Pass(), buffer_size, source_complete, status); |
| 355 } | 353 } |
| 356 | 354 |
| 357 void ByteStreamReaderImpl::TransferDataInternal( | 355 void ByteStreamReaderImpl::TransferDataInternal( |
| 358 scoped_ptr<ContentVector> transfer_buffer, | 356 scoped_ptr<ContentVector> transfer_buffer, |
| 359 size_t buffer_size, | 357 size_t buffer_size, |
| 360 bool source_complete, | 358 bool source_complete, |
| 361 DownloadInterruptReason status) { | 359 int status) { |
| 362 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 360 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 363 | 361 |
| 364 bool was_empty = available_contents_.empty(); | 362 bool was_empty = available_contents_.empty(); |
| 365 | 363 |
| 366 if (transfer_buffer) { | 364 if (transfer_buffer) { |
| 367 available_contents_.insert(available_contents_.end(), | 365 available_contents_.insert(available_contents_.end(), |
| 368 transfer_buffer->begin(), | 366 transfer_buffer->begin(), |
| 369 transfer_buffer->end()); | 367 transfer_buffer->end()); |
| 370 } | 368 } |
| 371 | 369 |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 425 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( | 423 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( |
| 426 output_task_runner, output_flag, buffer_size); | 424 output_task_runner, output_flag, buffer_size); |
| 427 | 425 |
| 428 in->SetPeer(out, output_task_runner, output_flag); | 426 in->SetPeer(out, output_task_runner, output_flag); |
| 429 out->SetPeer(in, input_task_runner, input_flag); | 427 out->SetPeer(in, input_task_runner, input_flag); |
| 430 input->reset(in); | 428 input->reset(in); |
| 431 output->reset(out); | 429 output->reset(out); |
| 432 } | 430 } |
| 433 | 431 |
| 434 } // namespace content | 432 } // namespace content |
| OLD | NEW |