Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(129)

Side by Side Diff: content/browser/streams/stream.cc

Issue 22908008: Limit the total memory usage for Stream instances (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: kinuko's comment and rebase Created 7 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/browser/streams/stream.h" 5 #include "content/browser/streams/stream.h"
6 6
7 #include <limits>
8
7 #include "base/bind.h" 9 #include "base/bind.h"
8 #include "base/location.h" 10 #include "base/location.h"
9 #include "base/message_loop/message_loop_proxy.h" 11 #include "base/message_loop/message_loop_proxy.h"
10 #include "content/browser/streams/stream_handle_impl.h" 12 #include "content/browser/streams/stream_handle_impl.h"
11 #include "content/browser/streams/stream_read_observer.h" 13 #include "content/browser/streams/stream_read_observer.h"
12 #include "content/browser/streams/stream_registry.h" 14 #include "content/browser/streams/stream_registry.h"
13 #include "content/browser/streams/stream_write_observer.h" 15 #include "content/browser/streams/stream_write_observer.h"
14 #include "net/base/io_buffer.h" 16 #include "net/base/io_buffer.h"
15 17
16 namespace { 18 namespace {
17 // Start throttling the connection at about 1MB. 19 // Start throttling the connection at about 1MB.
18 const size_t kDeferSizeThreshold = 40 * 32768; 20 const size_t kDeferSizeThreshold = 40 * 32768;
19 } 21 }
20 22
21 namespace content { 23 namespace content {
22 24
23 Stream::Stream(StreamRegistry* registry, 25 Stream::Stream(StreamRegistry* registry,
24 StreamWriteObserver* write_observer, 26 StreamWriteObserver* write_observer,
25 const GURL& url) 27 const GURL& url)
26 : data_bytes_read_(0), 28 : data_bytes_read_(0),
27 can_add_data_(true), 29 can_add_data_(true),
28 url_(url), 30 url_(url),
29 data_length_(0), 31 data_length_(0),
32 last_total_buffered_bytes_(0),
30 registry_(registry), 33 registry_(registry),
31 read_observer_(NULL), 34 read_observer_(NULL),
32 write_observer_(write_observer), 35 write_observer_(write_observer),
33 stream_handle_(NULL), 36 stream_handle_(NULL),
34 weak_ptr_factory_(this) { 37 weak_ptr_factory_(this) {
35 CreateByteStream(base::MessageLoopProxy::current(), 38 CreateByteStream(base::MessageLoopProxy::current(),
36 base::MessageLoopProxy::current(), 39 base::MessageLoopProxy::current(),
37 kDeferSizeThreshold, 40 kDeferSizeThreshold,
38 &writer_, 41 &writer_,
39 &reader_); 42 &reader_);
(...skipping 20 matching lines...) Expand all
60 void Stream::RemoveReadObserver(StreamReadObserver* observer) { 63 void Stream::RemoveReadObserver(StreamReadObserver* observer) {
61 DCHECK(observer == read_observer_); 64 DCHECK(observer == read_observer_);
62 read_observer_ = NULL; 65 read_observer_ = NULL;
63 } 66 }
64 67
65 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { 68 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
66 DCHECK(observer == write_observer_); 69 DCHECK(observer == write_observer_);
67 write_observer_ = NULL; 70 write_observer_ = NULL;
68 } 71 }
69 72
73 void Stream::Abort() {
74 // Clear all buffer. It's safe to clear reader_ here since the same thread
75 // is used for both input and output operation.
76 writer_.reset();
77 reader_.reset();
78 can_add_data_ = false;
79 registry_->UnregisterStream(url());
80 }
81
70 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { 82 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
83 if (!writer_.get())
84 return;
85
86 size_t current_buffered_bytes = writer_->GetTotalBufferedBytes();
87 if (size > std::numeric_limits<size_t>::max() - current_buffered_bytes) {
88 Abort();
89 return;
90 }
91 size_t new_buffered_bytes = current_buffered_bytes + size;
92 if (!registry_->UpdateMemoryUsage(url(), new_buffered_bytes)) {
93 Abort();
94 return;
95 }
71 can_add_data_ = writer_->Write(buffer, size); 96 can_add_data_ = writer_->Write(buffer, size);
97 last_total_buffered_bytes_ = new_buffered_bytes;
72 } 98 }
73 99
74 void Stream::AddData(const char* data, size_t size) { 100 void Stream::AddData(const char* data, size_t size) {
75 scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size)); 101 scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
76 memcpy(io_buffer->data(), data, size); 102 memcpy(io_buffer->data(), data, size);
77 can_add_data_ = writer_->Write(io_buffer, size); 103 AddData(io_buffer, size);
78 } 104 }
79 105
80 void Stream::Finalize() { 106 void Stream::Finalize() {
107 if (!writer_.get())
108 return;
109
81 writer_->Close(0); 110 writer_->Close(0);
82 writer_.reset(NULL); 111 writer_.reset();
83 112
84 // Continue asynchronously. 113 // Continue asynchronously.
85 base::MessageLoopProxy::current()->PostTask( 114 base::MessageLoopProxy::current()->PostTask(
86 FROM_HERE, 115 FROM_HERE,
87 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr())); 116 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
88 } 117 }
89 118
90 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf, 119 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
91 int buf_size, 120 int buf_size,
92 int* bytes_read) { 121 int* bytes_read) {
93 DCHECK(buf); 122 DCHECK(buf);
94 DCHECK(bytes_read); 123 DCHECK(bytes_read);
95 124
96 *bytes_read = 0; 125 *bytes_read = 0;
97 if (!data_.get()) { 126 if (!data_.get()) {
127 // TODO(tyoshino): Add STREAM_ABORTED type to tell the reader that this
128 // stream is aborted.
129 if (!reader_.get())
130 return STREAM_EMPTY;
131
98 data_length_ = 0; 132 data_length_ = 0;
99 data_bytes_read_ = 0; 133 data_bytes_read_ = 0;
100 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); 134 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
101 switch (state) { 135 switch (state) {
102 case ByteStreamReader::STREAM_HAS_DATA: 136 case ByteStreamReader::STREAM_HAS_DATA:
103 break; 137 break;
104 case ByteStreamReader::STREAM_COMPLETE: 138 case ByteStreamReader::STREAM_COMPLETE:
105 registry_->UnregisterStream(url()); 139 registry_->UnregisterStream(url());
106 return STREAM_COMPLETE; 140 return STREAM_COMPLETE;
107 case ByteStreamReader::STREAM_EMPTY: 141 case ByteStreamReader::STREAM_EMPTY:
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
147 if (write_observer_) 181 if (write_observer_)
148 write_observer_->OnSpaceAvailable(this); 182 write_observer_->OnSpaceAvailable(this);
149 } 183 }
150 184
151 void Stream::OnDataAvailable() { 185 void Stream::OnDataAvailable() {
152 if (read_observer_) 186 if (read_observer_)
153 read_observer_->OnDataAvailable(this); 187 read_observer_->OnDataAvailable(this);
154 } 188 }
155 189
156 } // namespace content 190 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698