Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1118)

Side by Side Diff: net/socket_stream/socket_stream.cc

Issue 342052: Implement websocket throttling. (Closed)
Patch Set: Fix tyoshino's comment Created 11 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « net/socket_stream/socket_stream.h ('k') | net/socket_stream/socket_stream_throttle.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 // 4 //
5 // TODO(ukai): code is similar with http_network_transaction.cc. We should 5 // TODO(ukai): code is similar with http_network_transaction.cc. We should
6 // think about ways to share code, if possible. 6 // think about ways to share code, if possible.
7 7
8 #include "net/socket_stream/socket_stream.h" 8 #include "net/socket_stream/socket_stream.h"
9 9
10 #include <string> 10 #include <string>
11 11
12 #include "base/compiler_specific.h" 12 #include "base/compiler_specific.h"
13 #include "base/logging.h" 13 #include "base/logging.h"
14 #include "base/message_loop.h" 14 #include "base/message_loop.h"
15 #include "base/string_util.h" 15 #include "base/string_util.h"
16 #include "net/base/auth.h" 16 #include "net/base/auth.h"
17 #include "net/base/host_resolver.h" 17 #include "net/base/host_resolver.h"
18 #include "net/base/io_buffer.h" 18 #include "net/base/io_buffer.h"
19 #include "net/base/net_errors.h" 19 #include "net/base/net_errors.h"
20 #include "net/base/net_util.h" 20 #include "net/base/net_util.h"
21 #include "net/http/http_response_headers.h" 21 #include "net/http/http_response_headers.h"
22 #include "net/http/http_util.h" 22 #include "net/http/http_util.h"
23 #include "net/socket/client_socket_factory.h" 23 #include "net/socket/client_socket_factory.h"
24 #include "net/socket/ssl_client_socket.h" 24 #include "net/socket/ssl_client_socket.h"
25 #include "net/socket/socks5_client_socket.h" 25 #include "net/socket/socks5_client_socket.h"
26 #include "net/socket/socks_client_socket.h" 26 #include "net/socket/socks_client_socket.h"
27 #include "net/socket/tcp_client_socket.h" 27 #include "net/socket/tcp_client_socket.h"
28 #include "net/socket_stream/socket_stream_throttle.h"
28 #include "net/url_request/url_request.h" 29 #include "net/url_request/url_request.h"
29 30
30 static const int kMaxPendingSendAllowed = 32768; // 32 kilobytes. 31 static const int kMaxPendingSendAllowed = 32768; // 32 kilobytes.
31 static const int kReadBufferSize = 4096; 32 static const int kReadBufferSize = 4096;
32 33
33 namespace net { 34 namespace net {
34 35
35 void SocketStream::ResponseHeaders::Realloc(size_t new_size) { 36 void SocketStream::ResponseHeaders::Realloc(size_t new_size) {
36 headers_.reset(static_cast<char*>(realloc(headers_.release(), new_size))); 37 headers_.reset(static_cast<char*>(realloc(headers_.release(), new_size)));
37 } 38 }
(...skipping 10 matching lines...) Expand all
48 ALLOW_THIS_IN_INITIALIZER_LIST( 49 ALLOW_THIS_IN_INITIALIZER_LIST(
49 io_callback_(this, &SocketStream::OnIOCompleted)), 50 io_callback_(this, &SocketStream::OnIOCompleted)),
50 ALLOW_THIS_IN_INITIALIZER_LIST( 51 ALLOW_THIS_IN_INITIALIZER_LIST(
51 read_callback_(this, &SocketStream::OnReadCompleted)), 52 read_callback_(this, &SocketStream::OnReadCompleted)),
52 ALLOW_THIS_IN_INITIALIZER_LIST( 53 ALLOW_THIS_IN_INITIALIZER_LIST(
53 write_callback_(this, &SocketStream::OnWriteCompleted)), 54 write_callback_(this, &SocketStream::OnWriteCompleted)),
54 read_buf_(NULL), 55 read_buf_(NULL),
55 write_buf_(NULL), 56 write_buf_(NULL),
56 current_write_buf_(NULL), 57 current_write_buf_(NULL),
57 write_buf_offset_(0), 58 write_buf_offset_(0),
58 write_buf_size_(0) { 59 write_buf_size_(0),
60 throttle_(
61 SocketStreamThrottle::GetSocketStreamThrottleForScheme(
62 url.scheme())) {
59 DCHECK(MessageLoop::current()) << 63 DCHECK(MessageLoop::current()) <<
60 "The current MessageLoop must exist"; 64 "The current MessageLoop must exist";
61 DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) << 65 DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) <<
62 "The current MessageLoop must be TYPE_IO"; 66 "The current MessageLoop must be TYPE_IO";
63 DCHECK(delegate_); 67 DCHECK(delegate_);
68 DCHECK(throttle_);
64 } 69 }
65 70
66 SocketStream::~SocketStream() { 71 SocketStream::~SocketStream() {
67 DCHECK(!delegate_); 72 DCHECK(!delegate_);
68 } 73 }
69 74
70 SocketStream::UserData* SocketStream::GetUserData( 75 SocketStream::UserData* SocketStream::GetUserData(
71 const void* key) const { 76 const void* key) const {
72 UserDataMap::const_iterator found = user_data_.find(key); 77 UserDataMap::const_iterator found = user_data_.find(key);
73 if (found != user_data_.end()) 78 if (found != user_data_.end())
(...skipping 118 matching lines...) Expand 10 before | Expand all | Expand 10 after
192 DCHECK_EQ(next_state_, STATE_NONE); 197 DCHECK_EQ(next_state_, STATE_NONE);
193 DLOG(INFO) << "Finish"; 198 DLOG(INFO) << "Finish";
194 if (delegate_) 199 if (delegate_)
195 delegate_->OnError(this, result); 200 delegate_->OnError(this, result);
196 201
197 Delegate* delegate = delegate_; 202 Delegate* delegate = delegate_;
198 delegate_ = NULL; 203 delegate_ = NULL;
199 if (delegate) { 204 if (delegate) {
200 delegate->OnClose(this); 205 delegate->OnClose(this);
201 } 206 }
207 throttle_->OnClose(this);
202 Release(); 208 Release();
203 } 209 }
204 210
205 void SocketStream::SetHostResolver(HostResolver* host_resolver) { 211 void SocketStream::SetHostResolver(HostResolver* host_resolver) {
206 DCHECK(host_resolver); 212 DCHECK(host_resolver);
207 host_resolver_ = host_resolver; 213 host_resolver_ = host_resolver;
208 } 214 }
209 215
210 void SocketStream::SetClientSocketFactory( 216 void SocketStream::SetClientSocketFactory(
211 ClientSocketFactory* factory) { 217 ClientSocketFactory* factory) {
212 DCHECK(factory); 218 DCHECK(factory);
213 factory_ = factory; 219 factory_ = factory;
214 } 220 }
215 221
222 void SocketStream::CopyAddrInfo(struct addrinfo* head) {
223 addresses_.Copy(head);
224 }
225
216 int SocketStream::DidEstablishConnection() { 226 int SocketStream::DidEstablishConnection() {
217 if (!socket_.get() || !socket_->IsConnected()) { 227 if (!socket_.get() || !socket_->IsConnected()) {
218 next_state_ = STATE_CLOSE; 228 next_state_ = STATE_CLOSE;
219 return ERR_CONNECTION_FAILED; 229 return ERR_CONNECTION_FAILED;
220 } 230 }
221 next_state_ = STATE_READ_WRITE; 231 next_state_ = STATE_READ_WRITE;
222 232
223 if (delegate_) 233 if (delegate_)
224 delegate_->OnConnected(this, max_pending_send_allowed_); 234 delegate_->OnConnected(this, max_pending_send_allowed_);
225 235
226 return OK; 236 return OK;
227 } 237 }
228 238
229 void SocketStream::DidReceiveData(int result) { 239 int SocketStream::DidReceiveData(int result) {
230 DCHECK(read_buf_); 240 DCHECK(read_buf_);
231 DCHECK_GT(result, 0); 241 DCHECK_GT(result, 0);
232 if (!delegate_) 242 int len = result;
233 return; 243 result = throttle_->OnRead(this, read_buf_->data(), len, &io_callback_);
234 // Notify recevied data to delegate. 244 if (delegate_) {
235 delegate_->OnReceivedData(this, read_buf_->data(), result); 245 // Notify recevied data to delegate.
246 delegate_->OnReceivedData(this, read_buf_->data(), len);
247 }
236 read_buf_ = NULL; 248 read_buf_ = NULL;
249 return result;
237 } 250 }
238 251
239 void SocketStream::DidSendData(int result) { 252 int SocketStream::DidSendData(int result) {
253 DCHECK_GT(result, 0);
254 int len = result;
255 result = throttle_->OnWrite(this, current_write_buf_->data(), len,
256 &io_callback_);
240 current_write_buf_ = NULL; 257 current_write_buf_ = NULL;
241 DCHECK_GT(result, 0); 258 if (delegate_)
242 if (!delegate_) 259 delegate_->OnSentData(this, len);
243 return;
244 260
245 delegate_->OnSentData(this, result); 261 int remaining_size = write_buf_size_ - write_buf_offset_ - len;
246 int remaining_size = write_buf_size_ - write_buf_offset_ - result;
247 if (remaining_size == 0) { 262 if (remaining_size == 0) {
248 if (!pending_write_bufs_.empty()) { 263 if (!pending_write_bufs_.empty()) {
249 write_buf_size_ = pending_write_bufs_.front()->size(); 264 write_buf_size_ = pending_write_bufs_.front()->size();
250 write_buf_ = pending_write_bufs_.front(); 265 write_buf_ = pending_write_bufs_.front();
251 pending_write_bufs_.pop_front(); 266 pending_write_bufs_.pop_front();
252 } else { 267 } else {
253 write_buf_size_ = 0; 268 write_buf_size_ = 0;
254 write_buf_ = NULL; 269 write_buf_ = NULL;
255 } 270 }
256 write_buf_offset_ = 0; 271 write_buf_offset_ = 0;
257 } else { 272 } else {
258 write_buf_offset_ += result; 273 write_buf_offset_ += len;
259 } 274 }
275 return result;
260 } 276 }
261 277
262 void SocketStream::OnIOCompleted(int result) { 278 void SocketStream::OnIOCompleted(int result) {
263 DoLoop(result); 279 DoLoop(result);
264 } 280 }
265 281
266 void SocketStream::OnReadCompleted(int result) { 282 void SocketStream::OnReadCompleted(int result) {
267 if (result == 0) { 283 if (result == 0) {
268 // 0 indicates end-of-file, so socket was closed. 284 // 0 indicates end-of-file, so socket was closed.
269 next_state_ = STATE_CLOSE; 285 next_state_ = STATE_CLOSE;
270 } else if (result > 0 && read_buf_) { 286 } else if (result > 0 && read_buf_) {
271 DidReceiveData(result); 287 result = DidReceiveData(result);
272 result = OK;
273 } 288 }
274 DoLoop(result); 289 DoLoop(result);
275 } 290 }
276 291
277 void SocketStream::OnWriteCompleted(int result) { 292 void SocketStream::OnWriteCompleted(int result) {
278 if (result >= 0 && write_buf_) { 293 if (result >= 0 && write_buf_) {
279 DidSendData(result); 294 result = DidSendData(result);
280 result = OK;
281 } 295 }
282 DoLoop(result); 296 DoLoop(result);
283 } 297 }
284 298
285 void SocketStream::DoLoop(int result) { 299 void SocketStream::DoLoop(int result) {
286 if (next_state_ == STATE_NONE) 300 if (next_state_ == STATE_NONE)
287 return; 301 return;
288 302
289 do { 303 do {
290 State state = next_state_; 304 State state = next_state_;
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after
400 port = url_.EffectiveIntPort(); 414 port = url_.EffectiveIntPort();
401 } 415 }
402 416
403 HostResolver::RequestInfo resolve_info(host, port); 417 HostResolver::RequestInfo resolve_info(host, port);
404 418
405 resolver_.reset(new SingleRequestHostResolver(host_resolver_.get())); 419 resolver_.reset(new SingleRequestHostResolver(host_resolver_.get()));
406 return resolver_->Resolve(resolve_info, &addresses_, &io_callback_, NULL); 420 return resolver_->Resolve(resolve_info, &addresses_, &io_callback_, NULL);
407 } 421 }
408 422
409 int SocketStream::DoResolveHostComplete(int result) { 423 int SocketStream::DoResolveHostComplete(int result) {
410 if (result == OK) 424 if (result == OK) {
411 next_state_ = STATE_TCP_CONNECT; 425 next_state_ = STATE_TCP_CONNECT;
412 else 426 result = throttle_->OnStartOpenConnection(this, &io_callback_);
427 } else {
413 next_state_ = STATE_CLOSE; 428 next_state_ = STATE_CLOSE;
429 }
414 // TODO(ukai): if error occured, reconsider proxy after error. 430 // TODO(ukai): if error occured, reconsider proxy after error.
415 return result; 431 return result;
416 } 432 }
417 433
418 int SocketStream::DoTcpConnect() { 434 int SocketStream::DoTcpConnect() {
419 next_state_ = STATE_TCP_CONNECT_COMPLETE; 435 next_state_ = STATE_TCP_CONNECT_COMPLETE;
420 DCHECK(factory_); 436 DCHECK(factory_);
421 socket_.reset(factory_->CreateTCPClientSocket(addresses_)); 437 socket_.reset(factory_->CreateTCPClientSocket(addresses_));
422 // TODO(willchan): Plumb LoadLog into SocketStream. 438 // TODO(willchan): Plumb LoadLog into SocketStream.
423 return socket_->Connect(&io_callback_, NULL); 439 return socket_->Connect(&io_callback_, NULL);
(...skipping 249 matching lines...) Expand 10 before | Expand all | Expand 10 after
673 return ERR_CONNECTION_CLOSED; 689 return ERR_CONNECTION_CLOSED;
674 } 690 }
675 691
676 next_state_ = STATE_READ_WRITE; 692 next_state_ = STATE_READ_WRITE;
677 693
678 if (!read_buf_) { 694 if (!read_buf_) {
679 // No read pending. 695 // No read pending.
680 read_buf_ = new IOBuffer(kReadBufferSize); 696 read_buf_ = new IOBuffer(kReadBufferSize);
681 result = socket_->Read(read_buf_, kReadBufferSize, &read_callback_); 697 result = socket_->Read(read_buf_, kReadBufferSize, &read_callback_);
682 if (result > 0) { 698 if (result > 0) {
683 DidReceiveData(result); 699 return DidReceiveData(result);
684 return OK;
685 } else if (result == 0) { 700 } else if (result == 0) {
686 // 0 indicates end-of-file, so socket was closed. 701 // 0 indicates end-of-file, so socket was closed.
687 next_state_ = STATE_CLOSE; 702 next_state_ = STATE_CLOSE;
688 return ERR_CONNECTION_CLOSED; 703 return ERR_CONNECTION_CLOSED;
689 } 704 }
690 // If read is pending, try write as well. 705 // If read is pending, try write as well.
691 // Otherwise, return the result and do next loop (to close the connection). 706 // Otherwise, return the result and do next loop (to close the connection).
692 if (result != ERR_IO_PENDING) { 707 if (result != ERR_IO_PENDING) {
693 next_state_ = STATE_CLOSE; 708 next_state_ = STATE_CLOSE;
694 return result; 709 return result;
695 } 710 }
696 } 711 }
697 // Read is pending. 712 // Read is pending.
698 DCHECK(read_buf_); 713 DCHECK(read_buf_);
699 714
700 if (write_buf_ && !current_write_buf_) { 715 if (write_buf_ && !current_write_buf_) {
701 // No write pending. 716 // No write pending.
702 current_write_buf_ = new DrainableIOBuffer(write_buf_, write_buf_size_); 717 current_write_buf_ = new DrainableIOBuffer(write_buf_, write_buf_size_);
703 current_write_buf_->SetOffset(write_buf_offset_); 718 current_write_buf_->SetOffset(write_buf_offset_);
704 result = socket_->Write(current_write_buf_, 719 result = socket_->Write(current_write_buf_,
705 current_write_buf_->BytesRemaining(), 720 current_write_buf_->BytesRemaining(),
706 &write_callback_); 721 &write_callback_);
707 if (result > 0) { 722 if (result > 0) {
708 DidSendData(result); 723 return DidSendData(result);
709 return OK;
710 } 724 }
711 // If write is not pending, return the result and do next loop (to close 725 // If write is not pending, return the result and do next loop (to close
712 // the connection). 726 // the connection).
713 if (result != 0 && result != ERR_IO_PENDING) { 727 if (result != 0 && result != ERR_IO_PENDING) {
714 next_state_ = STATE_CLOSE; 728 next_state_ = STATE_CLOSE;
715 return result; 729 return result;
716 } 730 }
717 return result; 731 return result;
718 } 732 }
719 733
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
815 829
816 SSLConfigService* SocketStream::ssl_config_service() const { 830 SSLConfigService* SocketStream::ssl_config_service() const {
817 return context_->ssl_config_service(); 831 return context_->ssl_config_service();
818 } 832 }
819 833
820 ProxyService* SocketStream::proxy_service() const { 834 ProxyService* SocketStream::proxy_service() const {
821 return context_->proxy_service(); 835 return context_->proxy_service();
822 } 836 }
823 837
824 } // namespace net 838 } // namespace net
OLDNEW
« no previous file with comments | « net/socket_stream/socket_stream.h ('k') | net/socket_stream/socket_stream_throttle.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698