Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(142)

Side by Side Diff: content/renderer/p2p/ipc_socket_factory.cc

Issue 14559005: Replace send packet throttling with a scheme based on number of in-flight bytes in IpcPacketSocket. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: kMaximumInFlightBytes set to 64KB, per discussions and testing observations. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/renderer/p2p/ipc_socket_factory.h" 5 #include "content/renderer/p2p/ipc_socket_factory.h"
6 6
7 #include <deque>
8
7 #include "base/compiler_specific.h" 9 #include "base/compiler_specific.h"
8 #include "base/debug/trace_event.h" 10 #include "base/debug/trace_event.h"
9 #include "base/message_loop.h" 11 #include "base/message_loop.h"
10 #include "base/message_loop_proxy.h" 12 #include "base/message_loop_proxy.h"
11 #include "content/renderer/p2p/socket_client.h" 13 #include "content/renderer/p2p/socket_client.h"
12 #include "content/renderer/p2p/socket_dispatcher.h" 14 #include "content/renderer/p2p/socket_dispatcher.h"
13 #include "jingle/glue/utils.h" 15 #include "jingle/glue/utils.h"
14 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h" 16 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h"
15 17
16 namespace content { 18 namespace content {
17 19
18 namespace { 20 namespace {
19 21
20 // TODO(hclam): This shouldn't be a pre-defined value. Bug: crbug.com/181321. 22 // TODO(miu): This needs tuning. http://crbug.com/237960
21 const int kMaxPendingPackets = 32; 23 const size_t kMaximumInFlightBytes = 64 * 1024; // 64 KB
22 const int kWritableSignalThreshold = 0;
23 24
24 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface 25 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface
25 // using P2PSocketClient that works over IPC-channel. It must be used 26 // using P2PSocketClient that works over IPC-channel. It must be used
26 // on the thread it was created. 27 // on the thread it was created.
27 class IpcPacketSocket : public talk_base::AsyncPacketSocket, 28 class IpcPacketSocket : public talk_base::AsyncPacketSocket,
28 public P2PSocketClient::Delegate { 29 public P2PSocketClient::Delegate {
29 public: 30 public:
30 IpcPacketSocket(); 31 IpcPacketSocket();
31 virtual ~IpcPacketSocket(); 32 virtual ~IpcPacketSocket();
32 33
(...skipping 26 matching lines...) Expand all
59 60
60 private: 61 private:
61 enum InternalState { 62 enum InternalState {
62 IS_UNINITIALIZED, 63 IS_UNINITIALIZED,
63 IS_OPENING, 64 IS_OPENING,
64 IS_OPEN, 65 IS_OPEN,
65 IS_CLOSED, 66 IS_CLOSED,
66 IS_ERROR, 67 IS_ERROR,
67 }; 68 };
68 69
70 // Update trace of send throttling internal state. This should be called
71 // immediately after any changes to |send_bytes_available_| and/or
72 // |in_flight_packet_sizes_|.
73 void TraceSendThrottlingState() const;
74
69 void InitAcceptedTcp(P2PSocketClient* client, 75 void InitAcceptedTcp(P2PSocketClient* client,
70 const talk_base::SocketAddress& local_address, 76 const talk_base::SocketAddress& local_address,
71 const talk_base::SocketAddress& remote_address); 77 const talk_base::SocketAddress& remote_address);
72 78
73 P2PSocketType type_; 79 P2PSocketType type_;
74 80
75 // Message loop on which this socket was created and being used. 81 // Message loop on which this socket was created and being used.
76 base::MessageLoop* message_loop_; 82 base::MessageLoop* message_loop_;
77 83
78 // Corresponding P2P socket client. 84 // Corresponding P2P socket client.
79 scoped_refptr<P2PSocketClient> client_; 85 scoped_refptr<P2PSocketClient> client_;
80 86
81 // Local address is allocated by the browser process, and the 87 // Local address is allocated by the browser process, and the
82 // renderer side doesn't know the address until it receives OnOpen() 88 // renderer side doesn't know the address until it receives OnOpen()
83 // event from the browser. 89 // event from the browser.
84 talk_base::SocketAddress local_address_; 90 talk_base::SocketAddress local_address_;
85 91
86 // Remote address for client TCP connections. 92 // Remote address for client TCP connections.
87 talk_base::SocketAddress remote_address_; 93 talk_base::SocketAddress remote_address_;
88 94
89 // Current state of the object. 95 // Current state of the object.
90 InternalState state_; 96 InternalState state_;
91 97
92 // Number which have been sent to the browser, but for which we haven't 98 // Track the number of bytes allowed to be sent non-blocking. This is used to
93 // received response. 99 // throttle the sending of packets to the browser process. For each packet
94 int send_packets_pending_; 100 // sent, the value is decreased. As callbacks to OnSendComplete() (as IPCs
101 // from the browser process) are made, the value is increased back. This
102 // allows short bursts of high-rate sending without dropping packets, but
103 // quickly restricts the client to a sustainable steady-state rate.
104 size_t send_bytes_available_;
105 std::deque<size_t> in_flight_packet_sizes_;
95 106
96 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the 107 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the
97 // caller expects SignalWritable notification. 108 // caller expects SignalWritable notification.
98 bool writable_signal_expected_; 109 bool writable_signal_expected_;
99 110
100 // Current error code. Valid when state_ == IS_ERROR. 111 // Current error code. Valid when state_ == IS_ERROR.
101 int error_; 112 int error_;
102 113
103 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket); 114 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket);
104 }; 115 };
105 116
106 IpcPacketSocket::IpcPacketSocket() 117 IpcPacketSocket::IpcPacketSocket()
107 : type_(P2P_SOCKET_UDP), 118 : type_(P2P_SOCKET_UDP),
108 message_loop_(base::MessageLoop::current()), 119 message_loop_(base::MessageLoop::current()),
109 state_(IS_UNINITIALIZED), 120 state_(IS_UNINITIALIZED),
110 send_packets_pending_(0), 121 send_bytes_available_(kMaximumInFlightBytes),
111 writable_signal_expected_(false), 122 writable_signal_expected_(false),
112 error_(0) {} 123 error_(0) {
124 COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate);
125 }
113 126
114 IpcPacketSocket::~IpcPacketSocket() { 127 IpcPacketSocket::~IpcPacketSocket() {
115 if (state_ == IS_OPENING || state_ == IS_OPEN || 128 if (state_ == IS_OPENING || state_ == IS_OPEN ||
116 state_ == IS_ERROR) { 129 state_ == IS_ERROR) {
117 Close(); 130 Close();
118 } 131 }
119 } 132 }
120 133
134 void IpcPacketSocket::TraceSendThrottlingState() const {
135 TRACE_COUNTER1("p2p", "P2PSendBytesAvailable", send_bytes_available_);
136 TRACE_COUNTER1("p2p", "P2PSendPacketsInFlight",
137 in_flight_packet_sizes_.size());
138 }
139
121 bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client, 140 bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client,
122 const talk_base::SocketAddress& local_address, 141 const talk_base::SocketAddress& local_address,
123 const talk_base::SocketAddress& remote_address) { 142 const talk_base::SocketAddress& remote_address) {
124 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 143 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
125 DCHECK_EQ(state_, IS_UNINITIALIZED); 144 DCHECK_EQ(state_, IS_UNINITIALIZED);
126 145
127 type_ = type; 146 type_ = type;
128 client_ = client; 147 client_ = client;
129 local_address_ = local_address; 148 local_address_ = local_address;
130 remote_address_ = remote_address; 149 remote_address_ = remote_address;
(...skipping 19 matching lines...) Expand all
150 P2PSocketClient* client, 169 P2PSocketClient* client,
151 const talk_base::SocketAddress& local_address, 170 const talk_base::SocketAddress& local_address,
152 const talk_base::SocketAddress& remote_address) { 171 const talk_base::SocketAddress& remote_address) {
153 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 172 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
154 DCHECK_EQ(state_, IS_UNINITIALIZED); 173 DCHECK_EQ(state_, IS_UNINITIALIZED);
155 174
156 client_ = client; 175 client_ = client;
157 local_address_ = local_address; 176 local_address_ = local_address;
158 remote_address_ = remote_address; 177 remote_address_ = remote_address;
159 state_ = IS_OPEN; 178 state_ = IS_OPEN;
179 TraceSendThrottlingState();
160 client_->set_delegate(this); 180 client_->set_delegate(this);
161 } 181 }
162 182
163 // talk_base::AsyncPacketSocket interface. 183 // talk_base::AsyncPacketSocket interface.
164 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const { 184 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const {
165 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 185 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
166 return local_address_; 186 return local_address_;
167 } 187 }
168 188
169 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const { 189 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const {
(...skipping 18 matching lines...) Expand all
188 return EWOULDBLOCK; 208 return EWOULDBLOCK;
189 case IS_CLOSED: 209 case IS_CLOSED:
190 return ENOTCONN; 210 return ENOTCONN;
191 case IS_ERROR: 211 case IS_ERROR:
192 return error_; 212 return error_;
193 case IS_OPEN: 213 case IS_OPEN:
194 // Continue sending the packet. 214 // Continue sending the packet.
195 break; 215 break;
196 } 216 }
197 217
198 if (send_packets_pending_ > kMaxPendingPackets) { 218 if (data_size == 0) {
199 TRACE_EVENT_INSTANT1("p2p", "MaxPendingPacketsWouldBlock", 219 NOTREACHED();
220 return 0;
221 }
222
223 if (data_size > send_bytes_available_) {
224 TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock",
200 TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id()); 225 TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id());
201 writable_signal_expected_ = true; 226 writable_signal_expected_ = true;
202 error_ = EWOULDBLOCK; 227 error_ = EWOULDBLOCK;
203 return -1; 228 return -1;
204 } 229 }
205 230
206 const char* data_char = reinterpret_cast<const char*>(data);
207 std::vector<char> data_vector(data_char, data_char + data_size);
208
209 net::IPEndPoint address_chrome; 231 net::IPEndPoint address_chrome;
210 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) { 232 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) {
211 NOTREACHED(); 233 NOTREACHED();
212 return -1; 234 return -1;
213 } 235 }
214 236
215 ++send_packets_pending_; 237 send_bytes_available_ -= data_size;
238 in_flight_packet_sizes_.push_back(data_size);
239 TraceSendThrottlingState();
240
241 const char* data_char = reinterpret_cast<const char*>(data);
242 std::vector<char> data_vector(data_char, data_char + data_size);
216 client_->Send(address_chrome, data_vector); 243 client_->Send(address_chrome, data_vector);
217 244
218 // Fake successful send. The caller ignores result anyway. 245 // Fake successful send. The caller ignores result anyway.
219 return data_size; 246 return data_size;
220 } 247 }
221 248
222 int IpcPacketSocket::Close() { 249 int IpcPacketSocket::Close() {
223 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 250 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
224 251
225 client_->Close(); 252 client_->Close();
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
279 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 306 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
280 307
281 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) { 308 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) {
282 // Always expect correct IPv4 address to be allocated. 309 // Always expect correct IPv4 address to be allocated.
283 NOTREACHED(); 310 NOTREACHED();
284 OnError(); 311 OnError();
285 return; 312 return;
286 } 313 }
287 314
288 state_ = IS_OPEN; 315 state_ = IS_OPEN;
316 TraceSendThrottlingState();
289 317
290 SignalAddressReady(this, local_address_); 318 SignalAddressReady(this, local_address_);
291 if (type_ == P2P_SOCKET_TCP_CLIENT) 319 if (type_ == P2P_SOCKET_TCP_CLIENT)
292 SignalConnect(this); 320 SignalConnect(this);
293 } 321 }
294 322
295 void IpcPacketSocket::OnIncomingTcpConnection( 323 void IpcPacketSocket::OnIncomingTcpConnection(
296 const net::IPEndPoint& address, 324 const net::IPEndPoint& address,
297 P2PSocketClient* client) { 325 P2PSocketClient* client) {
298 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 326 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
299 327
300 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); 328 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
301 329
302 talk_base::SocketAddress remote_address; 330 talk_base::SocketAddress remote_address;
303 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) { 331 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) {
304 // Always expect correct IPv4 address to be allocated. 332 // Always expect correct IPv4 address to be allocated.
305 NOTREACHED(); 333 NOTREACHED();
306 } 334 }
307 socket->InitAcceptedTcp(client, local_address_, remote_address); 335 socket->InitAcceptedTcp(client, local_address_, remote_address);
308 SignalNewConnection(this, socket.release()); 336 SignalNewConnection(this, socket.release());
309 } 337 }
310 338
311 void IpcPacketSocket::OnSendComplete() { 339 void IpcPacketSocket::OnSendComplete() {
312 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 340 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
313 341
314 --send_packets_pending_; 342 CHECK(!in_flight_packet_sizes_.empty());
315 DCHECK_GE(send_packets_pending_, 0); 343 send_bytes_available_ += in_flight_packet_sizes_.front();
344 DCHECK_LE(send_bytes_available_, kMaximumInFlightBytes);
345 in_flight_packet_sizes_.pop_front();
346 TraceSendThrottlingState();
316 347
317 if (writable_signal_expected_ && 348 if (writable_signal_expected_ && send_bytes_available_ > 0) {
318 send_packets_pending_ <= kWritableSignalThreshold) {
319 SignalReadyToSend(this); 349 SignalReadyToSend(this);
320 writable_signal_expected_ = false; 350 writable_signal_expected_ = false;
321 } 351 }
322 } 352 }
323 353
324 void IpcPacketSocket::OnError() { 354 void IpcPacketSocket::OnError() {
325 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 355 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
326 state_ = IS_ERROR; 356 state_ = IS_ERROR;
327 error_ = ECONNABORTED; 357 error_ = ECONNABORTED;
328 } 358 }
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
395 talk_base::SocketAddress crome_address; 425 talk_base::SocketAddress crome_address;
396 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_); 426 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_);
397 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); 427 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
398 if (!socket->Init(P2P_SOCKET_TCP_CLIENT, socket_client, local_address, 428 if (!socket->Init(P2P_SOCKET_TCP_CLIENT, socket_client, local_address,
399 remote_address)) 429 remote_address))
400 return NULL; 430 return NULL;
401 return socket.release(); 431 return socket.release();
402 } 432 }
403 433
404 } // namespace content 434 } // namespace content
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698