Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(479)

Unified Diff: content/browser/streams/stream.cc

Issue 12335087: Implement the Stream registry in content (Closed) Base URL: http://git.chromium.org/chromium/src.git@master
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: content/browser/streams/stream.cc
diff --git a/content/browser/streams/stream.cc b/content/browser/streams/stream.cc
new file mode 100644
index 0000000000000000000000000000000000000000..8a7351e95b014c5f37661c89e8e954169b82575b
--- /dev/null
+++ b/content/browser/streams/stream.cc
@@ -0,0 +1,115 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/streams/stream.h"
+
+#include "base/bind.h"
+#include "base/message_loop_proxy.h"
+#include "content/browser/streams/stream_read_observer.h"
+#include "content/browser/streams/stream_registry.h"
+#include "content/browser/streams/stream_write_observer.h"
+#include "net/base/io_buffer.h"
+
+namespace {
+// Start throttling the connection at about 1MB
+const size_t kDeferSizeThreshold = 40 * 32768;
+}
+
+namespace content {
+
+Stream::Stream(StreamRegistry* registry, const GURL& security_origin,
+ const GURL& url)
+ : bytes_read_(0),
+ can_add_data_(true),
+ security_origin_(security_origin),
+ stream_url_(url),
+ data_length_(0),
+ registry_(registry) {
+ CreateByteStream(base::MessageLoopProxy::current(),
+ base::MessageLoopProxy::current(),
+ kDeferSizeThreshold,
+ &writer_,
+ &reader_);
+
+ // Setup callback for writing.
+ writer_->RegisterCallback(base::Bind(&Stream::SpaceAvailable, this));
+ reader_->RegisterCallback(base::Bind(&Stream::DataAvailable, this));
+
+ registry_->RegisterStream(this);
+}
+
+Stream::~Stream() {
+}
+
+void Stream::AddReadObserver(StreamReadObserver* observer) {
+ read_observers_.AddObserver(observer);
+}
+
+void Stream::RemoveReadObserver(StreamReadObserver* observer) {
+ read_observers_.RemoveObserver(observer);
+}
+
+void Stream::AddWriteObserver(StreamWriteObserver* observer) {
+ write_observers_.AddObserver(observer);
+}
+
+void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
+ write_observers_.RemoveObserver(observer);
+}
+
+void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
+ can_add_data_ = writer_->Write(buffer, size);
+}
+
+void Stream::Finalize() {
+ writer_->Close(DOWNLOAD_INTERRUPT_REASON_NONE);
+ writer_.reset(NULL);
+
+ DataAvailable();
+}
+
+bool Stream::ReadRawData(net::IOBuffer* buf, int buf_size, int* bytes_read) {
+ if (!data_) {
+ data_length_ = 0;
+ bytes_read_ = 0;
+ ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
+ switch (state) {
+ case ByteStreamReader::STREAM_HAS_DATA:
+ break;
+ case ByteStreamReader::STREAM_COMPLETE:
+ registry_->OnStreamConsumed(this);
+ return true;
+ case ByteStreamReader::STREAM_EMPTY:
+ return false;
+ }
+ }
+
+ size_t remaining_bytes = data_length_ - bytes_read_;
+ size_t to_read =
+ (size_t)buf_size < remaining_bytes ? buf_size : remaining_bytes;
kinuko 2013/02/26 07:37:55 static_cast<size_t>(buf_size)
Zachary Kuznia 2013/02/26 08:30:03 Done.
+ memcpy(buf->data(), data_->data() + bytes_read_, to_read);
+ bytes_read_ += to_read;
+ if (bytes_read_ >= data_length_) {
+ data_ = NULL;
+ }
kinuko 2013/02/26 07:37:55 nit: no { } necessary for one-line body
Zachary Kuznia 2013/02/26 08:30:03 Done.
+
+ *bytes_read = to_read;
+ return true;
+}
+
+void Stream::SpaceAvailable() {
kinuko 2013/02/26 07:37:55 I prefer naming these OnSpaceAvailable and OnDataA
Zachary Kuznia 2013/02/26 08:30:03 Done.
+ can_add_data_ = true;
+ FOR_EACH_OBSERVER(StreamWriteObserver,
+ write_observers_,
+ OnSpaceAvailable(this));
+}
+
+void Stream::DataAvailable() {
+ FOR_EACH_OBSERVER(StreamReadObserver,
+ read_observers_,
+ OnDataAvailable(this));
+}
+
+} // namespace content
+

Powered by Google App Engine
This is Rietveld 408576698