| Index: content/browser/streams/stream.cc
|
| diff --git a/content/browser/streams/stream.cc b/content/browser/streams/stream.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..ce88534f20d251191d74ba4be27a9864cfacdbfd
|
| --- /dev/null
|
| +++ b/content/browser/streams/stream.cc
|
| @@ -0,0 +1,124 @@
|
| +// Copyright (c) 2013 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "content/browser/streams/stream.h"
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/message_loop_proxy.h"
|
| +#include "content/browser/streams/stream_read_observer.h"
|
| +#include "content/browser/streams/stream_registry.h"
|
| +#include "content/browser/streams/stream_write_observer.h"
|
| +#include "net/base/io_buffer.h"
|
| +
|
| +namespace {
|
| +// Start throttling the connection at about 1MB
|
| +const size_t kDeferSizeThreshold = 2 * 32768;
|
| +}
|
| +
|
| +namespace content {
|
| +
|
| +Stream::Stream(StreamRegistry* registry, const GURL& security_origin)
|
| + : bytes_read_(0),
|
| + complete_(false),
|
| + can_add_data_(true),
|
| + security_origin_(security_origin),
|
| + data_length_(0),
|
| + registry_(registry) {
|
| + CreateByteStream(base::MessageLoopProxy::current(),
|
| + base::MessageLoopProxy::current(),
|
| + kDeferSizeThreshold,
|
| + &writer_,
|
| + &reader_);
|
| +
|
| + // Setup callback for writing.
|
| + writer_->RegisterCallback(base::Bind(&Stream::SpaceAvailable, this));
|
| + reader_->RegisterCallback(base::Bind(&Stream::DataAvailable, this));
|
| +
|
| + registry_->RegisterStream(this);
|
| +}
|
| +
|
| +Stream::~Stream() {
|
| +}
|
| +
|
| +void Stream::AddReadObserver(StreamReadObserver* observer) {
|
| + read_observers_.AddObserver(observer);
|
| +}
|
| +
|
| +void Stream::RemoveReadObserver(StreamReadObserver* observer) {
|
| + read_observers_.RemoveObserver(observer);
|
| +}
|
| +
|
| +void Stream::AddWriteObserver(StreamWriteObserver* observer) {
|
| + write_observers_.AddObserver(observer);
|
| +}
|
| +
|
| +void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
|
| + write_observers_.RemoveObserver(observer);
|
| +}
|
| +
|
| +void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
|
| + can_add_data_ = writer_->Write(buffer, size);
|
| +}
|
| +
|
| +void Stream::MarkComplete() {
|
| + complete_ = true;
|
| + writer_->Close(DOWNLOAD_INTERRUPT_REASON_NONE);
|
| + writer_.reset(NULL);
|
| +
|
| + MaybeNotifyComplete();
|
| +}
|
| +
|
| +bool Stream::ReadRawData(net::IOBuffer* buf, int buf_size, int* bytes_read) {
|
| + if (!data_) {
|
| + data_length_ = 0;
|
| + bytes_read_ = 0;
|
| + ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
|
| + switch (state) {
|
| + case ByteStreamReader::STREAM_HAS_DATA:
|
| + break;
|
| + case ByteStreamReader::STREAM_COMPLETE:
|
| + return true;
|
| + case ByteStreamReader::STREAM_EMPTY:
|
| + return false;
|
| + }
|
| + }
|
| +
|
| + size_t remaining_bytes = data_length_ - bytes_read_;
|
| + size_t to_read =
|
| + (size_t)buf_size < remaining_bytes ? buf_size : remaining_bytes;
|
| + memcpy(buf->data(), data_->data() + bytes_read_, to_read);
|
| + bytes_read_ += to_read;
|
| + if (bytes_read_ >= data_length_) {
|
| + data_ = NULL;
|
| + }
|
| +
|
| + *bytes_read = to_read;
|
| + MaybeNotifyComplete();
|
| + return true;
|
| +}
|
| +
|
| +void Stream::MaybeNotifyComplete() {
|
| + if (complete_ && !data_) {
|
| + FOR_EACH_OBSERVER(StreamReadObserver,
|
| + read_observers_,
|
| + OnStreamConsumed(this));
|
| + registry_->OnStreamConsumed(this);
|
| + }
|
| +}
|
| +
|
| +void Stream::SpaceAvailable() {
|
| + can_add_data_ = true;
|
| + FOR_EACH_OBSERVER(StreamWriteObserver,
|
| + write_observers_,
|
| + OnSpaceAvailable(this));
|
| +}
|
| +
|
| +void Stream::DataAvailable() {
|
| + FOR_EACH_OBSERVER(StreamReadObserver,
|
| + read_observers_,
|
| + OnDataAvailable(this));
|
| +}
|
| +
|
| +} // namespace content
|
| +
|
|
|