Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "content/renderer/p2p/ipc_socket_factory.h" | 5 #include "content/renderer/p2p/ipc_socket_factory.h" |
| 6 | 6 |
| 7 #include <deque> | |
| 8 | |
| 7 #include "base/compiler_specific.h" | 9 #include "base/compiler_specific.h" |
| 8 #include "base/debug/trace_event.h" | 10 #include "base/debug/trace_event.h" |
| 9 #include "base/message_loop.h" | 11 #include "base/message_loop.h" |
| 10 #include "base/message_loop_proxy.h" | 12 #include "base/message_loop_proxy.h" |
| 11 #include "content/renderer/p2p/socket_client.h" | 13 #include "content/renderer/p2p/socket_client.h" |
| 12 #include "content/renderer/p2p/socket_dispatcher.h" | 14 #include "content/renderer/p2p/socket_dispatcher.h" |
| 13 #include "jingle/glue/utils.h" | 15 #include "jingle/glue/utils.h" |
| 14 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h" | 16 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h" |
| 15 | 17 |
| 16 namespace content { | 18 namespace content { |
| 17 | 19 |
| 18 namespace { | 20 namespace { |
| 19 | 21 |
| 20 // TODO(hclam): This shouldn't be a pre-defined value. Bug: crbug.com/181321. | 22 // TODO(miu): This probably needs tuning. http://crbug.com/237960 |
| 21 const int kMaxPendingPackets = 32; | 23 const size_t kMaximumInFlightBytes = 256 * 1024; // 256 KB |
| 22 const int kWritableSignalThreshold = 0; | |
| 23 | 24 |
| 24 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface | 25 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface |
| 25 // using P2PSocketClient that works over IPC-channel. It must be used | 26 // using P2PSocketClient that works over IPC-channel. It must be used |
| 26 // on the thread it was created. | 27 // on the thread it was created. |
| 27 class IpcPacketSocket : public talk_base::AsyncPacketSocket, | 28 class IpcPacketSocket : public talk_base::AsyncPacketSocket, |
| 28 public P2PSocketClient::Delegate { | 29 public P2PSocketClient::Delegate { |
| 29 public: | 30 public: |
| 30 IpcPacketSocket(); | 31 IpcPacketSocket(); |
| 31 virtual ~IpcPacketSocket(); | 32 virtual ~IpcPacketSocket(); |
| 32 | 33 |
| (...skipping 26 matching lines...) Expand all Loading... | |
| 59 | 60 |
| 60 private: | 61 private: |
| 61 enum InternalState { | 62 enum InternalState { |
| 62 IS_UNINITIALIZED, | 63 IS_UNINITIALIZED, |
| 63 IS_OPENING, | 64 IS_OPENING, |
| 64 IS_OPEN, | 65 IS_OPEN, |
| 65 IS_CLOSED, | 66 IS_CLOSED, |
| 66 IS_ERROR, | 67 IS_ERROR, |
| 67 }; | 68 }; |
| 68 | 69 |
| 70 // Reset send throttling mechanism to initial (i.e., no packets in-flight) | |
| 71 // state. | |
| 72 void ResetSendThrottling(); | |
| 73 | |
| 74 // Update trace of send throttling internal state. This should be called | |
| 75 // immediately after any changes to |send_bytes_available_| and/or | |
| 76 // |in_flight_packet_sizes_|. | |
| 77 void TraceSendThrottlingState() const; | |
| 78 | |
| 69 void InitAcceptedTcp(P2PSocketClient* client, | 79 void InitAcceptedTcp(P2PSocketClient* client, |
| 70 const talk_base::SocketAddress& local_address, | 80 const talk_base::SocketAddress& local_address, |
| 71 const talk_base::SocketAddress& remote_address); | 81 const talk_base::SocketAddress& remote_address); |
| 72 | 82 |
| 73 P2PSocketType type_; | 83 P2PSocketType type_; |
| 74 | 84 |
| 75 // Message loop on which this socket was created and being used. | 85 // Message loop on which this socket was created and being used. |
| 76 base::MessageLoop* message_loop_; | 86 base::MessageLoop* message_loop_; |
| 77 | 87 |
| 78 // Corresponding P2P socket client. | 88 // Corresponding P2P socket client. |
| 79 scoped_refptr<P2PSocketClient> client_; | 89 scoped_refptr<P2PSocketClient> client_; |
| 80 | 90 |
| 81 // Local address is allocated by the browser process, and the | 91 // Local address is allocated by the browser process, and the |
| 82 // renderer side doesn't know the address until it receives OnOpen() | 92 // renderer side doesn't know the address until it receives OnOpen() |
| 83 // event from the browser. | 93 // event from the browser. |
| 84 talk_base::SocketAddress local_address_; | 94 talk_base::SocketAddress local_address_; |
| 85 | 95 |
| 86 // Remote address for client TCP connections. | 96 // Remote address for client TCP connections. |
| 87 talk_base::SocketAddress remote_address_; | 97 talk_base::SocketAddress remote_address_; |
| 88 | 98 |
| 89 // Current state of the object. | 99 // Current state of the object. |
| 90 InternalState state_; | 100 InternalState state_; |
| 91 | 101 |
| 92 // Number which have been sent to the browser, but for which we haven't | 102 // Track the number of bytes allowed to be sent non-blocking. This is used to |
| 93 // received response. | 103 // throttle the sending of packets to the browser process. For each packet |
| 94 int send_packets_pending_; | 104 // sent, the value is decreased. As callbacks to OnSendComplete() (as IPCs |
| 105 // from the browser process) are made, the value is increased back. This | |
| 106 // allows short bursts of high-rate sending without dropping packets, but | |
| 107 // quickly restricts the client to a sustainable steady-state rate. | |
| 108 size_t send_bytes_available_; | |
| 109 std::deque<size_t> in_flight_packet_sizes_; | |
| 95 | 110 |
| 96 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the | 111 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the |
| 97 // caller expects SignalWritable notification. | 112 // caller expects SignalWritable notification. |
| 98 bool writable_signal_expected_; | 113 bool writable_signal_expected_; |
| 99 | 114 |
| 100 // Current error code. Valid when state_ == IS_ERROR. | 115 // Current error code. Valid when state_ == IS_ERROR. |
| 101 int error_; | 116 int error_; |
| 102 | 117 |
| 103 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket); | 118 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket); |
| 104 }; | 119 }; |
| 105 | 120 |
| 106 IpcPacketSocket::IpcPacketSocket() | 121 IpcPacketSocket::IpcPacketSocket() |
| 107 : type_(P2P_SOCKET_UDP), | 122 : type_(P2P_SOCKET_UDP), |
| 108 message_loop_(base::MessageLoop::current()), | 123 message_loop_(base::MessageLoop::current()), |
| 109 state_(IS_UNINITIALIZED), | 124 state_(IS_UNINITIALIZED), |
| 110 send_packets_pending_(0), | 125 error_(0) { |
| 111 writable_signal_expected_(false), | 126 } |
| 112 error_(0) {} | |
| 113 | 127 |
| 114 IpcPacketSocket::~IpcPacketSocket() { | 128 IpcPacketSocket::~IpcPacketSocket() { |
| 115 if (state_ == IS_OPENING || state_ == IS_OPEN || | 129 if (state_ == IS_OPENING || state_ == IS_OPEN || |
| 116 state_ == IS_ERROR) { | 130 state_ == IS_ERROR) { |
| 117 Close(); | 131 Close(); |
| 118 } | 132 } |
| 119 } | 133 } |
| 120 | 134 |
| 135 void IpcPacketSocket::ResetSendThrottling() { | |
|
Sergey Ulanov
2013/05/07 00:14:29
Now this is called only when the socket is being i
miu
2013/05/07 00:49:40
Done.
| |
| 136 COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate); | |
| 137 send_bytes_available_ = kMaximumInFlightBytes; | |
| 138 in_flight_packet_sizes_.clear(); | |
| 139 TraceSendThrottlingState(); | |
| 140 writable_signal_expected_ = false; | |
| 141 } | |
| 142 | |
| 143 void IpcPacketSocket::TraceSendThrottlingState() const { | |
| 144 TRACE_COUNTER1("p2p", "P2PSendBytesAvailable", send_bytes_available_); | |
| 145 TRACE_COUNTER1("p2p", "P2PSendPacketsInFlight", | |
| 146 in_flight_packet_sizes_.size()); | |
| 147 } | |
| 148 | |
| 121 bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client, | 149 bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client, |
| 122 const talk_base::SocketAddress& local_address, | 150 const talk_base::SocketAddress& local_address, |
| 123 const talk_base::SocketAddress& remote_address) { | 151 const talk_base::SocketAddress& remote_address) { |
| 124 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 152 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
| 125 DCHECK_EQ(state_, IS_UNINITIALIZED); | 153 DCHECK_EQ(state_, IS_UNINITIALIZED); |
| 126 | 154 |
| 127 type_ = type; | 155 type_ = type; |
| 128 client_ = client; | 156 client_ = client; |
| 129 local_address_ = local_address; | 157 local_address_ = local_address; |
| 130 remote_address_ = remote_address; | 158 remote_address_ = remote_address; |
| (...skipping 19 matching lines...) Expand all Loading... | |
| 150 P2PSocketClient* client, | 178 P2PSocketClient* client, |
| 151 const talk_base::SocketAddress& local_address, | 179 const talk_base::SocketAddress& local_address, |
| 152 const talk_base::SocketAddress& remote_address) { | 180 const talk_base::SocketAddress& remote_address) { |
| 153 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 181 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
| 154 DCHECK_EQ(state_, IS_UNINITIALIZED); | 182 DCHECK_EQ(state_, IS_UNINITIALIZED); |
| 155 | 183 |
| 156 client_ = client; | 184 client_ = client; |
| 157 local_address_ = local_address; | 185 local_address_ = local_address; |
| 158 remote_address_ = remote_address; | 186 remote_address_ = remote_address; |
| 159 state_ = IS_OPEN; | 187 state_ = IS_OPEN; |
| 188 ResetSendThrottling(); | |
| 160 client_->set_delegate(this); | 189 client_->set_delegate(this); |
| 161 } | 190 } |
| 162 | 191 |
| 163 // talk_base::AsyncPacketSocket interface. | 192 // talk_base::AsyncPacketSocket interface. |
| 164 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const { | 193 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const { |
| 165 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 194 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
| 166 return local_address_; | 195 return local_address_; |
| 167 } | 196 } |
| 168 | 197 |
| 169 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const { | 198 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const { |
| (...skipping 18 matching lines...) Expand all Loading... | |
| 188 return EWOULDBLOCK; | 217 return EWOULDBLOCK; |
| 189 case IS_CLOSED: | 218 case IS_CLOSED: |
| 190 return ENOTCONN; | 219 return ENOTCONN; |
| 191 case IS_ERROR: | 220 case IS_ERROR: |
| 192 return error_; | 221 return error_; |
| 193 case IS_OPEN: | 222 case IS_OPEN: |
| 194 // Continue sending the packet. | 223 // Continue sending the packet. |
| 195 break; | 224 break; |
| 196 } | 225 } |
| 197 | 226 |
| 198 if (send_packets_pending_ > kMaxPendingPackets) { | 227 if (data_size == 0) { |
| 199 TRACE_EVENT_INSTANT1("p2p", "MaxPendingPacketsWouldBlock", | 228 NOTREACHED(); |
| 229 return 0; | |
| 230 } | |
| 231 | |
| 232 if (data_size > send_bytes_available_) { | |
| 233 TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock", | |
| 200 TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id()); | 234 TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id()); |
| 201 writable_signal_expected_ = true; | 235 writable_signal_expected_ = true; |
| 202 error_ = EWOULDBLOCK; | 236 error_ = EWOULDBLOCK; |
| 203 return -1; | 237 return -1; |
| 204 } | 238 } |
| 205 | 239 |
| 206 const char* data_char = reinterpret_cast<const char*>(data); | |
| 207 std::vector<char> data_vector(data_char, data_char + data_size); | |
| 208 | |
| 209 net::IPEndPoint address_chrome; | 240 net::IPEndPoint address_chrome; |
| 210 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) { | 241 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) { |
| 211 NOTREACHED(); | 242 NOTREACHED(); |
| 212 return -1; | 243 return -1; |
| 213 } | 244 } |
| 214 | 245 |
| 215 ++send_packets_pending_; | 246 send_bytes_available_ -= data_size; |
| 247 in_flight_packet_sizes_.push_back(data_size); | |
| 248 TraceSendThrottlingState(); | |
| 249 | |
| 250 const char* data_char = reinterpret_cast<const char*>(data); | |
| 251 std::vector<char> data_vector(data_char, data_char + data_size); | |
| 216 client_->Send(address_chrome, data_vector); | 252 client_->Send(address_chrome, data_vector); |
| 217 | 253 |
| 218 // Fake successful send. The caller ignores result anyway. | 254 // Fake successful send. The caller ignores result anyway. |
| 219 return data_size; | 255 return data_size; |
| 220 } | 256 } |
| 221 | 257 |
| 222 int IpcPacketSocket::Close() { | 258 int IpcPacketSocket::Close() { |
| 223 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 259 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
| 224 | 260 |
| 225 client_->Close(); | 261 client_->Close(); |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 279 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 315 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
| 280 | 316 |
| 281 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) { | 317 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) { |
| 282 // Always expect correct IPv4 address to be allocated. | 318 // Always expect correct IPv4 address to be allocated. |
| 283 NOTREACHED(); | 319 NOTREACHED(); |
| 284 OnError(); | 320 OnError(); |
| 285 return; | 321 return; |
| 286 } | 322 } |
| 287 | 323 |
| 288 state_ = IS_OPEN; | 324 state_ = IS_OPEN; |
| 325 ResetSendThrottling(); | |
| 289 | 326 |
| 290 SignalAddressReady(this, local_address_); | 327 SignalAddressReady(this, local_address_); |
| 291 if (type_ == P2P_SOCKET_TCP_CLIENT) | 328 if (type_ == P2P_SOCKET_TCP_CLIENT) |
| 292 SignalConnect(this); | 329 SignalConnect(this); |
| 293 } | 330 } |
| 294 | 331 |
| 295 void IpcPacketSocket::OnIncomingTcpConnection( | 332 void IpcPacketSocket::OnIncomingTcpConnection( |
| 296 const net::IPEndPoint& address, | 333 const net::IPEndPoint& address, |
| 297 P2PSocketClient* client) { | 334 P2PSocketClient* client) { |
| 298 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 335 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
| 299 | 336 |
| 300 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); | 337 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); |
| 301 | 338 |
| 302 talk_base::SocketAddress remote_address; | 339 talk_base::SocketAddress remote_address; |
| 303 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) { | 340 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) { |
| 304 // Always expect correct IPv4 address to be allocated. | 341 // Always expect correct IPv4 address to be allocated. |
| 305 NOTREACHED(); | 342 NOTREACHED(); |
| 306 } | 343 } |
| 307 socket->InitAcceptedTcp(client, local_address_, remote_address); | 344 socket->InitAcceptedTcp(client, local_address_, remote_address); |
| 308 SignalNewConnection(this, socket.release()); | 345 SignalNewConnection(this, socket.release()); |
| 309 } | 346 } |
| 310 | 347 |
| 311 void IpcPacketSocket::OnSendComplete() { | 348 void IpcPacketSocket::OnSendComplete() { |
| 312 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 349 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
| 313 | 350 |
| 314 --send_packets_pending_; | 351 CHECK(!in_flight_packet_sizes_.empty()) |
| 315 DCHECK_GE(send_packets_pending_, 0); | 352 << "Received SendComplete() with no known in-flight packets."; |
|
Sergey Ulanov
2013/05/07 00:14:29
nit: remove the message. It makes release binaries
miu
2013/05/07 00:49:40
Done.
| |
| 353 send_bytes_available_ += in_flight_packet_sizes_.front(); | |
| 354 DCHECK_LE(send_bytes_available_, kMaximumInFlightBytes); | |
| 355 in_flight_packet_sizes_.pop_front(); | |
| 356 TraceSendThrottlingState(); | |
| 316 | 357 |
| 317 if (writable_signal_expected_ && | 358 if (writable_signal_expected_ && send_bytes_available_ > 0) { |
| 318 send_packets_pending_ <= kWritableSignalThreshold) { | |
| 319 SignalReadyToSend(this); | 359 SignalReadyToSend(this); |
| 320 writable_signal_expected_ = false; | 360 writable_signal_expected_ = false; |
| 321 } | 361 } |
| 322 } | 362 } |
| 323 | 363 |
| 324 void IpcPacketSocket::OnError() { | 364 void IpcPacketSocket::OnError() { |
| 325 DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 365 DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
| 326 state_ = IS_ERROR; | 366 state_ = IS_ERROR; |
| 327 error_ = ECONNABORTED; | 367 error_ = ECONNABORTED; |
| 328 } | 368 } |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 395 talk_base::SocketAddress crome_address; | 435 talk_base::SocketAddress crome_address; |
| 396 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_); | 436 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_); |
| 397 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); | 437 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); |
| 398 if (!socket->Init(P2P_SOCKET_TCP_CLIENT, socket_client, local_address, | 438 if (!socket->Init(P2P_SOCKET_TCP_CLIENT, socket_client, local_address, |
| 399 remote_address)) | 439 remote_address)) |
| 400 return NULL; | 440 return NULL; |
| 401 return socket.release(); | 441 return socket.release(); |
| 402 } | 442 } |
| 403 | 443 |
| 404 } // namespace content | 444 } // namespace content |
| OLD | NEW |