 Chromium Code Reviews
 Chromium Code Reviews Issue 14559005:
  Replace send packet throttling with a scheme based on number of in-flight bytes in IpcPacketSocket.  (Closed) 
  Base URL: svn://svn.chromium.org/chrome/trunk/src
    
  
    Issue 14559005:
  Replace send packet throttling with a scheme based on number of in-flight bytes in IpcPacketSocket.  (Closed) 
  Base URL: svn://svn.chromium.org/chrome/trunk/src| Index: content/renderer/p2p/ipc_socket_factory.cc | 
| diff --git a/content/renderer/p2p/ipc_socket_factory.cc b/content/renderer/p2p/ipc_socket_factory.cc | 
| index d1f56863645ac771186fdde0160eb82eb218d5a7..a7903e97314777eeb4b9026d350630fafcc09c3a 100644 | 
| --- a/content/renderer/p2p/ipc_socket_factory.cc | 
| +++ b/content/renderer/p2p/ipc_socket_factory.cc | 
| @@ -4,6 +4,8 @@ | 
| #include "content/renderer/p2p/ipc_socket_factory.h" | 
| +#include <deque> | 
| + | 
| #include "base/compiler_specific.h" | 
| #include "base/debug/trace_event.h" | 
| #include "base/message_loop.h" | 
| @@ -17,9 +19,8 @@ namespace content { | 
| namespace { | 
| -// TODO(hclam): This shouldn't be a pre-defined value. Bug: crbug.com/181321. | 
| -const int kMaxPendingPackets = 32; | 
| -const int kWritableSignalThreshold = 0; | 
| +// TODO(miu): This probably needs tuning. http://crbug.com/237960 | 
| +const size_t kMaximumInFlightBytes = 256 << 10; // 256 KB | 
| 
Sergey Ulanov
2013/05/06 22:18:02
nit: *1024 is more readable than <<10.
 
miu
2013/05/06 23:36:13
Done.
 | 
| // IpcPacketSocket implements talk_base::AsyncPacketSocket interface | 
| // using P2PSocketClient that works over IPC-channel. It must be used | 
| @@ -66,6 +67,15 @@ class IpcPacketSocket : public talk_base::AsyncPacketSocket, | 
| IS_ERROR, | 
| }; | 
| + // Reset send throttling mechanism to initial (i.e., no packets in-flight) | 
| + // state. | 
| + void ResetSendThrottling(); | 
| + | 
| + // Update trace of send throttling internal state. This should be called | 
| + // immediately after any changes to |token_bucket_level_| and/or | 
| + // |in_flight_packet_sizes_|. | 
| + void TraceSendThrottlingState() const; | 
| + | 
| void InitAcceptedTcp(P2PSocketClient* client, | 
| const talk_base::SocketAddress& local_address, | 
| const talk_base::SocketAddress& remote_address); | 
| @@ -89,9 +99,13 @@ class IpcPacketSocket : public talk_base::AsyncPacketSocket, | 
| // Current state of the object. | 
| InternalState state_; | 
| - // Number which have been sent to the browser, but for which we haven't | 
| - // received response. | 
| - int send_packets_pending_; | 
| + // A token bucket of bytes is used to throttle the sending of packets to the | 
| 
Sergey Ulanov
2013/05/06 22:18:02
It doesn't look right to me that this is called "T
 
miu
2013/05/06 23:36:13
Ah, yes.  This did start out as a token bucket, wh
 | 
| + // browser process. As calls to OnSendComplete() return, the bucket is | 
| + // refilled. This allows short bursts of high-rate sending without dropping | 
| + // packets, but quickly restricts the client to a sustainable steady-state | 
| + // rate. | 
| + size_t token_bucket_level_; | 
| 
Sergey Ulanov
2013/05/06 22:18:02
maybe int instead of size_t, because it's not size
 
miu
2013/05/06 23:36:13
But it *is* a size: It's always equal to kMaximumI
 | 
| + std::deque<size_t> in_flight_packet_sizes_; | 
| // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the | 
| // caller expects SignalWritable notification. | 
| @@ -107,9 +121,8 @@ IpcPacketSocket::IpcPacketSocket() | 
| : type_(P2P_SOCKET_UDP), | 
| message_loop_(base::MessageLoop::current()), | 
| state_(IS_UNINITIALIZED), | 
| - send_packets_pending_(0), | 
| - writable_signal_expected_(false), | 
| - error_(0) {} | 
| + error_(0) { | 
| +} | 
| IpcPacketSocket::~IpcPacketSocket() { | 
| if (state_ == IS_OPENING || state_ == IS_OPEN || | 
| @@ -118,6 +131,20 @@ IpcPacketSocket::~IpcPacketSocket() { | 
| } | 
| } | 
| +void IpcPacketSocket::ResetSendThrottling() { | 
| + COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate); | 
| + token_bucket_level_ = kMaximumInFlightBytes; | 
| + in_flight_packet_sizes_.clear(); | 
| + TraceSendThrottlingState(); | 
| + writable_signal_expected_ = false; | 
| +} | 
| + | 
| +void IpcPacketSocket::TraceSendThrottlingState() const { | 
| + TRACE_COUNTER1("p2p", "P2PSendBytesAvailable", token_bucket_level_); | 
| + TRACE_COUNTER1("p2p", "P2PSendPacketsInFlight", | 
| + in_flight_packet_sizes_.size()); | 
| +} | 
| + | 
| bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client, | 
| const talk_base::SocketAddress& local_address, | 
| const talk_base::SocketAddress& remote_address) { | 
| @@ -157,6 +184,7 @@ void IpcPacketSocket::InitAcceptedTcp( | 
| local_address_ = local_address; | 
| remote_address_ = remote_address; | 
| state_ = IS_OPEN; | 
| + ResetSendThrottling(); | 
| client_->set_delegate(this); | 
| } | 
| @@ -195,24 +223,29 @@ int IpcPacketSocket::SendTo(const void *data, size_t data_size, | 
| break; | 
| } | 
| - if (send_packets_pending_ > kMaxPendingPackets) { | 
| - TRACE_EVENT_INSTANT1("p2p", "MaxPendingPacketsWouldBlock", | 
| + if (data_size == 0) | 
| 
Sergey Ulanov
2013/05/06 22:18:02
I think this can be DCHECK_GT(data_size, 0).
 
miu
2013/05/06 23:36:13
After thinking this over, I believe a NOTREACHED()
 | 
| + return 0; // No-op. | 
| + | 
| + if (data_size > token_bucket_level_) { | 
| + TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock", | 
| TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id()); | 
| writable_signal_expected_ = true; | 
| error_ = EWOULDBLOCK; | 
| return -1; | 
| } | 
| - const char* data_char = reinterpret_cast<const char*>(data); | 
| - std::vector<char> data_vector(data_char, data_char + data_size); | 
| - | 
| net::IPEndPoint address_chrome; | 
| if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) { | 
| NOTREACHED(); | 
| return -1; | 
| } | 
| - ++send_packets_pending_; | 
| + token_bucket_level_ -= data_size; | 
| + in_flight_packet_sizes_.push_back(data_size); | 
| + TraceSendThrottlingState(); | 
| + | 
| + const char* data_char = reinterpret_cast<const char*>(data); | 
| + std::vector<char> data_vector(data_char, data_char + data_size); | 
| client_->Send(address_chrome, data_vector); | 
| // Fake successful send. The caller ignores result anyway. | 
| @@ -286,6 +319,7 @@ void IpcPacketSocket::OnOpen(const net::IPEndPoint& address) { | 
| } | 
| state_ = IS_OPEN; | 
| + ResetSendThrottling(); | 
| SignalAddressReady(this, local_address_); | 
| if (type_ == P2P_SOCKET_TCP_CLIENT) | 
| @@ -311,11 +345,21 @@ void IpcPacketSocket::OnIncomingTcpConnection( | 
| void IpcPacketSocket::OnSendComplete() { | 
| DCHECK_EQ(base::MessageLoop::current(), message_loop_); | 
| - --send_packets_pending_; | 
| - DCHECK_GE(send_packets_pending_, 0); | 
| + if (in_flight_packet_sizes_.empty()) { | 
| + NOTREACHED() << "Received SendComplete() with no known in-flight packets."; | 
| + // In Release builds, auto-recover by resetting the throttling state. | 
| + const bool signal_expected = writable_signal_expected_; | 
| + ResetSendThrottling(); | 
| 
Sergey Ulanov
2013/05/06 22:18:02
Why do you need this given it's marked as NOTREACH
 
miu
2013/05/06 23:36:13
Done.  It's a CHECK() now.
 | 
| + if (signal_expected) | 
| + SignalReadyToSend(this); | 
| + return; | 
| + } | 
| + token_bucket_level_ += in_flight_packet_sizes_.front(); | 
| + DCHECK_LE(token_bucket_level_, kMaximumInFlightBytes); | 
| + in_flight_packet_sizes_.pop_front(); | 
| + TraceSendThrottlingState(); | 
| - if (writable_signal_expected_ && | 
| - send_packets_pending_ <= kWritableSignalThreshold) { | 
| + if (writable_signal_expected_ && token_bucket_level_ > 0) { | 
| SignalReadyToSend(this); | 
| writable_signal_expected_ = false; | 
| } |