Chromium Code Reviews| Index: content/browser/streams/stream.cc |
| diff --git a/content/browser/streams/stream.cc b/content/browser/streams/stream.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..8a7351e95b014c5f37661c89e8e954169b82575b |
| --- /dev/null |
| +++ b/content/browser/streams/stream.cc |
| @@ -0,0 +1,115 @@ |
| +// Copyright (c) 2013 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "content/browser/streams/stream.h" |
| + |
| +#include "base/bind.h" |
| +#include "base/message_loop_proxy.h" |
| +#include "content/browser/streams/stream_read_observer.h" |
| +#include "content/browser/streams/stream_registry.h" |
| +#include "content/browser/streams/stream_write_observer.h" |
| +#include "net/base/io_buffer.h" |
| + |
| +namespace { |
| +// Start throttling the connection at about 1MB |
| +const size_t kDeferSizeThreshold = 40 * 32768; |
| +} |
| + |
| +namespace content { |
| + |
| +Stream::Stream(StreamRegistry* registry, const GURL& security_origin, |
| + const GURL& url) |
| + : bytes_read_(0), |
| + can_add_data_(true), |
| + security_origin_(security_origin), |
| + stream_url_(url), |
| + data_length_(0), |
| + registry_(registry) { |
| + CreateByteStream(base::MessageLoopProxy::current(), |
| + base::MessageLoopProxy::current(), |
| + kDeferSizeThreshold, |
| + &writer_, |
| + &reader_); |
| + |
| + // Setup callback for writing. |
| + writer_->RegisterCallback(base::Bind(&Stream::SpaceAvailable, this)); |
| + reader_->RegisterCallback(base::Bind(&Stream::DataAvailable, this)); |
| + |
| + registry_->RegisterStream(this); |
| +} |
| + |
| +Stream::~Stream() { |
| +} |
| + |
| +void Stream::AddReadObserver(StreamReadObserver* observer) { |
| + read_observers_.AddObserver(observer); |
| +} |
| + |
| +void Stream::RemoveReadObserver(StreamReadObserver* observer) { |
| + read_observers_.RemoveObserver(observer); |
| +} |
| + |
| +void Stream::AddWriteObserver(StreamWriteObserver* observer) { |
| + write_observers_.AddObserver(observer); |
| +} |
| + |
| +void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { |
| + write_observers_.RemoveObserver(observer); |
| +} |
| + |
| +void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { |
| + can_add_data_ = writer_->Write(buffer, size); |
| +} |
| + |
| +void Stream::Finalize() { |
| + writer_->Close(DOWNLOAD_INTERRUPT_REASON_NONE); |
| + writer_.reset(NULL); |
| + |
| + DataAvailable(); |
| +} |
| + |
| +bool Stream::ReadRawData(net::IOBuffer* buf, int buf_size, int* bytes_read) { |
| + if (!data_) { |
| + data_length_ = 0; |
| + bytes_read_ = 0; |
| + ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); |
| + switch (state) { |
| + case ByteStreamReader::STREAM_HAS_DATA: |
| + break; |
| + case ByteStreamReader::STREAM_COMPLETE: |
| + registry_->OnStreamConsumed(this); |
| + return true; |
| + case ByteStreamReader::STREAM_EMPTY: |
| + return false; |
| + } |
| + } |
| + |
| + size_t remaining_bytes = data_length_ - bytes_read_; |
| + size_t to_read = |
| + (size_t)buf_size < remaining_bytes ? buf_size : remaining_bytes; |
|
kinuko
2013/02/26 07:37:55
static_cast<size_t>(buf_size)
Zachary Kuznia
2013/02/26 08:30:03
Done.
|
| + memcpy(buf->data(), data_->data() + bytes_read_, to_read); |
| + bytes_read_ += to_read; |
| + if (bytes_read_ >= data_length_) { |
| + data_ = NULL; |
| + } |
|
kinuko
2013/02/26 07:37:55
nit: no { } necessary for one-line body
Zachary Kuznia
2013/02/26 08:30:03
Done.
|
| + |
| + *bytes_read = to_read; |
| + return true; |
| +} |
| + |
| +void Stream::SpaceAvailable() { |
|
kinuko
2013/02/26 07:37:55
I prefer naming these OnSpaceAvailable and OnDataA
Zachary Kuznia
2013/02/26 08:30:03
Done.
|
| + can_add_data_ = true; |
| + FOR_EACH_OBSERVER(StreamWriteObserver, |
| + write_observers_, |
| + OnSpaceAvailable(this)); |
| +} |
| + |
| +void Stream::DataAvailable() { |
| + FOR_EACH_OBSERVER(StreamReadObserver, |
| + read_observers_, |
| + OnDataAvailable(this)); |
| +} |
| + |
| +} // namespace content |
| + |