Index: content/browser/streams/stream.cc |
diff --git a/content/browser/streams/stream.cc b/content/browser/streams/stream.cc |
index 6026df9e7c759629ffcd661ab70c676cd4d1691b..3d0624049d55da4dfae7c2663c4cb8ba60d05738 100644 |
--- a/content/browser/streams/stream.cc |
+++ b/content/browser/streams/stream.cc |
@@ -4,6 +4,8 @@ |
#include "content/browser/streams/stream.h" |
+#include <limits> |
+ |
#include "base/bind.h" |
#include "base/location.h" |
#include "base/message_loop/message_loop_proxy.h" |
@@ -27,6 +29,7 @@ Stream::Stream(StreamRegistry* registry, |
can_add_data_(true), |
url_(url), |
data_length_(0), |
+ last_total_buffered_bytes_(0), |
registry_(registry), |
read_observer_(NULL), |
write_observer_(write_observer), |
@@ -67,19 +70,45 @@ void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { |
write_observer_ = NULL; |
} |
+void Stream::Abort() { |
+ // Clear all buffer. It's safe to clear reader_ here since the same thread |
+ // is used for both input and output operation. |
+ writer_.reset(); |
+ reader_.reset(); |
+ can_add_data_ = false; |
+ registry_->UnregisterStream(url()); |
+} |
+ |
void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { |
+ if (!writer_.get()) |
+ return; |
+ |
+ size_t current_buffered_bytes = writer_->GetTotalBufferedBytes(); |
+ if (size > std::numeric_limits<size_t>::max() - current_buffered_bytes) { |
+ Abort(); |
+ return; |
+ } |
+ size_t new_buffered_bytes = current_buffered_bytes + size; |
+ if (!registry_->UpdateMemoryUsage(url(), new_buffered_bytes)) { |
+ Abort(); |
+ return; |
+ } |
can_add_data_ = writer_->Write(buffer, size); |
+ last_total_buffered_bytes_ = new_buffered_bytes; |
} |
void Stream::AddData(const char* data, size_t size) { |
scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size)); |
memcpy(io_buffer->data(), data, size); |
- can_add_data_ = writer_->Write(io_buffer, size); |
+ AddData(io_buffer, size); |
} |
void Stream::Finalize() { |
+ if (!writer_.get()) |
+ return; |
+ |
writer_->Close(0); |
- writer_.reset(NULL); |
+ writer_.reset(); |
// Continue asynchronously. |
base::MessageLoopProxy::current()->PostTask( |
@@ -95,6 +124,11 @@ Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf, |
*bytes_read = 0; |
if (!data_.get()) { |
+ // TODO(tyoshino): Add STREAM_ABORTED type to tell the reader that this |
+ // stream is aborted. |
+ if (!reader_.get()) |
+ return STREAM_EMPTY; |
+ |
data_length_ = 0; |
data_bytes_read_ = 0; |
ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); |