Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "content/browser/download/byte_stream.h" | |
| 6 | |
| 7 #include "base/bind.h" | |
| 8 #include "base/location.h" | |
| 9 #include "base/memory/weak_ptr.h" | |
| 10 #include "base/memory/ref_counted.h" | |
| 11 #include "base/sequenced_task_runner.h" | |
| 12 | |
| 13 namespace { | |
| 14 | |
| 15 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>,size_t> > | |
|
benjhayden
2012/05/16 14:26:25
Space after comma.
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
| 16 ContentVector; | |
| 17 | |
| 18 class ByteStreamOutputImpl; | |
| 19 | |
| 20 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be | |
| 21 // cleared in an object destructor and accessed to check for object | |
| 22 // existence. We can't use weak pointers because they're tightly tied to | |
| 23 // threads rather than task runners. | |
|
benjhayden
2012/05/16 14:26:25
TODO: TaskRunnerWeakPointer?
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
| 24 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> { | |
| 25 public: | |
| 26 LifetimeFlag() : is_alive_(true) { } | |
| 27 bool is_alive_; | |
|
benjhayden
2012/05/16 14:26:25
blank line between sections?
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
| 28 protected: | |
| 29 friend class base::RefCountedThreadSafe<LifetimeFlag>; | |
| 30 virtual ~LifetimeFlag() { } | |
| 31 }; | |
|
benjhayden
2012/05/16 14:26:25
DISALLOW_COPY_AND_ASSIGN?
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
| 32 | |
| 33 // For both ByteStreamInputImpl and ByteStreamOutputImpl, Construction and | |
| 34 // SetPeer may happen anywhere; all other operations on each class must | |
| 35 // happen in the context of their SequencedTaskRunner. | |
| 36 class ByteStreamInputImpl : public content::ByteStreamInput { | |
| 37 public: | |
| 38 ByteStreamInputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, | |
| 39 scoped_refptr<LifetimeFlag> lifetime_flag, | |
| 40 size_t buffer_size); | |
| 41 virtual ~ByteStreamInputImpl(); | |
| 42 | |
| 43 // Must be called before any operations are performed. | |
| 44 void SetPeer(ByteStreamOutputImpl* peer, | |
| 45 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
| 46 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | |
| 47 | |
| 48 // Overridden from ByteStreamInput. | |
| 49 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, | |
| 50 size_t byte_count) OVERRIDE; | |
| 51 virtual void Close(content::DownloadInterruptReason status) OVERRIDE; | |
| 52 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; | |
| 53 | |
| 54 // PostTask target from |ByteStreamOutputImpl::MaybeUpdateInput|. | |
| 55 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, | |
| 56 ByteStreamInputImpl* target, | |
| 57 size_t bytes_consumed); | |
| 58 | |
| 59 private: | |
| 60 void MaybePostToPeer(); | |
| 61 | |
| 62 const size_t total_buffer_size_; | |
| 63 | |
| 64 // All data objects in this class are only valid to access on | |
| 65 // this task runner except as otherwise noted. | |
| 66 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | |
| 67 | |
| 68 // True while this object is alive. | |
| 69 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | |
| 70 | |
| 71 base::Closure space_available_callback_; | |
| 72 ContentVector input_contents_; | |
| 73 size_t input_contents_size_; | |
| 74 | |
| 75 // ** Peer information. | |
| 76 | |
| 77 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | |
| 78 | |
| 79 // How much we've sent to the output that for flow control purposes we | |
| 80 // must assume hasn't been read yet. | |
| 81 size_t output_size_used_; | |
| 82 | |
| 83 // Only valid to access on peer_task_runner_. | |
| 84 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; | |
| 85 | |
| 86 // Only valid to access on peer_task_runner_ if | |
| 87 // |*peer_lifetime_flag_ == true| | |
| 88 ByteStreamOutputImpl* peer_; | |
| 89 }; | |
| 90 | |
| 91 class ByteStreamOutputImpl : public content::ByteStreamOutput { | |
| 92 public: | |
| 93 ByteStreamOutputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, | |
| 94 scoped_refptr<LifetimeFlag> lifetime_flag, | |
| 95 size_t buffer_size); | |
| 96 virtual ~ByteStreamOutputImpl(); | |
| 97 | |
| 98 // Must be called before any operations are performed. | |
| 99 void SetPeer(ByteStreamInputImpl* peer, | |
| 100 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
| 101 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | |
| 102 | |
| 103 // Overridden from ByteStreamOutput. | |
| 104 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, | |
| 105 size_t* length) OVERRIDE; | |
| 106 virtual content::DownloadInterruptReason GetStatus() const OVERRIDE; | |
| 107 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; | |
| 108 | |
| 109 // PostTask target from |ByteStreamInputImpl::MaybePostToPeer| and | |
| 110 // |ByteStreamInputImpl::Close|. | |
| 111 // Receive data from our peer. | |
| 112 // static because it may be called after the object it is targeting | |
| 113 // has been destroyed. It may not access |*target| | |
| 114 // if |*object_lifetime_flag| is false. | |
| 115 static void TransferData( | |
| 116 scoped_refptr<LifetimeFlag> object_lifetime_flag, | |
| 117 ByteStreamOutputImpl* target, | |
| 118 scoped_ptr<ContentVector> xfer_buffer, | |
| 119 size_t xfer_buffer_bytes, | |
| 120 bool source_complete, | |
| 121 content::DownloadInterruptReason status); | |
| 122 | |
| 123 private: | |
| 124 void MaybeUpdateInput(); | |
| 125 | |
| 126 const size_t total_buffer_size_; | |
| 127 | |
| 128 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | |
| 129 | |
| 130 // True while this object is alive. | |
| 131 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | |
| 132 | |
| 133 ContentVector available_contents_; | |
| 134 size_t available_contents_size_; | |
| 135 | |
| 136 bool received_status_; | |
| 137 content::DownloadInterruptReason status_; | |
| 138 | |
| 139 base::Closure data_available_callback_; | |
| 140 | |
| 141 // Time of last point at which data in stream transitioned from full | |
| 142 // to non-full. Nulled when a callback is sent. | |
| 143 base::Time last_non_full_time_; | |
| 144 | |
| 145 // ** Peer information | |
| 146 | |
| 147 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | |
| 148 | |
| 149 // How much has been removed from this class that we haven't told | |
| 150 // the input about yet. | |
| 151 size_t unreported_consumed_bytes_; | |
| 152 | |
| 153 // Only valid to access on peer_task_runner_. | |
| 154 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; | |
| 155 | |
| 156 // Only valid to access on peer_task_runner_ if | |
| 157 // |*peer_lifetime_flag_ == true| | |
| 158 ByteStreamInputImpl* peer_; | |
| 159 }; | |
| 160 | |
| 161 ByteStreamInputImpl::ByteStreamInputImpl( | |
| 162 scoped_refptr<base::SequencedTaskRunner> task_runner, | |
| 163 scoped_refptr<LifetimeFlag> lifetime_flag, | |
| 164 size_t buffer_size) | |
| 165 : total_buffer_size_(buffer_size), | |
| 166 my_task_runner_(task_runner), | |
| 167 my_lifetime_flag_(lifetime_flag), | |
| 168 input_contents_size_(0), | |
| 169 output_size_used_(0), | |
| 170 peer_(NULL) { | |
| 171 DCHECK(my_lifetime_flag_.get()); | |
| 172 my_lifetime_flag_->is_alive_ = true; | |
| 173 } | |
| 174 | |
| 175 ByteStreamInputImpl::~ByteStreamInputImpl() { | |
| 176 my_lifetime_flag_->is_alive_ = false; | |
| 177 } | |
| 178 | |
| 179 void ByteStreamInputImpl::SetPeer( | |
| 180 ByteStreamOutputImpl* peer, | |
| 181 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
| 182 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { | |
| 183 peer_ = peer; | |
| 184 peer_task_runner_ = peer_task_runner; | |
| 185 peer_lifetime_flag_ = peer_lifetime_flag; | |
| 186 } | |
| 187 | |
| 188 bool ByteStreamInputImpl::Write( | |
| 189 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { | |
| 190 input_contents_.push_back(std::make_pair(buffer, byte_count)); | |
| 191 input_contents_size_ += byte_count; | |
| 192 | |
| 193 MaybePostToPeer(); | |
| 194 | |
| 195 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); | |
| 196 } | |
| 197 | |
| 198 void ByteStreamInputImpl::Close( | |
|
benjhayden
2012/05/16 19:17:21
This seems to duplicate some of MaybePostToPeer().
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
| 199 content::DownloadInterruptReason status) { | |
| 200 // Make sure to flush everything we have with the source notification. | |
| 201 scoped_ptr<ContentVector> xfer_buffer; | |
| 202 size_t buffer_size = 0; | |
| 203 if (0 != input_contents_size_) { | |
| 204 xfer_buffer.reset(new ContentVector); | |
| 205 xfer_buffer->swap(input_contents_); | |
| 206 buffer_size = input_contents_size_; | |
| 207 output_size_used_ += input_contents_size_; | |
| 208 input_contents_size_ = 0; | |
| 209 } | |
| 210 peer_task_runner_->PostTask( | |
| 211 FROM_HERE, base::Bind( | |
| 212 &ByteStreamOutputImpl::TransferData, | |
| 213 peer_lifetime_flag_, | |
| 214 peer_, | |
| 215 base::Passed(xfer_buffer.Pass()), | |
| 216 buffer_size, | |
| 217 true /* Source complete. */, | |
| 218 status)); | |
| 219 } | |
| 220 | |
| 221 void ByteStreamInputImpl::RegisterCallback( | |
| 222 const base::Closure& source_callback) { | |
| 223 space_available_callback_ = source_callback; | |
| 224 } | |
| 225 | |
| 226 // static | |
| 227 void ByteStreamInputImpl::UpdateWindow( | |
| 228 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamInputImpl* target, | |
| 229 size_t bytes_consumed) { | |
| 230 | |
|
benjhayden
2012/05/16 15:41:13
No blank lines at beginning/ends of blocks.
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
| 231 // If the target object isn't alive anymore, we do nothing. | |
| 232 if (!lifetime_flag->is_alive_) return; | |
| 233 | |
| 234 DCHECK_GE(target->output_size_used_, bytes_consumed); | |
|
benjhayden
2012/05/16 15:41:13
Want to put the rest of this function in a method
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
| 235 target->output_size_used_ -= bytes_consumed; | |
| 236 | |
| 237 // Callback if we were above the limit and we're now <= to it. | |
| 238 size_t total_known_size_used = | |
| 239 target->input_contents_size_ + target->output_size_used_; | |
| 240 | |
| 241 if (total_known_size_used <= target->total_buffer_size_ && | |
| 242 (total_known_size_used + bytes_consumed > target->total_buffer_size_) && | |
| 243 !target->space_available_callback_.is_null()) | |
| 244 target->space_available_callback_.Run(); | |
| 245 } | |
| 246 | |
| 247 // Decide whether or not we've bufferred enough for a transfer. | |
| 248 // For right now "enough" will be "anything". | |
| 249 void ByteStreamInputImpl::MaybePostToPeer() { | |
| 250 // Arbitrarily, we buffer to a third of the total size before sending. | |
|
benjhayden
2012/05/16 15:41:13
Does this contradict L248?
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Yep; oops. Fixed.
| |
| 251 if (input_contents_size_ > total_buffer_size_ / 3) { | |
|
benjhayden
2012/05/16 15:41:13
Invert and early-return?
benjhayden
2012/05/16 15:41:13
Use a kConstant instead of 3?
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Obsolete because of refactor done for another comm
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done. I'm a bit hesitant because if I change this
| |
| 252 scoped_ptr<ContentVector> xfer_buffer(new ContentVector); | |
| 253 xfer_buffer->swap(input_contents_); | |
| 254 size_t buffer_size = input_contents_size_; | |
| 255 output_size_used_ += input_contents_size_; | |
| 256 input_contents_size_ = 0; | |
| 257 | |
| 258 peer_task_runner_->PostTask( | |
| 259 FROM_HERE, base::Bind( | |
| 260 &ByteStreamOutputImpl::TransferData, | |
| 261 peer_lifetime_flag_, | |
| 262 peer_, | |
| 263 base::Passed(xfer_buffer.Pass()), | |
| 264 buffer_size, | |
| 265 false /* Source not complete. */, | |
| 266 content::DOWNLOAD_INTERRUPT_REASON_NONE)); | |
| 267 } | |
| 268 } | |
| 269 | |
| 270 ByteStreamOutputImpl::ByteStreamOutputImpl( | |
| 271 scoped_refptr<base::SequencedTaskRunner> task_runner, | |
| 272 scoped_refptr<LifetimeFlag> lifetime_flag, | |
| 273 size_t buffer_size) | |
| 274 : total_buffer_size_(buffer_size), | |
| 275 my_task_runner_(task_runner), | |
| 276 my_lifetime_flag_(lifetime_flag), | |
| 277 available_contents_size_(0), | |
| 278 received_status_(false), | |
| 279 status_(content::DOWNLOAD_INTERRUPT_REASON_NONE), | |
| 280 unreported_consumed_bytes_(0), | |
| 281 peer_(NULL) { | |
| 282 DCHECK(my_lifetime_flag_.get()); | |
| 283 my_lifetime_flag_->is_alive_ = true; | |
| 284 } | |
| 285 | |
| 286 ByteStreamOutputImpl::~ByteStreamOutputImpl() { | |
| 287 my_lifetime_flag_->is_alive_ = false; | |
| 288 } | |
| 289 | |
| 290 void ByteStreamOutputImpl::SetPeer( | |
| 291 ByteStreamInputImpl* peer, | |
| 292 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
| 293 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { | |
| 294 peer_ = peer; | |
| 295 peer_task_runner_ = peer_task_runner; | |
| 296 peer_lifetime_flag_ = peer_lifetime_flag; | |
| 297 } | |
| 298 | |
| 299 ByteStreamOutputImpl::StreamState | |
| 300 ByteStreamOutputImpl::Read(scoped_refptr<net::IOBuffer>* data, | |
| 301 size_t* length) { | |
| 302 if (available_contents_.size()) { | |
| 303 *data = available_contents_.front().first; | |
| 304 *length = available_contents_.front().second; | |
| 305 available_contents_.pop_front(); | |
| 306 DCHECK_GE(available_contents_size_, *length); | |
| 307 available_contents_size_ -= *length; | |
| 308 unreported_consumed_bytes_ += *length; | |
| 309 | |
| 310 MaybeUpdateInput(); | |
| 311 return STREAM_HAS_DATA; | |
| 312 } | |
| 313 if (received_status_) { | |
| 314 return STREAM_COMPLETE; | |
| 315 } | |
| 316 return STREAM_EMPTY; | |
| 317 } | |
| 318 | |
| 319 content::DownloadInterruptReason | |
| 320 ByteStreamOutputImpl::GetStatus() const { | |
| 321 DCHECK(received_status_); | |
| 322 return status_; | |
| 323 } | |
| 324 | |
| 325 void ByteStreamOutputImpl::RegisterCallback( | |
| 326 const base::Closure& sink_callback) { | |
| 327 data_available_callback_ = sink_callback; | |
| 328 } | |
| 329 | |
| 330 // static | |
| 331 void ByteStreamOutputImpl::TransferData( | |
| 332 scoped_refptr<LifetimeFlag> object_lifetime_flag, | |
| 333 ByteStreamOutputImpl* target, | |
| 334 scoped_ptr<ContentVector> xfer_buffer, | |
| 335 size_t buffer_size, | |
| 336 bool source_complete, | |
| 337 content::DownloadInterruptReason status) { | |
| 338 // If our target is no longer alive, do nothing. | |
| 339 if (!object_lifetime_flag->is_alive_) return; | |
| 340 | |
| 341 if (xfer_buffer.get()) { | |
| 342 target->available_contents_.insert(target->available_contents_.end(), | |
| 343 xfer_buffer->begin(), | |
| 344 xfer_buffer->end()); | |
| 345 target->available_contents_size_ += buffer_size; | |
| 346 } | |
| 347 | |
| 348 if (source_complete) { | |
| 349 target->received_status_ = true; | |
| 350 target->status_ = status; | |
| 351 } | |
| 352 | |
| 353 // Callback if we didn't use to have data and we do now. | |
| 354 if (target->available_contents_size_ == buffer_size && | |
|
benjhayden
2012/05/16 15:41:13
Looks like you can get rid of available_contents_s
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
How do you figure? I agree available_contents_siz
| |
| 355 target->available_contents_size_ > 0 && | |
| 356 !target->data_available_callback_.is_null()) | |
| 357 target->data_available_callback_.Run(); | |
| 358 } | |
| 359 | |
| 360 // Decide whether or not to send the input a window update. | |
| 361 // Currently we do that whenever we've got unreported consumption | |
| 362 // greater than 1/3 of total size. | |
| 363 void ByteStreamOutputImpl::MaybeUpdateInput() { | |
| 364 if (unreported_consumed_bytes_ > total_buffer_size_ / 3) { | |
|
benjhayden
2012/05/16 19:17:21
Invert and early-return?
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
Done.
| |
| 365 peer_task_runner_->PostTask( | |
| 366 FROM_HERE, base::Bind( | |
| 367 &ByteStreamInputImpl::UpdateWindow, | |
| 368 peer_lifetime_flag_, | |
| 369 peer_, | |
| 370 unreported_consumed_bytes_)); | |
| 371 unreported_consumed_bytes_ = 0; | |
| 372 } | |
| 373 } | |
| 374 | |
| 375 } // namespace | |
| 376 | |
| 377 namespace content { | |
| 378 | |
| 379 ByteStreamOutput::~ByteStreamOutput() { } | |
| 380 | |
| 381 ByteStreamInput::~ByteStreamInput() { } | |
| 382 | |
| 383 void CreateByteStream( | |
| 384 scoped_ptr<ByteStreamInput>* input, | |
| 385 scoped_ptr<ByteStreamOutput>* output, | |
| 386 scoped_refptr<base::SequencedTaskRunner> input_task_runner, | |
| 387 scoped_refptr<base::SequencedTaskRunner> output_task_runner, | |
| 388 size_t buffer_size) { | |
| 389 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag()); | |
| 390 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag()); | |
| 391 | |
| 392 ByteStreamInputImpl* in = new ByteStreamInputImpl( | |
| 393 input_task_runner, input_flag, buffer_size); | |
| 394 ByteStreamOutputImpl* out = new ByteStreamOutputImpl( | |
| 395 output_task_runner, output_flag, buffer_size); | |
| 396 | |
| 397 in->SetPeer(out, output_task_runner, output_flag); | |
| 398 out->SetPeer(in, input_task_runner, input_flag); | |
| 399 input->reset(in); | |
| 400 output->reset(out); | |
| 401 return; | |
|
benjhayden
2012/05/16 15:41:13
Feeling philosophical?
Randy Smith (Not in Mondays)
2012/05/16 20:30:15
You know, it took me a good fifteen seconds to fig
| |
| 402 } | |
| 403 | |
| 404 } // namespace content | |
| OLD | NEW |