| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "content/browser/byte_stream.h" | |
| 6 | |
| 7 #include "base/bind.h" | |
| 8 #include "base/location.h" | |
| 9 #include "base/memory/ref_counted.h" | |
| 10 #include "base/memory/weak_ptr.h" | |
| 11 #include "base/sequenced_task_runner.h" | |
| 12 | |
| 13 namespace content { | |
| 14 namespace { | |
| 15 | |
| 16 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> > | |
| 17 ContentVector; | |
| 18 | |
| 19 class ByteStreamReaderImpl; | |
| 20 | |
| 21 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be | |
| 22 // cleared in an object destructor and accessed to check for object | |
| 23 // existence. We can't use weak pointers because they're tightly tied to | |
| 24 // threads rather than task runners. | |
| 25 // TODO(rdsmith): A better solution would be extending weak pointers | |
| 26 // to support SequencedTaskRunners. | |
| 27 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> { | |
| 28 public: | |
| 29 LifetimeFlag() : is_alive(true) { } | |
| 30 bool is_alive; | |
| 31 | |
| 32 protected: | |
| 33 friend class base::RefCountedThreadSafe<LifetimeFlag>; | |
| 34 virtual ~LifetimeFlag() { } | |
| 35 | |
| 36 private: | |
| 37 DISALLOW_COPY_AND_ASSIGN(LifetimeFlag); | |
| 38 }; | |
| 39 | |
| 40 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and | |
| 41 // SetPeer may happen anywhere; all other operations on each class must | |
| 42 // happen in the context of their SequencedTaskRunner. | |
| 43 class ByteStreamWriterImpl : public ByteStreamWriter { | |
| 44 public: | |
| 45 ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, | |
| 46 scoped_refptr<LifetimeFlag> lifetime_flag, | |
| 47 size_t buffer_size); | |
| 48 virtual ~ByteStreamWriterImpl(); | |
| 49 | |
| 50 // Must be called before any operations are performed. | |
| 51 void SetPeer(ByteStreamReaderImpl* peer, | |
| 52 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
| 53 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | |
| 54 | |
| 55 // Overridden from ByteStreamWriter. | |
| 56 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, | |
| 57 size_t byte_count) OVERRIDE; | |
| 58 virtual void Close(DownloadInterruptReason status) OVERRIDE; | |
| 59 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; | |
| 60 | |
| 61 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. | |
| 62 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, | |
| 63 ByteStreamWriterImpl* target, | |
| 64 size_t bytes_consumed); | |
| 65 | |
| 66 private: | |
| 67 // Called from UpdateWindow when object existence has been validated. | |
| 68 void UpdateWindowInternal(size_t bytes_consumed); | |
| 69 | |
| 70 void PostToPeer(bool complete, DownloadInterruptReason status); | |
| 71 | |
| 72 const size_t total_buffer_size_; | |
| 73 | |
| 74 // All data objects in this class are only valid to access on | |
| 75 // this task runner except as otherwise noted. | |
| 76 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | |
| 77 | |
| 78 // True while this object is alive. | |
| 79 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | |
| 80 | |
| 81 base::Closure space_available_callback_; | |
| 82 ContentVector input_contents_; | |
| 83 size_t input_contents_size_; | |
| 84 | |
| 85 // ** Peer information. | |
| 86 | |
| 87 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | |
| 88 | |
| 89 // How much we've sent to the output that for flow control purposes we | |
| 90 // must assume hasn't been read yet. | |
| 91 size_t output_size_used_; | |
| 92 | |
| 93 // Only valid to access on peer_task_runner_. | |
| 94 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; | |
| 95 | |
| 96 // Only valid to access on peer_task_runner_ if | |
| 97 // |*peer_lifetime_flag_ == true| | |
| 98 ByteStreamReaderImpl* peer_; | |
| 99 }; | |
| 100 | |
| 101 class ByteStreamReaderImpl : public ByteStreamReader { | |
| 102 public: | |
| 103 ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, | |
| 104 scoped_refptr<LifetimeFlag> lifetime_flag, | |
| 105 size_t buffer_size); | |
| 106 virtual ~ByteStreamReaderImpl(); | |
| 107 | |
| 108 // Must be called before any operations are performed. | |
| 109 void SetPeer(ByteStreamWriterImpl* peer, | |
| 110 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
| 111 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | |
| 112 | |
| 113 // Overridden from ByteStreamReader. | |
| 114 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, | |
| 115 size_t* length) OVERRIDE; | |
| 116 virtual DownloadInterruptReason GetStatus() const OVERRIDE; | |
| 117 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; | |
| 118 | |
| 119 // PostTask target from |ByteStreamWriterImpl::MaybePostToPeer| and | |
| 120 // |ByteStreamWriterImpl::Close|. | |
| 121 // Receive data from our peer. | |
| 122 // static because it may be called after the object it is targeting | |
| 123 // has been destroyed. It may not access |*target| | |
| 124 // if |*object_lifetime_flag| is false. | |
| 125 static void TransferData( | |
| 126 scoped_refptr<LifetimeFlag> object_lifetime_flag, | |
| 127 ByteStreamReaderImpl* target, | |
| 128 scoped_ptr<ContentVector> transfer_buffer, | |
| 129 size_t transfer_buffer_bytes, | |
| 130 bool source_complete, | |
| 131 DownloadInterruptReason status); | |
| 132 | |
| 133 private: | |
| 134 // Called from TransferData once object existence has been validated. | |
| 135 void TransferDataInternal( | |
| 136 scoped_ptr<ContentVector> transfer_buffer, | |
| 137 size_t transfer_buffer_bytes, | |
| 138 bool source_complete, | |
| 139 DownloadInterruptReason status); | |
| 140 | |
| 141 void MaybeUpdateInput(); | |
| 142 | |
| 143 const size_t total_buffer_size_; | |
| 144 | |
| 145 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | |
| 146 | |
| 147 // True while this object is alive. | |
| 148 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | |
| 149 | |
| 150 ContentVector available_contents_; | |
| 151 | |
| 152 bool received_status_; | |
| 153 DownloadInterruptReason status_; | |
| 154 | |
| 155 base::Closure data_available_callback_; | |
| 156 | |
| 157 // Time of last point at which data in stream transitioned from full | |
| 158 // to non-full. Nulled when a callback is sent. | |
| 159 base::Time last_non_full_time_; | |
| 160 | |
| 161 // ** Peer information | |
| 162 | |
| 163 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | |
| 164 | |
| 165 // How much has been removed from this class that we haven't told | |
| 166 // the input about yet. | |
| 167 size_t unreported_consumed_bytes_; | |
| 168 | |
| 169 // Only valid to access on peer_task_runner_. | |
| 170 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; | |
| 171 | |
| 172 // Only valid to access on peer_task_runner_ if | |
| 173 // |*peer_lifetime_flag_ == true| | |
| 174 ByteStreamWriterImpl* peer_; | |
| 175 }; | |
| 176 | |
| 177 ByteStreamWriterImpl::ByteStreamWriterImpl( | |
| 178 scoped_refptr<base::SequencedTaskRunner> task_runner, | |
| 179 scoped_refptr<LifetimeFlag> lifetime_flag, | |
| 180 size_t buffer_size) | |
| 181 : total_buffer_size_(buffer_size), | |
| 182 my_task_runner_(task_runner), | |
| 183 my_lifetime_flag_(lifetime_flag), | |
| 184 input_contents_size_(0), | |
| 185 output_size_used_(0), | |
| 186 peer_(NULL) { | |
| 187 DCHECK(my_lifetime_flag_.get()); | |
| 188 my_lifetime_flag_->is_alive = true; | |
| 189 } | |
| 190 | |
| 191 ByteStreamWriterImpl::~ByteStreamWriterImpl() { | |
| 192 my_lifetime_flag_->is_alive = false; | |
| 193 } | |
| 194 | |
| 195 void ByteStreamWriterImpl::SetPeer( | |
| 196 ByteStreamReaderImpl* peer, | |
| 197 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
| 198 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { | |
| 199 peer_ = peer; | |
| 200 peer_task_runner_ = peer_task_runner; | |
| 201 peer_lifetime_flag_ = peer_lifetime_flag; | |
| 202 } | |
| 203 | |
| 204 bool ByteStreamWriterImpl::Write( | |
| 205 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { | |
| 206 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 207 | |
| 208 input_contents_.push_back(std::make_pair(buffer, byte_count)); | |
| 209 input_contents_size_ += byte_count; | |
| 210 | |
| 211 // Arbitrarily, we buffer to a third of the total size before sending. | |
| 212 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) | |
| 213 PostToPeer(false, DOWNLOAD_INTERRUPT_REASON_NONE); | |
| 214 | |
| 215 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); | |
| 216 } | |
| 217 | |
| 218 void ByteStreamWriterImpl::Close( | |
| 219 DownloadInterruptReason status) { | |
| 220 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 221 PostToPeer(true, status); | |
| 222 } | |
| 223 | |
| 224 void ByteStreamWriterImpl::RegisterCallback( | |
| 225 const base::Closure& source_callback) { | |
| 226 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 227 space_available_callback_ = source_callback; | |
| 228 } | |
| 229 | |
| 230 // static | |
| 231 void ByteStreamWriterImpl::UpdateWindow( | |
| 232 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target, | |
| 233 size_t bytes_consumed) { | |
| 234 // If the target object isn't alive anymore, we do nothing. | |
| 235 if (!lifetime_flag->is_alive) return; | |
| 236 | |
| 237 target->UpdateWindowInternal(bytes_consumed); | |
| 238 } | |
| 239 | |
| 240 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) { | |
| 241 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 242 DCHECK_GE(output_size_used_, bytes_consumed); | |
| 243 output_size_used_ -= bytes_consumed; | |
| 244 | |
| 245 // Callback if we were above the limit and we're now <= to it. | |
| 246 size_t total_known_size_used = | |
| 247 input_contents_size_ + output_size_used_; | |
| 248 | |
| 249 if (total_known_size_used <= total_buffer_size_ && | |
| 250 (total_known_size_used + bytes_consumed > total_buffer_size_) && | |
| 251 !space_available_callback_.is_null()) | |
| 252 space_available_callback_.Run(); | |
| 253 } | |
| 254 | |
| 255 void ByteStreamWriterImpl::PostToPeer( | |
| 256 bool complete, DownloadInterruptReason status) { | |
| 257 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 258 // Valid contexts in which to call. | |
| 259 DCHECK(complete || 0 != input_contents_size_); | |
| 260 | |
| 261 scoped_ptr<ContentVector> transfer_buffer(new ContentVector); | |
| 262 size_t buffer_size = 0; | |
| 263 if (0 != input_contents_size_) { | |
| 264 transfer_buffer.reset(new ContentVector); | |
| 265 transfer_buffer->swap(input_contents_); | |
| 266 buffer_size = input_contents_size_; | |
| 267 output_size_used_ += input_contents_size_; | |
| 268 input_contents_size_ = 0; | |
| 269 } | |
| 270 peer_task_runner_->PostTask( | |
| 271 FROM_HERE, base::Bind( | |
| 272 &ByteStreamReaderImpl::TransferData, | |
| 273 peer_lifetime_flag_, | |
| 274 peer_, | |
| 275 base::Passed(&transfer_buffer), | |
| 276 buffer_size, | |
| 277 complete, | |
| 278 status)); | |
| 279 } | |
| 280 | |
| 281 ByteStreamReaderImpl::ByteStreamReaderImpl( | |
| 282 scoped_refptr<base::SequencedTaskRunner> task_runner, | |
| 283 scoped_refptr<LifetimeFlag> lifetime_flag, | |
| 284 size_t buffer_size) | |
| 285 : total_buffer_size_(buffer_size), | |
| 286 my_task_runner_(task_runner), | |
| 287 my_lifetime_flag_(lifetime_flag), | |
| 288 received_status_(false), | |
| 289 status_(DOWNLOAD_INTERRUPT_REASON_NONE), | |
| 290 unreported_consumed_bytes_(0), | |
| 291 peer_(NULL) { | |
| 292 DCHECK(my_lifetime_flag_.get()); | |
| 293 my_lifetime_flag_->is_alive = true; | |
| 294 } | |
| 295 | |
| 296 ByteStreamReaderImpl::~ByteStreamReaderImpl() { | |
| 297 my_lifetime_flag_->is_alive = false; | |
| 298 } | |
| 299 | |
| 300 void ByteStreamReaderImpl::SetPeer( | |
| 301 ByteStreamWriterImpl* peer, | |
| 302 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
| 303 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { | |
| 304 peer_ = peer; | |
| 305 peer_task_runner_ = peer_task_runner; | |
| 306 peer_lifetime_flag_ = peer_lifetime_flag; | |
| 307 } | |
| 308 | |
| 309 ByteStreamReaderImpl::StreamState | |
| 310 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data, | |
| 311 size_t* length) { | |
| 312 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 313 | |
| 314 if (available_contents_.size()) { | |
| 315 *data = available_contents_.front().first; | |
| 316 *length = available_contents_.front().second; | |
| 317 available_contents_.pop_front(); | |
| 318 unreported_consumed_bytes_ += *length; | |
| 319 | |
| 320 MaybeUpdateInput(); | |
| 321 return STREAM_HAS_DATA; | |
| 322 } | |
| 323 if (received_status_) { | |
| 324 return STREAM_COMPLETE; | |
| 325 } | |
| 326 return STREAM_EMPTY; | |
| 327 } | |
| 328 | |
| 329 DownloadInterruptReason ByteStreamReaderImpl::GetStatus() const { | |
| 330 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 331 DCHECK(received_status_); | |
| 332 return status_; | |
| 333 } | |
| 334 | |
| 335 void ByteStreamReaderImpl::RegisterCallback( | |
| 336 const base::Closure& sink_callback) { | |
| 337 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 338 | |
| 339 data_available_callback_ = sink_callback; | |
| 340 } | |
| 341 | |
| 342 // static | |
| 343 void ByteStreamReaderImpl::TransferData( | |
| 344 scoped_refptr<LifetimeFlag> object_lifetime_flag, | |
| 345 ByteStreamReaderImpl* target, | |
| 346 scoped_ptr<ContentVector> transfer_buffer, | |
| 347 size_t buffer_size, | |
| 348 bool source_complete, | |
| 349 DownloadInterruptReason status) { | |
| 350 // If our target is no longer alive, do nothing. | |
| 351 if (!object_lifetime_flag->is_alive) return; | |
| 352 | |
| 353 target->TransferDataInternal( | |
| 354 transfer_buffer.Pass(), buffer_size, source_complete, status); | |
| 355 } | |
| 356 | |
| 357 void ByteStreamReaderImpl::TransferDataInternal( | |
| 358 scoped_ptr<ContentVector> transfer_buffer, | |
| 359 size_t buffer_size, | |
| 360 bool source_complete, | |
| 361 DownloadInterruptReason status) { | |
| 362 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 363 | |
| 364 bool was_empty = available_contents_.empty(); | |
| 365 | |
| 366 if (transfer_buffer) { | |
| 367 available_contents_.insert(available_contents_.end(), | |
| 368 transfer_buffer->begin(), | |
| 369 transfer_buffer->end()); | |
| 370 } | |
| 371 | |
| 372 if (source_complete) { | |
| 373 received_status_ = true; | |
| 374 status_ = status; | |
| 375 } | |
| 376 | |
| 377 // Callback on transition from empty to non-empty, or | |
| 378 // source complete. | |
| 379 if (((was_empty && !available_contents_.empty()) || | |
| 380 source_complete) && | |
| 381 !data_available_callback_.is_null()) | |
| 382 data_available_callback_.Run(); | |
| 383 } | |
| 384 | |
| 385 // Decide whether or not to send the input a window update. | |
| 386 // Currently we do that whenever we've got unreported consumption | |
| 387 // greater than 1/3 of total size. | |
| 388 void ByteStreamReaderImpl::MaybeUpdateInput() { | |
| 389 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
| 390 | |
| 391 if (unreported_consumed_bytes_ <= | |
| 392 total_buffer_size_ / kFractionReadBeforeWindowUpdate) | |
| 393 return; | |
| 394 | |
| 395 peer_task_runner_->PostTask( | |
| 396 FROM_HERE, base::Bind( | |
| 397 &ByteStreamWriterImpl::UpdateWindow, | |
| 398 peer_lifetime_flag_, | |
| 399 peer_, | |
| 400 unreported_consumed_bytes_)); | |
| 401 unreported_consumed_bytes_ = 0; | |
| 402 } | |
| 403 | |
| 404 } // namespace | |
| 405 | |
| 406 | |
| 407 const int ByteStreamWriter::kFractionBufferBeforeSending = 3; | |
| 408 const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3; | |
| 409 | |
| 410 ByteStreamReader::~ByteStreamReader() { } | |
| 411 | |
| 412 ByteStreamWriter::~ByteStreamWriter() { } | |
| 413 | |
| 414 void CreateByteStream( | |
| 415 scoped_refptr<base::SequencedTaskRunner> input_task_runner, | |
| 416 scoped_refptr<base::SequencedTaskRunner> output_task_runner, | |
| 417 size_t buffer_size, | |
| 418 scoped_ptr<ByteStreamWriter>* input, | |
| 419 scoped_ptr<ByteStreamReader>* output) { | |
| 420 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag()); | |
| 421 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag()); | |
| 422 | |
| 423 ByteStreamWriterImpl* in = new ByteStreamWriterImpl( | |
| 424 input_task_runner, input_flag, buffer_size); | |
| 425 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( | |
| 426 output_task_runner, output_flag, buffer_size); | |
| 427 | |
| 428 in->SetPeer(out, output_task_runner, output_flag); | |
| 429 out->SetPeer(in, input_task_runner, input_flag); | |
| 430 input->reset(in); | |
| 431 output->reset(out); | |
| 432 } | |
| 433 | |
| 434 } // namespace content | |
| OLD | NEW |