Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(180)

Side by Side Diff: content/renderer/p2p/ipc_socket_factory.cc

Issue 14559005: Replace send packet throttling with a scheme based on number of in-flight bytes in IpcPacketSocket. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Addressed sergeyu's comments. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/renderer/p2p/ipc_socket_factory.h" 5 #include "content/renderer/p2p/ipc_socket_factory.h"
6 6
7 #include <deque>
8
7 #include "base/compiler_specific.h" 9 #include "base/compiler_specific.h"
8 #include "base/debug/trace_event.h" 10 #include "base/debug/trace_event.h"
9 #include "base/message_loop.h" 11 #include "base/message_loop.h"
10 #include "base/message_loop_proxy.h" 12 #include "base/message_loop_proxy.h"
11 #include "content/renderer/p2p/socket_client.h" 13 #include "content/renderer/p2p/socket_client.h"
12 #include "content/renderer/p2p/socket_dispatcher.h" 14 #include "content/renderer/p2p/socket_dispatcher.h"
13 #include "jingle/glue/utils.h" 15 #include "jingle/glue/utils.h"
14 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h" 16 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h"
15 17
16 namespace content { 18 namespace content {
17 19
18 namespace { 20 namespace {
19 21
20 // TODO(hclam): This shouldn't be a pre-defined value. Bug: crbug.com/181321. 22 // TODO(miu): This probably needs tuning. http://crbug.com/237960
21 const int kMaxPendingPackets = 32; 23 const size_t kMaximumInFlightBytes = 256 * 1024; // 256 KB
22 const int kWritableSignalThreshold = 0;
23 24
24 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface 25 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface
25 // using P2PSocketClient that works over IPC-channel. It must be used 26 // using P2PSocketClient that works over IPC-channel. It must be used
26 // on the thread it was created. 27 // on the thread it was created.
27 class IpcPacketSocket : public talk_base::AsyncPacketSocket, 28 class IpcPacketSocket : public talk_base::AsyncPacketSocket,
28 public P2PSocketClient::Delegate { 29 public P2PSocketClient::Delegate {
29 public: 30 public:
30 IpcPacketSocket(); 31 IpcPacketSocket();
31 virtual ~IpcPacketSocket(); 32 virtual ~IpcPacketSocket();
32 33
(...skipping 26 matching lines...) Expand all
59 60
60 private: 61 private:
61 enum InternalState { 62 enum InternalState {
62 IS_UNINITIALIZED, 63 IS_UNINITIALIZED,
63 IS_OPENING, 64 IS_OPENING,
64 IS_OPEN, 65 IS_OPEN,
65 IS_CLOSED, 66 IS_CLOSED,
66 IS_ERROR, 67 IS_ERROR,
67 }; 68 };
68 69
70 // Reset send throttling mechanism to initial (i.e., no packets in-flight)
71 // state.
72 void ResetSendThrottling();
73
74 // Update trace of send throttling internal state. This should be called
75 // immediately after any changes to |send_bytes_available_| and/or
76 // |in_flight_packet_sizes_|.
77 void TraceSendThrottlingState() const;
78
69 void InitAcceptedTcp(P2PSocketClient* client, 79 void InitAcceptedTcp(P2PSocketClient* client,
70 const talk_base::SocketAddress& local_address, 80 const talk_base::SocketAddress& local_address,
71 const talk_base::SocketAddress& remote_address); 81 const talk_base::SocketAddress& remote_address);
72 82
73 P2PSocketType type_; 83 P2PSocketType type_;
74 84
75 // Message loop on which this socket was created and being used. 85 // Message loop on which this socket was created and being used.
76 base::MessageLoop* message_loop_; 86 base::MessageLoop* message_loop_;
77 87
78 // Corresponding P2P socket client. 88 // Corresponding P2P socket client.
79 scoped_refptr<P2PSocketClient> client_; 89 scoped_refptr<P2PSocketClient> client_;
80 90
81 // Local address is allocated by the browser process, and the 91 // Local address is allocated by the browser process, and the
82 // renderer side doesn't know the address until it receives OnOpen() 92 // renderer side doesn't know the address until it receives OnOpen()
83 // event from the browser. 93 // event from the browser.
84 talk_base::SocketAddress local_address_; 94 talk_base::SocketAddress local_address_;
85 95
86 // Remote address for client TCP connections. 96 // Remote address for client TCP connections.
87 talk_base::SocketAddress remote_address_; 97 talk_base::SocketAddress remote_address_;
88 98
89 // Current state of the object. 99 // Current state of the object.
90 InternalState state_; 100 InternalState state_;
91 101
92 // Number which have been sent to the browser, but for which we haven't 102 // Track the number of bytes allowed to be sent non-blocking. This is used to
93 // received response. 103 // throttle the sending of packets to the browser process. For each packet
94 int send_packets_pending_; 104 // sent, the value is decreased. As callbacks to OnSendComplete() (as IPCs
105 // from the browser process) are made, the value is increased back. This
106 // allows short bursts of high-rate sending without dropping packets, but
107 // quickly restricts the client to a sustainable steady-state rate.
108 size_t send_bytes_available_;
109 std::deque<size_t> in_flight_packet_sizes_;
95 110
96 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the 111 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the
97 // caller expects SignalWritable notification. 112 // caller expects SignalWritable notification.
98 bool writable_signal_expected_; 113 bool writable_signal_expected_;
99 114
100 // Current error code. Valid when state_ == IS_ERROR. 115 // Current error code. Valid when state_ == IS_ERROR.
101 int error_; 116 int error_;
102 117
103 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket); 118 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket);
104 }; 119 };
105 120
106 IpcPacketSocket::IpcPacketSocket() 121 IpcPacketSocket::IpcPacketSocket()
107 : type_(P2P_SOCKET_UDP), 122 : type_(P2P_SOCKET_UDP),
108 message_loop_(base::MessageLoop::current()), 123 message_loop_(base::MessageLoop::current()),
109 state_(IS_UNINITIALIZED), 124 state_(IS_UNINITIALIZED),
110 send_packets_pending_(0), 125 error_(0) {
111 writable_signal_expected_(false), 126 }
112 error_(0) {}
113 127
114 IpcPacketSocket::~IpcPacketSocket() { 128 IpcPacketSocket::~IpcPacketSocket() {
115 if (state_ == IS_OPENING || state_ == IS_OPEN || 129 if (state_ == IS_OPENING || state_ == IS_OPEN ||
116 state_ == IS_ERROR) { 130 state_ == IS_ERROR) {
117 Close(); 131 Close();
118 } 132 }
119 } 133 }
120 134
135 void IpcPacketSocket::ResetSendThrottling() {
Sergey Ulanov 2013/05/07 00:14:29 Now this is called only when the socket is being i
miu 2013/05/07 00:49:40 Done.
136 COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate);
137 send_bytes_available_ = kMaximumInFlightBytes;
138 in_flight_packet_sizes_.clear();
139 TraceSendThrottlingState();
140 writable_signal_expected_ = false;
141 }
142
143 void IpcPacketSocket::TraceSendThrottlingState() const {
144 TRACE_COUNTER1("p2p", "P2PSendBytesAvailable", send_bytes_available_);
145 TRACE_COUNTER1("p2p", "P2PSendPacketsInFlight",
146 in_flight_packet_sizes_.size());
147 }
148
121 bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client, 149 bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client,
122 const talk_base::SocketAddress& local_address, 150 const talk_base::SocketAddress& local_address,
123 const talk_base::SocketAddress& remote_address) { 151 const talk_base::SocketAddress& remote_address) {
124 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 152 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
125 DCHECK_EQ(state_, IS_UNINITIALIZED); 153 DCHECK_EQ(state_, IS_UNINITIALIZED);
126 154
127 type_ = type; 155 type_ = type;
128 client_ = client; 156 client_ = client;
129 local_address_ = local_address; 157 local_address_ = local_address;
130 remote_address_ = remote_address; 158 remote_address_ = remote_address;
(...skipping 19 matching lines...) Expand all
150 P2PSocketClient* client, 178 P2PSocketClient* client,
151 const talk_base::SocketAddress& local_address, 179 const talk_base::SocketAddress& local_address,
152 const talk_base::SocketAddress& remote_address) { 180 const talk_base::SocketAddress& remote_address) {
153 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 181 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
154 DCHECK_EQ(state_, IS_UNINITIALIZED); 182 DCHECK_EQ(state_, IS_UNINITIALIZED);
155 183
156 client_ = client; 184 client_ = client;
157 local_address_ = local_address; 185 local_address_ = local_address;
158 remote_address_ = remote_address; 186 remote_address_ = remote_address;
159 state_ = IS_OPEN; 187 state_ = IS_OPEN;
188 ResetSendThrottling();
160 client_->set_delegate(this); 189 client_->set_delegate(this);
161 } 190 }
162 191
163 // talk_base::AsyncPacketSocket interface. 192 // talk_base::AsyncPacketSocket interface.
164 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const { 193 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const {
165 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 194 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
166 return local_address_; 195 return local_address_;
167 } 196 }
168 197
169 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const { 198 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const {
(...skipping 18 matching lines...) Expand all
188 return EWOULDBLOCK; 217 return EWOULDBLOCK;
189 case IS_CLOSED: 218 case IS_CLOSED:
190 return ENOTCONN; 219 return ENOTCONN;
191 case IS_ERROR: 220 case IS_ERROR:
192 return error_; 221 return error_;
193 case IS_OPEN: 222 case IS_OPEN:
194 // Continue sending the packet. 223 // Continue sending the packet.
195 break; 224 break;
196 } 225 }
197 226
198 if (send_packets_pending_ > kMaxPendingPackets) { 227 if (data_size == 0) {
199 TRACE_EVENT_INSTANT1("p2p", "MaxPendingPacketsWouldBlock", 228 NOTREACHED();
229 return 0;
230 }
231
232 if (data_size > send_bytes_available_) {
233 TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock",
200 TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id()); 234 TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id());
201 writable_signal_expected_ = true; 235 writable_signal_expected_ = true;
202 error_ = EWOULDBLOCK; 236 error_ = EWOULDBLOCK;
203 return -1; 237 return -1;
204 } 238 }
205 239
206 const char* data_char = reinterpret_cast<const char*>(data);
207 std::vector<char> data_vector(data_char, data_char + data_size);
208
209 net::IPEndPoint address_chrome; 240 net::IPEndPoint address_chrome;
210 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) { 241 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) {
211 NOTREACHED(); 242 NOTREACHED();
212 return -1; 243 return -1;
213 } 244 }
214 245
215 ++send_packets_pending_; 246 send_bytes_available_ -= data_size;
247 in_flight_packet_sizes_.push_back(data_size);
248 TraceSendThrottlingState();
249
250 const char* data_char = reinterpret_cast<const char*>(data);
251 std::vector<char> data_vector(data_char, data_char + data_size);
216 client_->Send(address_chrome, data_vector); 252 client_->Send(address_chrome, data_vector);
217 253
218 // Fake successful send. The caller ignores result anyway. 254 // Fake successful send. The caller ignores result anyway.
219 return data_size; 255 return data_size;
220 } 256 }
221 257
222 int IpcPacketSocket::Close() { 258 int IpcPacketSocket::Close() {
223 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 259 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
224 260
225 client_->Close(); 261 client_->Close();
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
279 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 315 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
280 316
281 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) { 317 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) {
282 // Always expect correct IPv4 address to be allocated. 318 // Always expect correct IPv4 address to be allocated.
283 NOTREACHED(); 319 NOTREACHED();
284 OnError(); 320 OnError();
285 return; 321 return;
286 } 322 }
287 323
288 state_ = IS_OPEN; 324 state_ = IS_OPEN;
325 ResetSendThrottling();
289 326
290 SignalAddressReady(this, local_address_); 327 SignalAddressReady(this, local_address_);
291 if (type_ == P2P_SOCKET_TCP_CLIENT) 328 if (type_ == P2P_SOCKET_TCP_CLIENT)
292 SignalConnect(this); 329 SignalConnect(this);
293 } 330 }
294 331
295 void IpcPacketSocket::OnIncomingTcpConnection( 332 void IpcPacketSocket::OnIncomingTcpConnection(
296 const net::IPEndPoint& address, 333 const net::IPEndPoint& address,
297 P2PSocketClient* client) { 334 P2PSocketClient* client) {
298 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 335 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
299 336
300 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); 337 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
301 338
302 talk_base::SocketAddress remote_address; 339 talk_base::SocketAddress remote_address;
303 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) { 340 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) {
304 // Always expect correct IPv4 address to be allocated. 341 // Always expect correct IPv4 address to be allocated.
305 NOTREACHED(); 342 NOTREACHED();
306 } 343 }
307 socket->InitAcceptedTcp(client, local_address_, remote_address); 344 socket->InitAcceptedTcp(client, local_address_, remote_address);
308 SignalNewConnection(this, socket.release()); 345 SignalNewConnection(this, socket.release());
309 } 346 }
310 347
311 void IpcPacketSocket::OnSendComplete() { 348 void IpcPacketSocket::OnSendComplete() {
312 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 349 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
313 350
314 --send_packets_pending_; 351 CHECK(!in_flight_packet_sizes_.empty())
315 DCHECK_GE(send_packets_pending_, 0); 352 << "Received SendComplete() with no known in-flight packets.";
Sergey Ulanov 2013/05/07 00:14:29 nit: remove the message. It makes release binaries
miu 2013/05/07 00:49:40 Done.
353 send_bytes_available_ += in_flight_packet_sizes_.front();
354 DCHECK_LE(send_bytes_available_, kMaximumInFlightBytes);
355 in_flight_packet_sizes_.pop_front();
356 TraceSendThrottlingState();
316 357
317 if (writable_signal_expected_ && 358 if (writable_signal_expected_ && send_bytes_available_ > 0) {
318 send_packets_pending_ <= kWritableSignalThreshold) {
319 SignalReadyToSend(this); 359 SignalReadyToSend(this);
320 writable_signal_expected_ = false; 360 writable_signal_expected_ = false;
321 } 361 }
322 } 362 }
323 363
324 void IpcPacketSocket::OnError() { 364 void IpcPacketSocket::OnError() {
325 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 365 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
326 state_ = IS_ERROR; 366 state_ = IS_ERROR;
327 error_ = ECONNABORTED; 367 error_ = ECONNABORTED;
328 } 368 }
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
395 talk_base::SocketAddress crome_address; 435 talk_base::SocketAddress crome_address;
396 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_); 436 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_);
397 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); 437 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
398 if (!socket->Init(P2P_SOCKET_TCP_CLIENT, socket_client, local_address, 438 if (!socket->Init(P2P_SOCKET_TCP_CLIENT, socket_client, local_address,
399 remote_address)) 439 remote_address))
400 return NULL; 440 return NULL;
401 return socket.release(); 441 return socket.release();
402 } 442 }
403 443
404 } // namespace content 444 } // namespace content
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698