Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(106)

Side by Side Diff: content/browser/streams/stream.cc

Issue 22908008: Limit the total memory usage for Stream instances (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Added <limits> Created 7 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/browser/streams/stream.h" 5 #include "content/browser/streams/stream.h"
6 6
7 #include <limits>
8
7 #include "base/bind.h" 9 #include "base/bind.h"
8 #include "base/location.h" 10 #include "base/location.h"
9 #include "base/message_loop/message_loop_proxy.h" 11 #include "base/message_loop/message_loop_proxy.h"
10 #include "content/browser/streams/stream_handle_impl.h" 12 #include "content/browser/streams/stream_handle_impl.h"
11 #include "content/browser/streams/stream_read_observer.h" 13 #include "content/browser/streams/stream_read_observer.h"
12 #include "content/browser/streams/stream_registry.h" 14 #include "content/browser/streams/stream_registry.h"
13 #include "content/browser/streams/stream_write_observer.h" 15 #include "content/browser/streams/stream_write_observer.h"
14 #include "net/base/io_buffer.h" 16 #include "net/base/io_buffer.h"
15 17
16 namespace { 18 namespace {
17 // Start throttling the connection at about 1MB. 19 // Start throttling the connection at about 1MB.
18 const size_t kDeferSizeThreshold = 40 * 32768; 20 const size_t kDeferSizeThreshold = 40 * 32768;
19 } 21 }
20 22
21 namespace content { 23 namespace content {
22 24
23 Stream::Stream(StreamRegistry* registry, 25 Stream::Stream(StreamRegistry* registry,
24 StreamWriteObserver* write_observer, 26 StreamWriteObserver* write_observer,
25 const GURL& url) 27 const GURL& url)
26 : data_bytes_read_(0), 28 : data_bytes_read_(0),
27 can_add_data_(true), 29 can_add_data_(true),
28 url_(url), 30 url_(url),
29 data_length_(0), 31 data_length_(0),
32 last_total_buffered_bytes_(0),
30 registry_(registry), 33 registry_(registry),
31 read_observer_(NULL), 34 read_observer_(NULL),
32 write_observer_(write_observer), 35 write_observer_(write_observer),
33 stream_handle_(NULL), 36 stream_handle_(NULL),
34 weak_ptr_factory_(this) { 37 weak_ptr_factory_(this) {
35 CreateByteStream(base::MessageLoopProxy::current(), 38 CreateByteStream(base::MessageLoopProxy::current(),
36 base::MessageLoopProxy::current(), 39 base::MessageLoopProxy::current(),
37 kDeferSizeThreshold, 40 kDeferSizeThreshold,
38 &writer_, 41 &writer_,
39 &reader_); 42 &reader_);
(...skipping 21 matching lines...) Expand all
61 DCHECK(observer == read_observer_); 64 DCHECK(observer == read_observer_);
62 read_observer_ = NULL; 65 read_observer_ = NULL;
63 } 66 }
64 67
65 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { 68 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
66 DCHECK(observer == write_observer_); 69 DCHECK(observer == write_observer_);
67 write_observer_ = NULL; 70 write_observer_ = NULL;
68 } 71 }
69 72
70 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { 73 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
74 if (!writer_.get())
75 return;
76
77 size_t new_size = writer_->TotalBufferedBytes() + size;
78 if (!registry_->CanIncreaseMemoryUsage(
79 url(), last_total_buffered_bytes_, new_size)) {
80 // Clear all buffer. It's safe to clear reader_ here since the same thread
81 // is used for both input and output operation.
82 writer_.reset();
83 reader_.reset();
kinuko 2013/08/14 09:32:23 Are we going to call Abort() here after your other
tyoshino (SeeGerritForStatus) 2013/08/15 04:15:33 Right!
84 can_add_data_ = false;
85 registry_->UnregisterStream(url());
86 return;
87 }
71 can_add_data_ = writer_->Write(buffer, size); 88 can_add_data_ = writer_->Write(buffer, size);
89 last_total_buffered_bytes_ = new_size;
72 } 90 }
73 91
74 void Stream::AddData(const char* data, size_t size) { 92 void Stream::AddData(const char* data, size_t size) {
75 scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size)); 93 scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
76 memcpy(io_buffer->data(), data, size); 94 memcpy(io_buffer->data(), data, size);
77 can_add_data_ = writer_->Write(io_buffer, size); 95 AddData(io_buffer, size);
78 } 96 }
79 97
80 void Stream::Finalize() { 98 void Stream::Finalize() {
99 if (!writer_.get())
100 return;
101
81 writer_->Close(0); 102 writer_->Close(0);
82 writer_.reset(NULL); 103 writer_.reset();
83 104
84 // Continue asynchronously. 105 // Continue asynchronously.
85 base::MessageLoopProxy::current()->PostTask( 106 base::MessageLoopProxy::current()->PostTask(
86 FROM_HERE, 107 FROM_HERE,
87 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr())); 108 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
88 } 109 }
89 110
90 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf, 111 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
91 int buf_size, 112 int buf_size,
92 int* bytes_read) { 113 int* bytes_read) {
93 *bytes_read = 0; 114 *bytes_read = 0;
94 if (!data_.get()) { 115 if (!data_.get()) {
116 // TODO(tyoshino): Add STREAM_ABORTED type to tell the reader that this
117 // stream is aborted.
118 if (!reader_.get())
119 return STREAM_EMPTY;
120
95 data_length_ = 0; 121 data_length_ = 0;
96 data_bytes_read_ = 0; 122 data_bytes_read_ = 0;
97 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); 123 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
98 switch (state) { 124 switch (state) {
99 case ByteStreamReader::STREAM_HAS_DATA: 125 case ByteStreamReader::STREAM_HAS_DATA:
100 break; 126 break;
101 case ByteStreamReader::STREAM_COMPLETE: 127 case ByteStreamReader::STREAM_COMPLETE:
102 registry_->UnregisterStream(url()); 128 registry_->UnregisterStream(url());
103 return STREAM_COMPLETE; 129 return STREAM_COMPLETE;
104 case ByteStreamReader::STREAM_EMPTY: 130 case ByteStreamReader::STREAM_EMPTY:
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
144 if (write_observer_) 170 if (write_observer_)
145 write_observer_->OnSpaceAvailable(this); 171 write_observer_->OnSpaceAvailable(this);
146 } 172 }
147 173
148 void Stream::OnDataAvailable() { 174 void Stream::OnDataAvailable() {
149 if (read_observer_) 175 if (read_observer_)
150 read_observer_->OnDataAvailable(this); 176 read_observer_->OnDataAvailable(this);
151 } 177 }
152 178
153 } // namespace content 179 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698