Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(47)

Side by Side Diff: content/browser/streams/stream.cc

Issue 22908008: Limit the total memory usage for Stream instances (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: creis's comments Created 7 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « content/browser/streams/stream.h ('k') | content/browser/streams/stream_registry.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/browser/streams/stream.h" 5 #include "content/browser/streams/stream.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h" 8 #include "base/location.h"
9 #include "base/message_loop/message_loop_proxy.h" 9 #include "base/message_loop/message_loop_proxy.h"
10 #include "content/browser/streams/stream_handle_impl.h" 10 #include "content/browser/streams/stream_handle_impl.h"
11 #include "content/browser/streams/stream_read_observer.h" 11 #include "content/browser/streams/stream_read_observer.h"
12 #include "content/browser/streams/stream_registry.h" 12 #include "content/browser/streams/stream_registry.h"
13 #include "content/browser/streams/stream_write_observer.h" 13 #include "content/browser/streams/stream_write_observer.h"
14 #include "net/base/io_buffer.h" 14 #include "net/base/io_buffer.h"
15 15
16 namespace { 16 namespace {
17 // Start throttling the connection at about 1MB. 17 // Start throttling the connection at about 1MB.
18 const size_t kDeferSizeThreshold = 40 * 32768; 18 const size_t kDeferSizeThreshold = 40 * 32768;
19 } 19 }
20 20
21 namespace content { 21 namespace content {
22 22
23 Stream::Stream(StreamRegistry* registry, 23 Stream::Stream(StreamRegistry* registry,
24 StreamWriteObserver* write_observer, 24 StreamWriteObserver* write_observer,
25 const GURL& url) 25 const GURL& url)
26 : data_bytes_read_(0), 26 : data_bytes_read_(0),
27 can_add_data_(true), 27 can_add_data_(true),
28 url_(url), 28 url_(url),
29 data_length_(0), 29 data_length_(0),
30 last_total_buffered_bytes_(0),
30 registry_(registry), 31 registry_(registry),
31 read_observer_(NULL), 32 read_observer_(NULL),
32 write_observer_(write_observer), 33 write_observer_(write_observer),
33 stream_handle_(NULL), 34 stream_handle_(NULL),
34 weak_ptr_factory_(this) { 35 weak_ptr_factory_(this) {
35 CreateByteStream(base::MessageLoopProxy::current(), 36 CreateByteStream(base::MessageLoopProxy::current(),
36 base::MessageLoopProxy::current(), 37 base::MessageLoopProxy::current(),
37 kDeferSizeThreshold, 38 kDeferSizeThreshold,
38 &writer_, 39 &writer_,
39 &reader_); 40 &reader_);
(...skipping 20 matching lines...) Expand all
60 void Stream::RemoveReadObserver(StreamReadObserver* observer) { 61 void Stream::RemoveReadObserver(StreamReadObserver* observer) {
61 DCHECK(observer == read_observer_); 62 DCHECK(observer == read_observer_);
62 read_observer_ = NULL; 63 read_observer_ = NULL;
63 } 64 }
64 65
65 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) { 66 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
66 DCHECK(observer == write_observer_); 67 DCHECK(observer == write_observer_);
67 write_observer_ = NULL; 68 write_observer_ = NULL;
68 } 69 }
69 70
71 void Stream::Abort() {
72 // Clear all buffer. It's safe to clear reader_ here since the same thread
73 // is used for both input and output operation.
74 writer_.reset();
75 reader_.reset();
76 can_add_data_ = false;
77 registry_->UnregisterStream(url());
78 }
79
70 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { 80 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
81 if (!writer_.get())
82 return;
83
84 size_t current_buffered_bytes = writer_->GetTotalBufferedBytes();
85 if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) {
86 Abort();
87 return;
88 }
89
90 // Now it's guaranteed that this doesn't overflow. This must be done before
91 // Write() since GetTotalBufferedBytes() may return different value after
92 // Write() call, so if we use the new value, information in this instance and
93 // one in |registry_| become inconsistent.
94 last_total_buffered_bytes_ = current_buffered_bytes + size;
95
71 can_add_data_ = writer_->Write(buffer, size); 96 can_add_data_ = writer_->Write(buffer, size);
72 } 97 }
73 98
74 void Stream::AddData(const char* data, size_t size) { 99 void Stream::AddData(const char* data, size_t size) {
75 scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size)); 100 scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
76 memcpy(io_buffer->data(), data, size); 101 memcpy(io_buffer->data(), data, size);
77 can_add_data_ = writer_->Write(io_buffer, size); 102 AddData(io_buffer, size);
78 } 103 }
79 104
80 void Stream::Finalize() { 105 void Stream::Finalize() {
106 if (!writer_.get())
107 return;
108
81 writer_->Close(0); 109 writer_->Close(0);
82 writer_.reset(NULL); 110 writer_.reset();
83 111
84 // Continue asynchronously. 112 // Continue asynchronously.
85 base::MessageLoopProxy::current()->PostTask( 113 base::MessageLoopProxy::current()->PostTask(
86 FROM_HERE, 114 FROM_HERE,
87 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr())); 115 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
88 } 116 }
89 117
90 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf, 118 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
91 int buf_size, 119 int buf_size,
92 int* bytes_read) { 120 int* bytes_read) {
93 DCHECK(buf); 121 DCHECK(buf);
94 DCHECK(bytes_read); 122 DCHECK(bytes_read);
95 123
96 *bytes_read = 0; 124 *bytes_read = 0;
97 if (!data_.get()) { 125 if (!data_.get()) {
126 // TODO(tyoshino): Add STREAM_ABORTED type to tell the reader that this
127 // stream is aborted.
128 if (!reader_.get())
129 return STREAM_EMPTY;
130
98 data_length_ = 0; 131 data_length_ = 0;
99 data_bytes_read_ = 0; 132 data_bytes_read_ = 0;
100 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); 133 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
101 switch (state) { 134 switch (state) {
102 case ByteStreamReader::STREAM_HAS_DATA: 135 case ByteStreamReader::STREAM_HAS_DATA:
103 break; 136 break;
104 case ByteStreamReader::STREAM_COMPLETE: 137 case ByteStreamReader::STREAM_COMPLETE:
105 registry_->UnregisterStream(url()); 138 registry_->UnregisterStream(url());
106 return STREAM_COMPLETE; 139 return STREAM_COMPLETE;
107 case ByteStreamReader::STREAM_EMPTY: 140 case ByteStreamReader::STREAM_EMPTY:
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
147 if (write_observer_) 180 if (write_observer_)
148 write_observer_->OnSpaceAvailable(this); 181 write_observer_->OnSpaceAvailable(this);
149 } 182 }
150 183
151 void Stream::OnDataAvailable() { 184 void Stream::OnDataAvailable() {
152 if (read_observer_) 185 if (read_observer_)
153 read_observer_->OnDataAvailable(this); 186 read_observer_->OnDataAvailable(this);
154 } 187 }
155 188
156 } // namespace content 189 } // namespace content
OLDNEW
« no previous file with comments | « content/browser/streams/stream.h ('k') | content/browser/streams/stream_registry.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698