Chromium Code Reviews| Index: content/renderer/p2p/ipc_socket_factory.cc |
| diff --git a/content/renderer/p2p/ipc_socket_factory.cc b/content/renderer/p2p/ipc_socket_factory.cc |
| index d1f56863645ac771186fdde0160eb82eb218d5a7..a7903e97314777eeb4b9026d350630fafcc09c3a 100644 |
| --- a/content/renderer/p2p/ipc_socket_factory.cc |
| +++ b/content/renderer/p2p/ipc_socket_factory.cc |
| @@ -4,6 +4,8 @@ |
| #include "content/renderer/p2p/ipc_socket_factory.h" |
| +#include <deque> |
| + |
| #include "base/compiler_specific.h" |
| #include "base/debug/trace_event.h" |
| #include "base/message_loop.h" |
| @@ -17,9 +19,8 @@ namespace content { |
| namespace { |
| -// TODO(hclam): This shouldn't be a pre-defined value. Bug: crbug.com/181321. |
| -const int kMaxPendingPackets = 32; |
| -const int kWritableSignalThreshold = 0; |
| +// TODO(miu): This probably needs tuning. http://crbug.com/237960 |
| +const size_t kMaximumInFlightBytes = 256 << 10; // 256 KB |
|
Sergey Ulanov
2013/05/06 22:18:02
nit: *1024 is more readable than <<10.
miu
2013/05/06 23:36:13
Done.
|
| // IpcPacketSocket implements talk_base::AsyncPacketSocket interface |
| // using P2PSocketClient that works over IPC-channel. It must be used |
| @@ -66,6 +67,15 @@ class IpcPacketSocket : public talk_base::AsyncPacketSocket, |
| IS_ERROR, |
| }; |
| + // Reset send throttling mechanism to initial (i.e., no packets in-flight) |
| + // state. |
| + void ResetSendThrottling(); |
| + |
| + // Update trace of send throttling internal state. This should be called |
| + // immediately after any changes to |token_bucket_level_| and/or |
| + // |in_flight_packet_sizes_|. |
| + void TraceSendThrottlingState() const; |
| + |
| void InitAcceptedTcp(P2PSocketClient* client, |
| const talk_base::SocketAddress& local_address, |
| const talk_base::SocketAddress& remote_address); |
| @@ -89,9 +99,13 @@ class IpcPacketSocket : public talk_base::AsyncPacketSocket, |
| // Current state of the object. |
| InternalState state_; |
| - // Number which have been sent to the browser, but for which we haven't |
| - // received response. |
| - int send_packets_pending_; |
| + // A token bucket of bytes is used to throttle the sending of packets to the |
|
Sergey Ulanov
2013/05/06 22:18:02
It doesn't look right to me that this is called "T
miu
2013/05/06 23:36:13
Ah, yes. This did start out as a token bucket, wh
|
| + // browser process. As calls to OnSendComplete() return, the bucket is |
| + // refilled. This allows short bursts of high-rate sending without dropping |
| + // packets, but quickly restricts the client to a sustainable steady-state |
| + // rate. |
| + size_t token_bucket_level_; |
|
Sergey Ulanov
2013/05/06 22:18:02
maybe int instead of size_t, because it's not size
miu
2013/05/06 23:36:13
But it *is* a size: It's always equal to kMaximumI
|
| + std::deque<size_t> in_flight_packet_sizes_; |
| // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the |
| // caller expects SignalWritable notification. |
| @@ -107,9 +121,8 @@ IpcPacketSocket::IpcPacketSocket() |
| : type_(P2P_SOCKET_UDP), |
| message_loop_(base::MessageLoop::current()), |
| state_(IS_UNINITIALIZED), |
| - send_packets_pending_(0), |
| - writable_signal_expected_(false), |
| - error_(0) {} |
| + error_(0) { |
| +} |
| IpcPacketSocket::~IpcPacketSocket() { |
| if (state_ == IS_OPENING || state_ == IS_OPEN || |
| @@ -118,6 +131,20 @@ IpcPacketSocket::~IpcPacketSocket() { |
| } |
| } |
| +void IpcPacketSocket::ResetSendThrottling() { |
| + COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate); |
| + token_bucket_level_ = kMaximumInFlightBytes; |
| + in_flight_packet_sizes_.clear(); |
| + TraceSendThrottlingState(); |
| + writable_signal_expected_ = false; |
| +} |
| + |
| +void IpcPacketSocket::TraceSendThrottlingState() const { |
| + TRACE_COUNTER1("p2p", "P2PSendBytesAvailable", token_bucket_level_); |
| + TRACE_COUNTER1("p2p", "P2PSendPacketsInFlight", |
| + in_flight_packet_sizes_.size()); |
| +} |
| + |
| bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client, |
| const talk_base::SocketAddress& local_address, |
| const talk_base::SocketAddress& remote_address) { |
| @@ -157,6 +184,7 @@ void IpcPacketSocket::InitAcceptedTcp( |
| local_address_ = local_address; |
| remote_address_ = remote_address; |
| state_ = IS_OPEN; |
| + ResetSendThrottling(); |
| client_->set_delegate(this); |
| } |
| @@ -195,24 +223,29 @@ int IpcPacketSocket::SendTo(const void *data, size_t data_size, |
| break; |
| } |
| - if (send_packets_pending_ > kMaxPendingPackets) { |
| - TRACE_EVENT_INSTANT1("p2p", "MaxPendingPacketsWouldBlock", |
| + if (data_size == 0) |
|
Sergey Ulanov
2013/05/06 22:18:02
I think this can be DCHECK_GT(data_size, 0).
miu
2013/05/06 23:36:13
After thinking this over, I believe a NOTREACHED()
|
| + return 0; // No-op. |
| + |
| + if (data_size > token_bucket_level_) { |
| + TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock", |
| TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id()); |
| writable_signal_expected_ = true; |
| error_ = EWOULDBLOCK; |
| return -1; |
| } |
| - const char* data_char = reinterpret_cast<const char*>(data); |
| - std::vector<char> data_vector(data_char, data_char + data_size); |
| - |
| net::IPEndPoint address_chrome; |
| if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) { |
| NOTREACHED(); |
| return -1; |
| } |
| - ++send_packets_pending_; |
| + token_bucket_level_ -= data_size; |
| + in_flight_packet_sizes_.push_back(data_size); |
| + TraceSendThrottlingState(); |
| + |
| + const char* data_char = reinterpret_cast<const char*>(data); |
| + std::vector<char> data_vector(data_char, data_char + data_size); |
| client_->Send(address_chrome, data_vector); |
| // Fake successful send. The caller ignores result anyway. |
| @@ -286,6 +319,7 @@ void IpcPacketSocket::OnOpen(const net::IPEndPoint& address) { |
| } |
| state_ = IS_OPEN; |
| + ResetSendThrottling(); |
| SignalAddressReady(this, local_address_); |
| if (type_ == P2P_SOCKET_TCP_CLIENT) |
| @@ -311,11 +345,21 @@ void IpcPacketSocket::OnIncomingTcpConnection( |
| void IpcPacketSocket::OnSendComplete() { |
| DCHECK_EQ(base::MessageLoop::current(), message_loop_); |
| - --send_packets_pending_; |
| - DCHECK_GE(send_packets_pending_, 0); |
| + if (in_flight_packet_sizes_.empty()) { |
| + NOTREACHED() << "Received SendComplete() with no known in-flight packets."; |
| + // In Release builds, auto-recover by resetting the throttling state. |
| + const bool signal_expected = writable_signal_expected_; |
| + ResetSendThrottling(); |
|
Sergey Ulanov
2013/05/06 22:18:02
Why do you need this given it's marked as NOTREACH
miu
2013/05/06 23:36:13
Done. It's a CHECK() now.
|
| + if (signal_expected) |
| + SignalReadyToSend(this); |
| + return; |
| + } |
| + token_bucket_level_ += in_flight_packet_sizes_.front(); |
| + DCHECK_LE(token_bucket_level_, kMaximumInFlightBytes); |
| + in_flight_packet_sizes_.pop_front(); |
| + TraceSendThrottlingState(); |
| - if (writable_signal_expected_ && |
| - send_packets_pending_ <= kWritableSignalThreshold) { |
| + if (writable_signal_expected_ && token_bucket_level_ > 0) { |
| SignalReadyToSend(this); |
| writable_signal_expected_ = false; |
| } |