| Index: content/browser/streams/stream.cc
|
| diff --git a/content/browser/streams/stream.cc b/content/browser/streams/stream.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..0fee2b422c758109f17f76192c3dd4ce3300352d
|
| --- /dev/null
|
| +++ b/content/browser/streams/stream.cc
|
| @@ -0,0 +1,115 @@
|
| +// Copyright (c) 2013 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "content/browser/streams/stream.h"
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/message_loop_proxy.h"
|
| +#include "content/browser/streams/stream_read_observer.h"
|
| +#include "content/browser/streams/stream_registry.h"
|
| +#include "content/browser/streams/stream_write_observer.h"
|
| +#include "net/base/io_buffer.h"
|
| +
|
| +namespace {
|
| +// Start throttling the connection at about 1MB
|
| +const size_t kDeferSizeThreshold = 40 * 32768;
|
| +}
|
| +
|
| +namespace content {
|
| +
|
| +Stream::Stream(StreamRegistry* registry, const GURL& security_origin,
|
| + const GURL& url)
|
| + : bytes_read_(0),
|
| + can_add_data_(true),
|
| + security_origin_(security_origin),
|
| + url_(url),
|
| + data_length_(0),
|
| + registry_(registry) {
|
| + CreateByteStream(base::MessageLoopProxy::current(),
|
| + base::MessageLoopProxy::current(),
|
| + kDeferSizeThreshold,
|
| + &writer_,
|
| + &reader_);
|
| +
|
| + // Setup callback for writing.
|
| + writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable, this));
|
| + reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable, this));
|
| +
|
| + registry_->RegisterStream(this);
|
| +}
|
| +
|
| +Stream::~Stream() {
|
| +}
|
| +
|
| +void Stream::AddReadObserver(StreamReadObserver* observer) {
|
| + read_observers_.AddObserver(observer);
|
| +}
|
| +
|
| +void Stream::RemoveReadObserver(StreamReadObserver* observer) {
|
| + read_observers_.RemoveObserver(observer);
|
| +}
|
| +
|
| +void Stream::AddWriteObserver(StreamWriteObserver* observer) {
|
| + write_observers_.AddObserver(observer);
|
| +}
|
| +
|
| +void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
|
| + write_observers_.RemoveObserver(observer);
|
| +}
|
| +
|
| +void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
|
| + can_add_data_ = writer_->Write(buffer, size);
|
| +}
|
| +
|
| +void Stream::Finalize() {
|
| + writer_->Close(DOWNLOAD_INTERRUPT_REASON_NONE);
|
| + writer_.reset(NULL);
|
| +
|
| + OnDataAvailable();
|
| +}
|
| +
|
| +bool Stream::ReadRawData(net::IOBuffer* buf, int buf_size, int* bytes_read) {
|
| + if (!data_) {
|
| + data_length_ = 0;
|
| + bytes_read_ = 0;
|
| + ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
|
| + switch (state) {
|
| + case ByteStreamReader::STREAM_HAS_DATA:
|
| + break;
|
| + case ByteStreamReader::STREAM_COMPLETE:
|
| + registry_->UnregisterStream(url());
|
| + return true;
|
| + case ByteStreamReader::STREAM_EMPTY:
|
| + return false;
|
| + }
|
| + }
|
| +
|
| + size_t remaining_bytes = data_length_ - bytes_read_;
|
| + size_t to_read =
|
| + static_cast<size_t>(buf_size) < remaining_bytes ?
|
| + buf_size : remaining_bytes;
|
| + memcpy(buf->data(), data_->data() + bytes_read_, to_read);
|
| + bytes_read_ += to_read;
|
| + if (bytes_read_ >= data_length_)
|
| + data_ = NULL;
|
| +
|
| + *bytes_read = to_read;
|
| + return true;
|
| +}
|
| +
|
| +void Stream::OnSpaceAvailable() {
|
| + can_add_data_ = true;
|
| + FOR_EACH_OBSERVER(StreamWriteObserver,
|
| + write_observers_,
|
| + OnSpaceAvailable(this));
|
| +}
|
| +
|
| +void Stream::OnDataAvailable() {
|
| + FOR_EACH_OBSERVER(StreamReadObserver,
|
| + read_observers_,
|
| + OnDataAvailable(this));
|
| +}
|
| +
|
| +} // namespace content
|
| +
|
|
|