Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(252)

Side by Side Diff: content/renderer/p2p/ipc_socket_factory.cc

Issue 14559005: Replace send packet throttling with a scheme based on number of in-flight bytes in IpcPacketSocket. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: rebase Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/renderer/p2p/ipc_socket_factory.h" 5 #include "content/renderer/p2p/ipc_socket_factory.h"
6 6
7 #include <deque>
8
7 #include "base/compiler_specific.h" 9 #include "base/compiler_specific.h"
8 #include "base/debug/trace_event.h" 10 #include "base/debug/trace_event.h"
9 #include "base/message_loop.h" 11 #include "base/message_loop.h"
10 #include "base/message_loop_proxy.h" 12 #include "base/message_loop_proxy.h"
11 #include "content/renderer/p2p/socket_client.h" 13 #include "content/renderer/p2p/socket_client.h"
12 #include "content/renderer/p2p/socket_dispatcher.h" 14 #include "content/renderer/p2p/socket_dispatcher.h"
13 #include "jingle/glue/utils.h" 15 #include "jingle/glue/utils.h"
14 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h" 16 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h"
15 17
16 namespace content { 18 namespace content {
17 19
18 namespace { 20 namespace {
19 21
20 // TODO(hclam): This shouldn't be a pre-defined value. Bug: crbug.com/181321. 22 // TODO(miu): This probably needs tuning. http://crbug.com/237960
21 const int kMaxPendingPackets = 32; 23 const size_t kMaximumInFlightBytes = 256 << 10; // 256 KB
Sergey Ulanov 2013/05/06 22:18:02 nit: *1024 is more readable than <<10.
miu 2013/05/06 23:36:13 Done.
22 const int kWritableSignalThreshold = 0;
23 24
24 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface 25 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface
25 // using P2PSocketClient that works over IPC-channel. It must be used 26 // using P2PSocketClient that works over IPC-channel. It must be used
26 // on the thread it was created. 27 // on the thread it was created.
27 class IpcPacketSocket : public talk_base::AsyncPacketSocket, 28 class IpcPacketSocket : public talk_base::AsyncPacketSocket,
28 public P2PSocketClient::Delegate { 29 public P2PSocketClient::Delegate {
29 public: 30 public:
30 IpcPacketSocket(); 31 IpcPacketSocket();
31 virtual ~IpcPacketSocket(); 32 virtual ~IpcPacketSocket();
32 33
(...skipping 26 matching lines...) Expand all
59 60
60 private: 61 private:
61 enum InternalState { 62 enum InternalState {
62 IS_UNINITIALIZED, 63 IS_UNINITIALIZED,
63 IS_OPENING, 64 IS_OPENING,
64 IS_OPEN, 65 IS_OPEN,
65 IS_CLOSED, 66 IS_CLOSED,
66 IS_ERROR, 67 IS_ERROR,
67 }; 68 };
68 69
70 // Reset send throttling mechanism to initial (i.e., no packets in-flight)
71 // state.
72 void ResetSendThrottling();
73
74 // Update trace of send throttling internal state. This should be called
75 // immediately after any changes to |token_bucket_level_| and/or
76 // |in_flight_packet_sizes_|.
77 void TraceSendThrottlingState() const;
78
69 void InitAcceptedTcp(P2PSocketClient* client, 79 void InitAcceptedTcp(P2PSocketClient* client,
70 const talk_base::SocketAddress& local_address, 80 const talk_base::SocketAddress& local_address,
71 const talk_base::SocketAddress& remote_address); 81 const talk_base::SocketAddress& remote_address);
72 82
73 P2PSocketType type_; 83 P2PSocketType type_;
74 84
75 // Message loop on which this socket was created and being used. 85 // Message loop on which this socket was created and being used.
76 base::MessageLoop* message_loop_; 86 base::MessageLoop* message_loop_;
77 87
78 // Corresponding P2P socket client. 88 // Corresponding P2P socket client.
79 scoped_refptr<P2PSocketClient> client_; 89 scoped_refptr<P2PSocketClient> client_;
80 90
81 // Local address is allocated by the browser process, and the 91 // Local address is allocated by the browser process, and the
82 // renderer side doesn't know the address until it receives OnOpen() 92 // renderer side doesn't know the address until it receives OnOpen()
83 // event from the browser. 93 // event from the browser.
84 talk_base::SocketAddress local_address_; 94 talk_base::SocketAddress local_address_;
85 95
86 // Remote address for client TCP connections. 96 // Remote address for client TCP connections.
87 talk_base::SocketAddress remote_address_; 97 talk_base::SocketAddress remote_address_;
88 98
89 // Current state of the object. 99 // Current state of the object.
90 InternalState state_; 100 InternalState state_;
91 101
92 // Number which have been sent to the browser, but for which we haven't 102 // A token bucket of bytes is used to throttle the sending of packets to the
Sergey Ulanov 2013/05/06 22:18:02 It doesn't look right to me that this is called "T
miu 2013/05/06 23:36:13 Ah, yes. This did start out as a token bucket, wh
93 // received response. 103 // browser process. As calls to OnSendComplete() return, the bucket is
94 int send_packets_pending_; 104 // refilled. This allows short bursts of high-rate sending without dropping
105 // packets, but quickly restricts the client to a sustainable steady-state
106 // rate.
107 size_t token_bucket_level_;
Sergey Ulanov 2013/05/06 22:18:02 maybe int instead of size_t, because it's not size
miu 2013/05/06 23:36:13 But it *is* a size: It's always equal to kMaximumI
108 std::deque<size_t> in_flight_packet_sizes_;
95 109
96 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the 110 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the
97 // caller expects SignalWritable notification. 111 // caller expects SignalWritable notification.
98 bool writable_signal_expected_; 112 bool writable_signal_expected_;
99 113
100 // Current error code. Valid when state_ == IS_ERROR. 114 // Current error code. Valid when state_ == IS_ERROR.
101 int error_; 115 int error_;
102 116
103 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket); 117 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket);
104 }; 118 };
105 119
106 IpcPacketSocket::IpcPacketSocket() 120 IpcPacketSocket::IpcPacketSocket()
107 : type_(P2P_SOCKET_UDP), 121 : type_(P2P_SOCKET_UDP),
108 message_loop_(base::MessageLoop::current()), 122 message_loop_(base::MessageLoop::current()),
109 state_(IS_UNINITIALIZED), 123 state_(IS_UNINITIALIZED),
110 send_packets_pending_(0), 124 error_(0) {
111 writable_signal_expected_(false), 125 }
112 error_(0) {}
113 126
114 IpcPacketSocket::~IpcPacketSocket() { 127 IpcPacketSocket::~IpcPacketSocket() {
115 if (state_ == IS_OPENING || state_ == IS_OPEN || 128 if (state_ == IS_OPENING || state_ == IS_OPEN ||
116 state_ == IS_ERROR) { 129 state_ == IS_ERROR) {
117 Close(); 130 Close();
118 } 131 }
119 } 132 }
120 133
134 void IpcPacketSocket::ResetSendThrottling() {
135 COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate);
136 token_bucket_level_ = kMaximumInFlightBytes;
137 in_flight_packet_sizes_.clear();
138 TraceSendThrottlingState();
139 writable_signal_expected_ = false;
140 }
141
142 void IpcPacketSocket::TraceSendThrottlingState() const {
143 TRACE_COUNTER1("p2p", "P2PSendBytesAvailable", token_bucket_level_);
144 TRACE_COUNTER1("p2p", "P2PSendPacketsInFlight",
145 in_flight_packet_sizes_.size());
146 }
147
121 bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client, 148 bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client,
122 const talk_base::SocketAddress& local_address, 149 const talk_base::SocketAddress& local_address,
123 const talk_base::SocketAddress& remote_address) { 150 const talk_base::SocketAddress& remote_address) {
124 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 151 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
125 DCHECK_EQ(state_, IS_UNINITIALIZED); 152 DCHECK_EQ(state_, IS_UNINITIALIZED);
126 153
127 type_ = type; 154 type_ = type;
128 client_ = client; 155 client_ = client;
129 local_address_ = local_address; 156 local_address_ = local_address;
130 remote_address_ = remote_address; 157 remote_address_ = remote_address;
(...skipping 19 matching lines...) Expand all
150 P2PSocketClient* client, 177 P2PSocketClient* client,
151 const talk_base::SocketAddress& local_address, 178 const talk_base::SocketAddress& local_address,
152 const talk_base::SocketAddress& remote_address) { 179 const talk_base::SocketAddress& remote_address) {
153 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 180 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
154 DCHECK_EQ(state_, IS_UNINITIALIZED); 181 DCHECK_EQ(state_, IS_UNINITIALIZED);
155 182
156 client_ = client; 183 client_ = client;
157 local_address_ = local_address; 184 local_address_ = local_address;
158 remote_address_ = remote_address; 185 remote_address_ = remote_address;
159 state_ = IS_OPEN; 186 state_ = IS_OPEN;
187 ResetSendThrottling();
160 client_->set_delegate(this); 188 client_->set_delegate(this);
161 } 189 }
162 190
163 // talk_base::AsyncPacketSocket interface. 191 // talk_base::AsyncPacketSocket interface.
164 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const { 192 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const {
165 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 193 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
166 return local_address_; 194 return local_address_;
167 } 195 }
168 196
169 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const { 197 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const {
(...skipping 18 matching lines...) Expand all
188 return EWOULDBLOCK; 216 return EWOULDBLOCK;
189 case IS_CLOSED: 217 case IS_CLOSED:
190 return ENOTCONN; 218 return ENOTCONN;
191 case IS_ERROR: 219 case IS_ERROR:
192 return error_; 220 return error_;
193 case IS_OPEN: 221 case IS_OPEN:
194 // Continue sending the packet. 222 // Continue sending the packet.
195 break; 223 break;
196 } 224 }
197 225
198 if (send_packets_pending_ > kMaxPendingPackets) { 226 if (data_size == 0)
Sergey Ulanov 2013/05/06 22:18:02 I think this can be DCHECK_GT(data_size, 0).
miu 2013/05/06 23:36:13 After thinking this over, I believe a NOTREACHED()
199 TRACE_EVENT_INSTANT1("p2p", "MaxPendingPacketsWouldBlock", 227 return 0; // No-op.
228
229 if (data_size > token_bucket_level_) {
230 TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock",
200 TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id()); 231 TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id());
201 writable_signal_expected_ = true; 232 writable_signal_expected_ = true;
202 error_ = EWOULDBLOCK; 233 error_ = EWOULDBLOCK;
203 return -1; 234 return -1;
204 } 235 }
205 236
206 const char* data_char = reinterpret_cast<const char*>(data);
207 std::vector<char> data_vector(data_char, data_char + data_size);
208
209 net::IPEndPoint address_chrome; 237 net::IPEndPoint address_chrome;
210 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) { 238 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) {
211 NOTREACHED(); 239 NOTREACHED();
212 return -1; 240 return -1;
213 } 241 }
214 242
215 ++send_packets_pending_; 243 token_bucket_level_ -= data_size;
244 in_flight_packet_sizes_.push_back(data_size);
245 TraceSendThrottlingState();
246
247 const char* data_char = reinterpret_cast<const char*>(data);
248 std::vector<char> data_vector(data_char, data_char + data_size);
216 client_->Send(address_chrome, data_vector); 249 client_->Send(address_chrome, data_vector);
217 250
218 // Fake successful send. The caller ignores result anyway. 251 // Fake successful send. The caller ignores result anyway.
219 return data_size; 252 return data_size;
220 } 253 }
221 254
222 int IpcPacketSocket::Close() { 255 int IpcPacketSocket::Close() {
223 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 256 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
224 257
225 client_->Close(); 258 client_->Close();
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
279 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 312 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
280 313
281 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) { 314 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) {
282 // Always expect correct IPv4 address to be allocated. 315 // Always expect correct IPv4 address to be allocated.
283 NOTREACHED(); 316 NOTREACHED();
284 OnError(); 317 OnError();
285 return; 318 return;
286 } 319 }
287 320
288 state_ = IS_OPEN; 321 state_ = IS_OPEN;
322 ResetSendThrottling();
289 323
290 SignalAddressReady(this, local_address_); 324 SignalAddressReady(this, local_address_);
291 if (type_ == P2P_SOCKET_TCP_CLIENT) 325 if (type_ == P2P_SOCKET_TCP_CLIENT)
292 SignalConnect(this); 326 SignalConnect(this);
293 } 327 }
294 328
295 void IpcPacketSocket::OnIncomingTcpConnection( 329 void IpcPacketSocket::OnIncomingTcpConnection(
296 const net::IPEndPoint& address, 330 const net::IPEndPoint& address,
297 P2PSocketClient* client) { 331 P2PSocketClient* client) {
298 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 332 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
299 333
300 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); 334 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
301 335
302 talk_base::SocketAddress remote_address; 336 talk_base::SocketAddress remote_address;
303 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) { 337 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) {
304 // Always expect correct IPv4 address to be allocated. 338 // Always expect correct IPv4 address to be allocated.
305 NOTREACHED(); 339 NOTREACHED();
306 } 340 }
307 socket->InitAcceptedTcp(client, local_address_, remote_address); 341 socket->InitAcceptedTcp(client, local_address_, remote_address);
308 SignalNewConnection(this, socket.release()); 342 SignalNewConnection(this, socket.release());
309 } 343 }
310 344
311 void IpcPacketSocket::OnSendComplete() { 345 void IpcPacketSocket::OnSendComplete() {
312 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 346 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
313 347
314 --send_packets_pending_; 348 if (in_flight_packet_sizes_.empty()) {
315 DCHECK_GE(send_packets_pending_, 0); 349 NOTREACHED() << "Received SendComplete() with no known in-flight packets.";
350 // In Release builds, auto-recover by resetting the throttling state.
351 const bool signal_expected = writable_signal_expected_;
352 ResetSendThrottling();
Sergey Ulanov 2013/05/06 22:18:02 Why do you need this given it's marked as NOTREACH
miu 2013/05/06 23:36:13 Done. It's a CHECK() now.
353 if (signal_expected)
354 SignalReadyToSend(this);
355 return;
356 }
357 token_bucket_level_ += in_flight_packet_sizes_.front();
358 DCHECK_LE(token_bucket_level_, kMaximumInFlightBytes);
359 in_flight_packet_sizes_.pop_front();
360 TraceSendThrottlingState();
316 361
317 if (writable_signal_expected_ && 362 if (writable_signal_expected_ && token_bucket_level_ > 0) {
318 send_packets_pending_ <= kWritableSignalThreshold) {
319 SignalReadyToSend(this); 363 SignalReadyToSend(this);
320 writable_signal_expected_ = false; 364 writable_signal_expected_ = false;
321 } 365 }
322 } 366 }
323 367
324 void IpcPacketSocket::OnError() { 368 void IpcPacketSocket::OnError() {
325 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 369 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
326 state_ = IS_ERROR; 370 state_ = IS_ERROR;
327 error_ = ECONNABORTED; 371 error_ = ECONNABORTED;
328 } 372 }
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
395 talk_base::SocketAddress crome_address; 439 talk_base::SocketAddress crome_address;
396 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_); 440 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_);
397 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); 441 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
398 if (!socket->Init(P2P_SOCKET_TCP_CLIENT, socket_client, local_address, 442 if (!socket->Init(P2P_SOCKET_TCP_CLIENT, socket_client, local_address,
399 remote_address)) 443 remote_address))
400 return NULL; 444 return NULL;
401 return socket.release(); 445 return socket.release();
402 } 446 }
403 447
404 } // namespace content 448 } // namespace content
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698