Index: content/browser/streams/stream.cc |
diff --git a/content/browser/streams/stream.cc b/content/browser/streams/stream.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..689afecc1f7e055dc6edf06cf330733c566fd982 |
--- /dev/null |
+++ b/content/browser/streams/stream.cc |
@@ -0,0 +1,115 @@ |
+// Copyright (c) 2013 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "content/browser/streams/stream.h" |
+ |
+#include "base/bind.h" |
+#include "base/message_loop_proxy.h" |
+#include "content/browser/streams/stream_read_observer.h" |
+#include "content/browser/streams/stream_registry.h" |
+#include "content/browser/streams/stream_write_observer.h" |
+#include "net/base/io_buffer.h" |
+ |
+namespace { |
+// Start throttling the connection at about 1MB |
+const size_t kDeferSizeThreshold = 40 * 32768; |
+} |
+ |
+namespace content { |
+ |
+Stream::Stream(StreamRegistry* registry, const GURL& security_origin, |
+ const GURL& url) |
+ : bytes_read_(0), |
+ can_add_data_(true), |
+ security_origin_(security_origin), |
+ stream_url_(url), |
+ data_length_(0), |
+ registry_(registry) { |
+ CreateByteStream(base::MessageLoopProxy::current(), |
+ base::MessageLoopProxy::current(), |
+ kDeferSizeThreshold, |
+ &writer_, |
+ &reader_); |
+ |
+ // Setup callback for writing. |
+ writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable, this)); |
+ reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable, this)); |
+ |
+ registry_->RegisterStream(this); |
+} |
+ |
+Stream::~Stream() { |
+} |
+ |
+void Stream::AddReadObserver(StreamReadObserver* observer) { |
+ read_observers_.AddObserver(observer); |
+} |
+ |
+void Stream::RemoveReadObserver(StreamReadObserver* observer) { |
+ read_observers_.RemoveObserver(observer); |
+} |
+ |
+void Stream::AddWriteObserver(StreamWriteObserver* observer) { |
+ write_observers_.AddObserver(observer); |
+} |
+ |
+void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { |
+ write_observers_.RemoveObserver(observer); |
+} |
+ |
+void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { |
+ can_add_data_ = writer_->Write(buffer, size); |
+} |
+ |
+void Stream::Finalize() { |
+ writer_->Close(DOWNLOAD_INTERRUPT_REASON_NONE); |
+ writer_.reset(NULL); |
+ |
+ OnDataAvailable(); |
+} |
+ |
+bool Stream::ReadRawData(net::IOBuffer* buf, int buf_size, int* bytes_read) { |
+ if (!data_) { |
+ data_length_ = 0; |
+ bytes_read_ = 0; |
+ ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); |
+ switch (state) { |
+ case ByteStreamReader::STREAM_HAS_DATA: |
+ break; |
+ case ByteStreamReader::STREAM_COMPLETE: |
+ registry_->OnStreamConsumed(this); |
+ return true; |
+ case ByteStreamReader::STREAM_EMPTY: |
+ return false; |
+ } |
+ } |
+ |
+ size_t remaining_bytes = data_length_ - bytes_read_; |
+ size_t to_read = |
+ static_cast<size_t>(buf_size) < remaining_bytes ? |
+ buf_size : remaining_bytes; |
+ memcpy(buf->data(), data_->data() + bytes_read_, to_read); |
+ bytes_read_ += to_read; |
+ if (bytes_read_ >= data_length_) |
+ data_ = NULL; |
+ |
+ *bytes_read = to_read; |
+ return true; |
+} |
+ |
+void Stream::OnSpaceAvailable() { |
+ can_add_data_ = true; |
+ FOR_EACH_OBSERVER(StreamWriteObserver, |
+ write_observers_, |
+ OnSpaceAvailable(this)); |
+} |
+ |
+void Stream::OnDataAvailable() { |
+ FOR_EACH_OBSERVER(StreamReadObserver, |
+ read_observers_, |
+ OnDataAvailable(this)); |
+} |
+ |
+} // namespace content |
+ |