OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "content/renderer/p2p/ipc_socket_factory.h" | 5 #include "content/renderer/p2p/ipc_socket_factory.h" |
6 | 6 |
7 #include <deque> | |
8 | |
7 #include "base/compiler_specific.h" | 9 #include "base/compiler_specific.h" |
8 #include "base/debug/trace_event.h" | 10 #include "base/debug/trace_event.h" |
9 #include "base/message_loop.h" | 11 #include "base/message_loop.h" |
10 #include "base/message_loop_proxy.h" | 12 #include "base/message_loop_proxy.h" |
11 #include "content/renderer/p2p/socket_client.h" | 13 #include "content/renderer/p2p/socket_client.h" |
12 #include "content/renderer/p2p/socket_dispatcher.h" | 14 #include "content/renderer/p2p/socket_dispatcher.h" |
13 #include "jingle/glue/utils.h" | 15 #include "jingle/glue/utils.h" |
14 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h" | 16 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h" |
15 | 17 |
16 namespace content { | 18 namespace content { |
17 | 19 |
18 namespace { | 20 namespace { |
19 | 21 |
20 // TODO(hclam): This shouldn't be a pre-defined value. Bug: crbug.com/181321. | 22 // TODO(miu): This probably needs tuning. http://crbug.com/237960 |
21 const int kMaxPendingPackets = 32; | 23 const size_t kMaximumInFlightBytes = 256 << 10; // 256 KB |
Alpha Left Google
2013/05/04 07:14:14
This limit should be small. M27 limit is 40KB. We
miu
2013/05/06 19:21:26
I agree with you. One problem, though: When sendi
| |
22 const int kWritableSignalThreshold = 0; | |
23 | 24 |
24 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface | 25 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface |
25 // using P2PSocketClient that works over IPC-channel. It must be used | 26 // using P2PSocketClient that works over IPC-channel. It must be used |
26 // on the thread it was created. | 27 // on the thread it was created. |
27 class IpcPacketSocket : public talk_base::AsyncPacketSocket, | 28 class IpcPacketSocket : public talk_base::AsyncPacketSocket, |
28 public P2PSocketClient::Delegate { | 29 public P2PSocketClient::Delegate { |
29 public: | 30 public: |
30 IpcPacketSocket(); | 31 IpcPacketSocket(); |
31 virtual ~IpcPacketSocket(); | 32 virtual ~IpcPacketSocket(); |
32 | 33 |
(...skipping 26 matching lines...) Expand all Loading... | |
59 | 60 |
60 private: | 61 private: |
61 enum InternalState { | 62 enum InternalState { |
62 IS_UNINITIALIZED, | 63 IS_UNINITIALIZED, |
63 IS_OPENING, | 64 IS_OPENING, |
64 IS_OPEN, | 65 IS_OPEN, |
65 IS_CLOSED, | 66 IS_CLOSED, |
66 IS_ERROR, | 67 IS_ERROR, |
67 }; | 68 }; |
68 | 69 |
70 // Reset send throttling mechanism to initial (i.e., no packets in-flight) | |
71 // state. | |
72 void ResetSendThrottling(); | |
73 | |
74 // Update trace of send throttling internal state. This should be called | |
75 // immediately after any changes to |token_bucket_level_| and/or | |
76 // |in_flight_packet_sizes_|. | |
77 void TraceSendThrottlingState() const; | |
78 | |
69 void InitAcceptedTcp(P2PSocketClient* client, | 79 void InitAcceptedTcp(P2PSocketClient* client, |
70 const talk_base::SocketAddress& local_address, | 80 const talk_base::SocketAddress& local_address, |
71 const talk_base::SocketAddress& remote_address); | 81 const talk_base::SocketAddress& remote_address); |
72 | 82 |
73 P2PSocketType type_; | 83 P2PSocketType type_; |
74 | 84 |
75 // Message loop on which this socket was created and being used. | 85 // Message loop on which this socket was created and being used. |
76 MessageLoop* message_loop_; | 86 MessageLoop* message_loop_; |
77 | 87 |
78 // Corresponding P2P socket client. | 88 // Corresponding P2P socket client. |
79 scoped_refptr<P2PSocketClient> client_; | 89 scoped_refptr<P2PSocketClient> client_; |
80 | 90 |
81 // Local address is allocated by the browser process, and the | 91 // Local address is allocated by the browser process, and the |
82 // renderer side doesn't know the address until it receives OnOpen() | 92 // renderer side doesn't know the address until it receives OnOpen() |
83 // event from the browser. | 93 // event from the browser. |
84 talk_base::SocketAddress local_address_; | 94 talk_base::SocketAddress local_address_; |
85 | 95 |
86 // Remote address for client TCP connections. | 96 // Remote address for client TCP connections. |
87 talk_base::SocketAddress remote_address_; | 97 talk_base::SocketAddress remote_address_; |
88 | 98 |
89 // Current state of the object. | 99 // Current state of the object. |
90 InternalState state_; | 100 InternalState state_; |
91 | 101 |
92 // Number which have been sent to the browser, but for which we haven't | 102 // A token bucket of bytes is used to throttle the sending of packets to the |
93 // received response. | 103 // browser process. As calls to OnSendComplete() return, the bucket is |
94 int send_packets_pending_; | 104 // refilled. This allows short bursts of high-rate sending without dropping |
105 // packets, but quickly restricts the client to a sustainable steady-state | |
106 // rate. | |
107 size_t token_bucket_level_; | |
108 std::deque<size_t> in_flight_packet_sizes_; | |
95 | 109 |
96 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the | 110 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the |
97 // caller expects SignalWritable notification. | 111 // caller expects SignalWritable notification. |
98 bool writable_signal_expected_; | 112 bool writable_signal_expected_; |
99 | 113 |
100 // Current error code. Valid when state_ == IS_ERROR. | 114 // Current error code. Valid when state_ == IS_ERROR. |
101 int error_; | 115 int error_; |
102 | 116 |
103 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket); | 117 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket); |
104 }; | 118 }; |
105 | 119 |
106 IpcPacketSocket::IpcPacketSocket() | 120 IpcPacketSocket::IpcPacketSocket() |
107 : type_(P2P_SOCKET_UDP), | 121 : type_(P2P_SOCKET_UDP), |
108 message_loop_(MessageLoop::current()), | 122 message_loop_(MessageLoop::current()), |
109 state_(IS_UNINITIALIZED), | 123 state_(IS_UNINITIALIZED), |
110 send_packets_pending_(0), | |
111 writable_signal_expected_(false), | |
112 error_(0) { | 124 error_(0) { |
113 } | 125 } |
114 | 126 |
115 IpcPacketSocket::~IpcPacketSocket() { | 127 IpcPacketSocket::~IpcPacketSocket() { |
116 if (state_ == IS_OPENING || state_ == IS_OPEN || | 128 if (state_ == IS_OPENING || state_ == IS_OPEN || |
117 state_ == IS_ERROR) { | 129 state_ == IS_ERROR) { |
118 Close(); | 130 Close(); |
119 } | 131 } |
120 } | 132 } |
121 | 133 |
134 void IpcPacketSocket::ResetSendThrottling() { | |
135 COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate); | |
136 token_bucket_level_ = kMaximumInFlightBytes; | |
137 in_flight_packet_sizes_.clear(); | |
138 TraceSendThrottlingState(); | |
139 writable_signal_expected_ = false; | |
140 } | |
141 | |
142 void IpcPacketSocket::TraceSendThrottlingState() const { | |
143 TRACE_COUNTER1("p2p", "P2PSendBytesAvailable", token_bucket_level_); | |
144 TRACE_COUNTER1("p2p", "P2PSendPacketsInFlight", | |
145 in_flight_packet_sizes_.size()); | |
146 } | |
147 | |
122 bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client, | 148 bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client, |
123 const talk_base::SocketAddress& local_address, | 149 const talk_base::SocketAddress& local_address, |
124 const talk_base::SocketAddress& remote_address) { | 150 const talk_base::SocketAddress& remote_address) { |
125 DCHECK_EQ(MessageLoop::current(), message_loop_); | 151 DCHECK_EQ(MessageLoop::current(), message_loop_); |
126 DCHECK_EQ(state_, IS_UNINITIALIZED); | 152 DCHECK_EQ(state_, IS_UNINITIALIZED); |
127 | 153 |
128 type_ = type; | 154 type_ = type; |
129 client_ = client; | 155 client_ = client; |
130 local_address_ = local_address; | 156 local_address_ = local_address; |
131 remote_address_ = remote_address; | 157 remote_address_ = remote_address; |
(...skipping 19 matching lines...) Expand all Loading... | |
151 P2PSocketClient* client, | 177 P2PSocketClient* client, |
152 const talk_base::SocketAddress& local_address, | 178 const talk_base::SocketAddress& local_address, |
153 const talk_base::SocketAddress& remote_address) { | 179 const talk_base::SocketAddress& remote_address) { |
154 DCHECK_EQ(MessageLoop::current(), message_loop_); | 180 DCHECK_EQ(MessageLoop::current(), message_loop_); |
155 DCHECK_EQ(state_, IS_UNINITIALIZED); | 181 DCHECK_EQ(state_, IS_UNINITIALIZED); |
156 | 182 |
157 client_ = client; | 183 client_ = client; |
158 local_address_ = local_address; | 184 local_address_ = local_address; |
159 remote_address_ = remote_address; | 185 remote_address_ = remote_address; |
160 state_ = IS_OPEN; | 186 state_ = IS_OPEN; |
187 ResetSendThrottling(); | |
161 client_->set_delegate(this); | 188 client_->set_delegate(this); |
162 } | 189 } |
163 | 190 |
164 // talk_base::AsyncPacketSocket interface. | 191 // talk_base::AsyncPacketSocket interface. |
165 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const { | 192 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const { |
166 DCHECK_EQ(MessageLoop::current(), message_loop_); | 193 DCHECK_EQ(MessageLoop::current(), message_loop_); |
167 return local_address_; | 194 return local_address_; |
168 } | 195 } |
169 | 196 |
170 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const { | 197 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const { |
(...skipping 18 matching lines...) Expand all Loading... | |
189 return EWOULDBLOCK; | 216 return EWOULDBLOCK; |
190 case IS_CLOSED: | 217 case IS_CLOSED: |
191 return ENOTCONN; | 218 return ENOTCONN; |
192 case IS_ERROR: | 219 case IS_ERROR: |
193 return error_; | 220 return error_; |
194 case IS_OPEN: | 221 case IS_OPEN: |
195 // Continue sending the packet. | 222 // Continue sending the packet. |
196 break; | 223 break; |
197 } | 224 } |
198 | 225 |
199 if (send_packets_pending_ > kMaxPendingPackets) { | 226 if (data_size == 0) |
200 TRACE_EVENT_INSTANT1("p2p", "MaxPendingPacketsWouldBlock", | 227 return 0; // No-op. |
228 | |
229 if (data_size > token_bucket_level_) { | |
230 TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock", | |
201 TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id()); | 231 TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id()); |
202 writable_signal_expected_ = true; | 232 writable_signal_expected_ = true; |
203 error_ = EWOULDBLOCK; | 233 error_ = EWOULDBLOCK; |
204 return -1; | 234 return -1; |
205 } | 235 } |
206 | 236 |
207 const char* data_char = reinterpret_cast<const char*>(data); | |
208 std::vector<char> data_vector(data_char, data_char + data_size); | |
209 | |
210 net::IPEndPoint address_chrome; | 237 net::IPEndPoint address_chrome; |
211 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) { | 238 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) { |
212 NOTREACHED(); | 239 NOTREACHED(); |
213 return -1; | 240 return -1; |
214 } | 241 } |
215 | 242 |
216 ++send_packets_pending_; | 243 token_bucket_level_ -= data_size; |
Alpha Left Google
2013/05/04 04:11:20
I'm not sure how this helps.
data_size is always
Alpha Left Google
2013/05/04 07:14:14
An alternative is to do this decrement with a dela
miu
2013/05/06 19:21:26
Exactly. That's what I thought we had discussed o
miu
2013/05/06 19:21:26
The two are identical approaches, except that the
| |
244 in_flight_packet_sizes_.push_back(data_size); | |
245 TraceSendThrottlingState(); | |
246 | |
247 const char* data_char = reinterpret_cast<const char*>(data); | |
248 std::vector<char> data_vector(data_char, data_char + data_size); | |
217 client_->Send(address_chrome, data_vector); | 249 client_->Send(address_chrome, data_vector); |
218 | 250 |
219 // Fake successful send. The caller ignores result anyway. | 251 // Fake successful send. The caller ignores result anyway. |
220 return data_size; | 252 return data_size; |
221 } | 253 } |
222 | 254 |
223 int IpcPacketSocket::Close() { | 255 int IpcPacketSocket::Close() { |
224 DCHECK_EQ(MessageLoop::current(), message_loop_); | 256 DCHECK_EQ(MessageLoop::current(), message_loop_); |
225 | 257 |
226 client_->Close(); | 258 client_->Close(); |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
280 DCHECK_EQ(MessageLoop::current(), message_loop_); | 312 DCHECK_EQ(MessageLoop::current(), message_loop_); |
281 | 313 |
282 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) { | 314 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) { |
283 // Always expect correct IPv4 address to be allocated. | 315 // Always expect correct IPv4 address to be allocated. |
284 NOTREACHED(); | 316 NOTREACHED(); |
285 OnError(); | 317 OnError(); |
286 return; | 318 return; |
287 } | 319 } |
288 | 320 |
289 state_ = IS_OPEN; | 321 state_ = IS_OPEN; |
322 ResetSendThrottling(); | |
290 | 323 |
291 SignalAddressReady(this, local_address_); | 324 SignalAddressReady(this, local_address_); |
292 if (type_ == P2P_SOCKET_TCP_CLIENT) | 325 if (type_ == P2P_SOCKET_TCP_CLIENT) |
293 SignalConnect(this); | 326 SignalConnect(this); |
294 } | 327 } |
295 | 328 |
296 void IpcPacketSocket::OnIncomingTcpConnection( | 329 void IpcPacketSocket::OnIncomingTcpConnection( |
297 const net::IPEndPoint& address, | 330 const net::IPEndPoint& address, |
298 P2PSocketClient* client) { | 331 P2PSocketClient* client) { |
299 DCHECK_EQ(MessageLoop::current(), message_loop_); | 332 DCHECK_EQ(MessageLoop::current(), message_loop_); |
300 | 333 |
301 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); | 334 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); |
302 | 335 |
303 talk_base::SocketAddress remote_address; | 336 talk_base::SocketAddress remote_address; |
304 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) { | 337 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) { |
305 // Always expect correct IPv4 address to be allocated. | 338 // Always expect correct IPv4 address to be allocated. |
306 NOTREACHED(); | 339 NOTREACHED(); |
307 } | 340 } |
308 socket->InitAcceptedTcp(client, local_address_, remote_address); | 341 socket->InitAcceptedTcp(client, local_address_, remote_address); |
309 SignalNewConnection(this, socket.release()); | 342 SignalNewConnection(this, socket.release()); |
310 } | 343 } |
311 | 344 |
312 void IpcPacketSocket::OnSendComplete() { | 345 void IpcPacketSocket::OnSendComplete() { |
313 DCHECK_EQ(MessageLoop::current(), message_loop_); | 346 DCHECK_EQ(MessageLoop::current(), message_loop_); |
314 | 347 |
315 --send_packets_pending_; | 348 if (in_flight_packet_sizes_.empty()) { |
316 DCHECK_GE(send_packets_pending_, 0); | 349 NOTREACHED() << "Received SendComplete() with no known in-flight packets."; |
350 // In production builds, auto-recover by resetting the throttling state. | |
Alpha Left Google
2013/05/04 04:11:20
How is it related to production builds?
miu
2013/05/06 19:21:26
Wrong wording on my part. I meant: s/production/r
| |
351 const bool signal_expected = writable_signal_expected_; | |
352 ResetSendThrottling(); | |
353 if (signal_expected) | |
354 SignalReadyToSend(this); | |
355 return; | |
356 } | |
357 token_bucket_level_ += in_flight_packet_sizes_.front(); | |
358 DCHECK_LE(token_bucket_level_, kMaximumInFlightBytes); | |
359 in_flight_packet_sizes_.pop_front(); | |
360 TraceSendThrottlingState(); | |
317 | 361 |
318 if (writable_signal_expected_ && | 362 if (writable_signal_expected_ && token_bucket_level_ > 0) { |
319 send_packets_pending_ <= kWritableSignalThreshold) { | |
320 SignalReadyToSend(this); | 363 SignalReadyToSend(this); |
321 writable_signal_expected_ = false; | 364 writable_signal_expected_ = false; |
322 } | 365 } |
323 } | 366 } |
324 | 367 |
325 void IpcPacketSocket::OnError() { | 368 void IpcPacketSocket::OnError() { |
326 DCHECK_EQ(MessageLoop::current(), message_loop_); | 369 DCHECK_EQ(MessageLoop::current(), message_loop_); |
327 state_ = IS_ERROR; | 370 state_ = IS_ERROR; |
328 error_ = ECONNABORTED; | 371 error_ = ECONNABORTED; |
329 } | 372 } |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
396 talk_base::SocketAddress crome_address; | 439 talk_base::SocketAddress crome_address; |
397 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_); | 440 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_); |
398 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); | 441 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); |
399 if (!socket->Init(P2P_SOCKET_TCP_CLIENT, socket_client, local_address, | 442 if (!socket->Init(P2P_SOCKET_TCP_CLIENT, socket_client, local_address, |
400 remote_address)) | 443 remote_address)) |
401 return NULL; | 444 return NULL; |
402 return socket.release(); | 445 return socket.release(); |
403 } | 446 } |
404 | 447 |
405 } // namespace content | 448 } // namespace content |
OLD | NEW |