OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "content/renderer/p2p/ipc_socket_factory.h" | 5 #include "content/renderer/p2p/ipc_socket_factory.h" |
6 | 6 |
| 7 #include <deque> |
| 8 |
7 #include "base/compiler_specific.h" | 9 #include "base/compiler_specific.h" |
8 #include "base/debug/trace_event.h" | 10 #include "base/debug/trace_event.h" |
9 #include "base/message_loop.h" | 11 #include "base/message_loop.h" |
10 #include "base/message_loop_proxy.h" | 12 #include "base/message_loop_proxy.h" |
11 #include "content/renderer/p2p/socket_client.h" | 13 #include "content/renderer/p2p/socket_client.h" |
12 #include "content/renderer/p2p/socket_dispatcher.h" | 14 #include "content/renderer/p2p/socket_dispatcher.h" |
13 #include "jingle/glue/utils.h" | 15 #include "jingle/glue/utils.h" |
14 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h" | 16 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h" |
15 | 17 |
16 namespace content { | 18 namespace content { |
17 | 19 |
18 namespace { | 20 namespace { |
19 | 21 |
20 // TODO(hclam): This shouldn't be a pre-defined value. Bug: crbug.com/181321. | 22 // TODO(miu): This needs tuning. http://crbug.com/237960 |
21 const int kMaxPendingPackets = 32; | 23 const size_t kMaximumInFlightBytes = 64 * 1024; // 64 KB |
22 const int kWritableSignalThreshold = 0; | |
23 | 24 |
24 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface | 25 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface |
25 // using P2PSocketClient that works over IPC-channel. It must be used | 26 // using P2PSocketClient that works over IPC-channel. It must be used |
26 // on the thread it was created. | 27 // on the thread it was created. |
27 class IpcPacketSocket : public talk_base::AsyncPacketSocket, | 28 class IpcPacketSocket : public talk_base::AsyncPacketSocket, |
28 public P2PSocketClient::Delegate { | 29 public P2PSocketClient::Delegate { |
29 public: | 30 public: |
30 IpcPacketSocket(); | 31 IpcPacketSocket(); |
31 virtual ~IpcPacketSocket(); | 32 virtual ~IpcPacketSocket(); |
32 | 33 |
(...skipping 26 matching lines...) Expand all Loading... |
59 | 60 |
60 private: | 61 private: |
61 enum InternalState { | 62 enum InternalState { |
62 IS_UNINITIALIZED, | 63 IS_UNINITIALIZED, |
63 IS_OPENING, | 64 IS_OPENING, |
64 IS_OPEN, | 65 IS_OPEN, |
65 IS_CLOSED, | 66 IS_CLOSED, |
66 IS_ERROR, | 67 IS_ERROR, |
67 }; | 68 }; |
68 | 69 |
| 70 // Update trace of send throttling internal state. This should be called |
| 71 // immediately after any changes to |send_bytes_available_| and/or |
| 72 // |in_flight_packet_sizes_|. |
| 73 void TraceSendThrottlingState() const; |
| 74 |
69 void InitAcceptedTcp(P2PSocketClient* client, | 75 void InitAcceptedTcp(P2PSocketClient* client, |
70 const talk_base::SocketAddress& local_address, | 76 const talk_base::SocketAddress& local_address, |
71 const talk_base::SocketAddress& remote_address); | 77 const talk_base::SocketAddress& remote_address); |
72 | 78 |
73 P2PSocketType type_; | 79 P2PSocketType type_; |
74 | 80 |
75 // Message loop on which this socket was created and being used. | 81 // Message loop on which this socket was created and being used. |
76 base::MessageLoop* message_loop_; | 82 base::MessageLoop* message_loop_; |
77 | 83 |
78 // Corresponding P2P socket client. | 84 // Corresponding P2P socket client. |
79 scoped_refptr<P2PSocketClient> client_; | 85 scoped_refptr<P2PSocketClient> client_; |
80 | 86 |
81 // Local address is allocated by the browser process, and the | 87 // Local address is allocated by the browser process, and the |
82 // renderer side doesn't know the address until it receives OnOpen() | 88 // renderer side doesn't know the address until it receives OnOpen() |
83 // event from the browser. | 89 // event from the browser. |
84 talk_base::SocketAddress local_address_; | 90 talk_base::SocketAddress local_address_; |
85 | 91 |
86 // Remote address for client TCP connections. | 92 // Remote address for client TCP connections. |
87 talk_base::SocketAddress remote_address_; | 93 talk_base::SocketAddress remote_address_; |
88 | 94 |
89 // Current state of the object. | 95 // Current state of the object. |
90 InternalState state_; | 96 InternalState state_; |
91 | 97 |
92 // Number which have been sent to the browser, but for which we haven't | 98 // Track the number of bytes allowed to be sent non-blocking. This is used to |
93 // received response. | 99 // throttle the sending of packets to the browser process. For each packet |
94 int send_packets_pending_; | 100 // sent, the value is decreased. As callbacks to OnSendComplete() (as IPCs |
| 101 // from the browser process) are made, the value is increased back. This |
| 102 // allows short bursts of high-rate sending without dropping packets, but |
| 103 // quickly restricts the client to a sustainable steady-state rate. |
| 104 size_t send_bytes_available_; |
| 105 std::deque<size_t> in_flight_packet_sizes_; |
95 | 106 |
96 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the | 107 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the |
97 // caller expects SignalWritable notification. | 108 // caller expects SignalWritable notification. |
98 bool writable_signal_expected_; | 109 bool writable_signal_expected_; |
99 | 110 |
100 // Current error code. Valid when state_ == IS_ERROR. | 111 // Current error code. Valid when state_ == IS_ERROR. |
101 int error_; | 112 int error_; |
102 | 113 |
103 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket); | 114 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket); |
104 }; | 115 }; |
105 | 116 |
106 IpcPacketSocket::IpcPacketSocket() | 117 IpcPacketSocket::IpcPacketSocket() |
107 : type_(P2P_SOCKET_UDP), | 118 : type_(P2P_SOCKET_UDP), |
108 message_loop_(base::MessageLoop::current()), | 119 message_loop_(base::MessageLoop::current()), |
109 state_(IS_UNINITIALIZED), | 120 state_(IS_UNINITIALIZED), |
110 send_packets_pending_(0), | 121 send_bytes_available_(kMaximumInFlightBytes), |
111 writable_signal_expected_(false), | 122 writable_signal_expected_(false), |
112 error_(0) {} | 123 error_(0) { |
| 124 COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate); |
| 125 } |
113 | 126 |
114 IpcPacketSocket::~IpcPacketSocket() { | 127 IpcPacketSocket::~IpcPacketSocket() { |
115 if (state_ == IS_OPENING || state_ == IS_OPEN || | 128 if (state_ == IS_OPENING || state_ == IS_OPEN || |
116 state_ == IS_ERROR) { | 129 state_ == IS_ERROR) { |
117 Close(); | 130 Close(); |
118 } | 131 } |
119 } | 132 } |
120 | 133 |
| 134 void IpcPacketSocket::TraceSendThrottlingState() const { |
| 135 TRACE_COUNTER1("p2p", "P2PSendBytesAvailable", send_bytes_available_); |
| 136 TRACE_COUNTER1("p2p", "P2PSendPacketsInFlight", |
| 137 in_flight_packet_sizes_.size()); |
| 138 } |
| 139 |
121 bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client, | 140 bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client, |
122 const talk_base::SocketAddress& local_address, | 141 const talk_base::SocketAddress& local_address, |
123 const talk_base::SocketAddress& remote_address) { | 142 const talk_base::SocketAddress& remote_address) { |
124 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 143 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
125 DCHECK_EQ(state_, IS_UNINITIALIZED); | 144 DCHECK_EQ(state_, IS_UNINITIALIZED); |
126 | 145 |
127 type_ = type; | 146 type_ = type; |
128 client_ = client; | 147 client_ = client; |
129 local_address_ = local_address; | 148 local_address_ = local_address; |
130 remote_address_ = remote_address; | 149 remote_address_ = remote_address; |
(...skipping 19 matching lines...) Expand all Loading... |
150 P2PSocketClient* client, | 169 P2PSocketClient* client, |
151 const talk_base::SocketAddress& local_address, | 170 const talk_base::SocketAddress& local_address, |
152 const talk_base::SocketAddress& remote_address) { | 171 const talk_base::SocketAddress& remote_address) { |
153 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 172 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
154 DCHECK_EQ(state_, IS_UNINITIALIZED); | 173 DCHECK_EQ(state_, IS_UNINITIALIZED); |
155 | 174 |
156 client_ = client; | 175 client_ = client; |
157 local_address_ = local_address; | 176 local_address_ = local_address; |
158 remote_address_ = remote_address; | 177 remote_address_ = remote_address; |
159 state_ = IS_OPEN; | 178 state_ = IS_OPEN; |
| 179 TraceSendThrottlingState(); |
160 client_->set_delegate(this); | 180 client_->set_delegate(this); |
161 } | 181 } |
162 | 182 |
163 // talk_base::AsyncPacketSocket interface. | 183 // talk_base::AsyncPacketSocket interface. |
164 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const { | 184 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const { |
165 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 185 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
166 return local_address_; | 186 return local_address_; |
167 } | 187 } |
168 | 188 |
169 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const { | 189 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const { |
(...skipping 18 matching lines...) Expand all Loading... |
188 return EWOULDBLOCK; | 208 return EWOULDBLOCK; |
189 case IS_CLOSED: | 209 case IS_CLOSED: |
190 return ENOTCONN; | 210 return ENOTCONN; |
191 case IS_ERROR: | 211 case IS_ERROR: |
192 return error_; | 212 return error_; |
193 case IS_OPEN: | 213 case IS_OPEN: |
194 // Continue sending the packet. | 214 // Continue sending the packet. |
195 break; | 215 break; |
196 } | 216 } |
197 | 217 |
198 if (send_packets_pending_ > kMaxPendingPackets) { | 218 if (data_size == 0) { |
199 TRACE_EVENT_INSTANT1("p2p", "MaxPendingPacketsWouldBlock", | 219 NOTREACHED(); |
| 220 return 0; |
| 221 } |
| 222 |
| 223 if (data_size > send_bytes_available_) { |
| 224 TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock", |
200 TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id()); | 225 TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id()); |
201 writable_signal_expected_ = true; | 226 writable_signal_expected_ = true; |
202 error_ = EWOULDBLOCK; | 227 error_ = EWOULDBLOCK; |
203 return -1; | 228 return -1; |
204 } | 229 } |
205 | 230 |
206 const char* data_char = reinterpret_cast<const char*>(data); | |
207 std::vector<char> data_vector(data_char, data_char + data_size); | |
208 | |
209 net::IPEndPoint address_chrome; | 231 net::IPEndPoint address_chrome; |
210 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) { | 232 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) { |
211 NOTREACHED(); | 233 NOTREACHED(); |
212 return -1; | 234 return -1; |
213 } | 235 } |
214 | 236 |
215 ++send_packets_pending_; | 237 send_bytes_available_ -= data_size; |
| 238 in_flight_packet_sizes_.push_back(data_size); |
| 239 TraceSendThrottlingState(); |
| 240 |
| 241 const char* data_char = reinterpret_cast<const char*>(data); |
| 242 std::vector<char> data_vector(data_char, data_char + data_size); |
216 client_->Send(address_chrome, data_vector); | 243 client_->Send(address_chrome, data_vector); |
217 | 244 |
218 // Fake successful send. The caller ignores result anyway. | 245 // Fake successful send. The caller ignores result anyway. |
219 return data_size; | 246 return data_size; |
220 } | 247 } |
221 | 248 |
222 int IpcPacketSocket::Close() { | 249 int IpcPacketSocket::Close() { |
223 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 250 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
224 | 251 |
225 client_->Close(); | 252 client_->Close(); |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
279 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 306 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
280 | 307 |
281 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) { | 308 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) { |
282 // Always expect correct IPv4 address to be allocated. | 309 // Always expect correct IPv4 address to be allocated. |
283 NOTREACHED(); | 310 NOTREACHED(); |
284 OnError(); | 311 OnError(); |
285 return; | 312 return; |
286 } | 313 } |
287 | 314 |
288 state_ = IS_OPEN; | 315 state_ = IS_OPEN; |
| 316 TraceSendThrottlingState(); |
289 | 317 |
290 SignalAddressReady(this, local_address_); | 318 SignalAddressReady(this, local_address_); |
291 if (type_ == P2P_SOCKET_TCP_CLIENT) | 319 if (type_ == P2P_SOCKET_TCP_CLIENT) |
292 SignalConnect(this); | 320 SignalConnect(this); |
293 } | 321 } |
294 | 322 |
295 void IpcPacketSocket::OnIncomingTcpConnection( | 323 void IpcPacketSocket::OnIncomingTcpConnection( |
296 const net::IPEndPoint& address, | 324 const net::IPEndPoint& address, |
297 P2PSocketClient* client) { | 325 P2PSocketClient* client) { |
298 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 326 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
299 | 327 |
300 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); | 328 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); |
301 | 329 |
302 talk_base::SocketAddress remote_address; | 330 talk_base::SocketAddress remote_address; |
303 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) { | 331 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) { |
304 // Always expect correct IPv4 address to be allocated. | 332 // Always expect correct IPv4 address to be allocated. |
305 NOTREACHED(); | 333 NOTREACHED(); |
306 } | 334 } |
307 socket->InitAcceptedTcp(client, local_address_, remote_address); | 335 socket->InitAcceptedTcp(client, local_address_, remote_address); |
308 SignalNewConnection(this, socket.release()); | 336 SignalNewConnection(this, socket.release()); |
309 } | 337 } |
310 | 338 |
311 void IpcPacketSocket::OnSendComplete() { | 339 void IpcPacketSocket::OnSendComplete() { |
312 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 340 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
313 | 341 |
314 --send_packets_pending_; | 342 CHECK(!in_flight_packet_sizes_.empty()); |
315 DCHECK_GE(send_packets_pending_, 0); | 343 send_bytes_available_ += in_flight_packet_sizes_.front(); |
| 344 DCHECK_LE(send_bytes_available_, kMaximumInFlightBytes); |
| 345 in_flight_packet_sizes_.pop_front(); |
| 346 TraceSendThrottlingState(); |
316 | 347 |
317 if (writable_signal_expected_ && | 348 if (writable_signal_expected_ && send_bytes_available_ > 0) { |
318 send_packets_pending_ <= kWritableSignalThreshold) { | |
319 SignalReadyToSend(this); | 349 SignalReadyToSend(this); |
320 writable_signal_expected_ = false; | 350 writable_signal_expected_ = false; |
321 } | 351 } |
322 } | 352 } |
323 | 353 |
324 void IpcPacketSocket::OnError() { | 354 void IpcPacketSocket::OnError() { |
325 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 355 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
326 state_ = IS_ERROR; | 356 state_ = IS_ERROR; |
327 error_ = ECONNABORTED; | 357 error_ = ECONNABORTED; |
328 } | 358 } |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
395 talk_base::SocketAddress crome_address; | 425 talk_base::SocketAddress crome_address; |
396 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_); | 426 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_); |
397 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); | 427 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); |
398 if (!socket->Init(P2P_SOCKET_TCP_CLIENT, socket_client, local_address, | 428 if (!socket->Init(P2P_SOCKET_TCP_CLIENT, socket_client, local_address, |
399 remote_address)) | 429 remote_address)) |
400 return NULL; | 430 return NULL; |
401 return socket.release(); | 431 return socket.release(); |
402 } | 432 } |
403 | 433 |
404 } // namespace content | 434 } // namespace content |
OLD | NEW |