Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(134)

Side by Side Diff: content/renderer/p2p/ipc_socket_factory.cc

Issue 759923003: Detect situation when there is no missing send completion signal in P2PSocket implementation. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 6 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/renderer/p2p/ipc_socket_factory.h" 5 #include "content/renderer/p2p/ipc_socket_factory.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <deque> 8 #include <list>
9 9
10 #include "base/compiler_specific.h" 10 #include "base/compiler_specific.h"
11 #include "base/debug/trace_event.h" 11 #include "base/debug/trace_event.h"
12 #include "base/message_loop/message_loop.h" 12 #include "base/message_loop/message_loop.h"
13 #include "base/message_loop/message_loop_proxy.h" 13 #include "base/message_loop/message_loop_proxy.h"
14 #include "base/metrics/field_trial.h" 14 #include "base/metrics/field_trial.h"
15 #include "base/metrics/histogram.h" 15 #include "base/metrics/histogram.h"
16 #include "base/strings/string_number_conversions.h" 16 #include "base/strings/string_number_conversions.h"
17 #include "base/strings/stringprintf.h" 17 #include "base/strings/stringprintf.h"
18 #include "base/threading/non_thread_safe.h" 18 #include "base/threading/non_thread_safe.h"
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
69 69
70 // IpcPacketSocket implements rtc::AsyncPacketSocket interface 70 // IpcPacketSocket implements rtc::AsyncPacketSocket interface
71 // using P2PSocketClient that works over IPC-channel. It must be used 71 // using P2PSocketClient that works over IPC-channel. It must be used
72 // on the thread it was created. 72 // on the thread it was created.
73 class IpcPacketSocket : public rtc::AsyncPacketSocket, 73 class IpcPacketSocket : public rtc::AsyncPacketSocket,
74 public P2PSocketClientDelegate { 74 public P2PSocketClientDelegate {
75 public: 75 public:
76 IpcPacketSocket(); 76 IpcPacketSocket();
77 ~IpcPacketSocket() override; 77 ~IpcPacketSocket() override;
78 78
79 // Struct to track information when a packet is received by this socket for
80 // send. The information tracked here will be used to match with the
81 // P2PSendPacketMetrics from the underneath system socket.
82 struct InFlightPacketRecord {
83 InFlightPacketRecord(uint64 packet_id,
84 size_t packet_size,
85 uint64 timeticks_received_by_socket)
86 : packet_id(packet_id),
87 packet_size(packet_size),
88 timeticks_received_by_socket(timeticks_received_by_socket) {}
89
90 uint64 packet_id;
91 size_t packet_size;
92 // The first time this packet shows up to this socket.
93 uint64 timeticks_received_by_socket;
94 };
95
96 typedef std::list<InFlightPacketRecord> InFlightPacketList;
97
79 // Always takes ownership of client even if initialization fails. 98 // Always takes ownership of client even if initialization fails.
80 bool Init(P2PSocketType type, P2PSocketClientImpl* client, 99 bool Init(P2PSocketType type, P2PSocketClientImpl* client,
81 const rtc::SocketAddress& local_address, 100 const rtc::SocketAddress& local_address,
82 const rtc::SocketAddress& remote_address); 101 const rtc::SocketAddress& remote_address);
83 102
84 // rtc::AsyncPacketSocket interface. 103 // rtc::AsyncPacketSocket interface.
85 rtc::SocketAddress GetLocalAddress() const override; 104 rtc::SocketAddress GetLocalAddress() const override;
86 rtc::SocketAddress GetRemoteAddress() const override; 105 rtc::SocketAddress GetRemoteAddress() const override;
87 int Send(const void* pv, 106 int Send(const void* pv,
88 size_t cb, 107 size_t cb,
89 const rtc::PacketOptions& options) override; 108 const rtc::PacketOptions& options) override;
90 int SendTo(const void* pv, 109 int SendTo(const void* pv,
91 size_t cb, 110 size_t cb,
92 const rtc::SocketAddress& addr, 111 const rtc::SocketAddress& addr,
93 const rtc::PacketOptions& options) override; 112 const rtc::PacketOptions& options) override;
94 int Close() override; 113 int Close() override;
95 State GetState() const override; 114 State GetState() const override;
96 int GetOption(rtc::Socket::Option option, int* value) override; 115 int GetOption(rtc::Socket::Option option, int* value) override;
97 int SetOption(rtc::Socket::Option option, int value) override; 116 int SetOption(rtc::Socket::Option option, int value) override;
98 int GetError() const override; 117 int GetError() const override;
99 void SetError(int error) override; 118 void SetError(int error) override;
100 119
101 // P2PSocketClientDelegate implementation. 120 // P2PSocketClientDelegate implementation.
102 void OnOpen(const net::IPEndPoint& local_address, 121 void OnOpen(const net::IPEndPoint& local_address,
103 const net::IPEndPoint& remote_address) override; 122 const net::IPEndPoint& remote_address) override;
104 void OnIncomingTcpConnection(const net::IPEndPoint& address, 123 void OnIncomingTcpConnection(const net::IPEndPoint& address,
105 P2PSocketClient* client) override; 124 P2PSocketClient* client) override;
106 void OnSendComplete() override; 125 void OnSendComplete(const P2PSendPacketMetrics& send_metrics) override;
126
107 void OnError() override; 127 void OnError() override;
108 void OnDataReceived(const net::IPEndPoint& address, 128 void OnDataReceived(const net::IPEndPoint& address,
109 const std::vector<char>& data, 129 const std::vector<char>& data,
110 const base::TimeTicks& timestamp) override; 130 const base::TimeTicks& timestamp) override;
111 131
112 private: 132 private:
113 enum InternalState { 133 enum InternalState {
114 IS_UNINITIALIZED, 134 IS_UNINITIALIZED,
115 IS_OPENING, 135 IS_OPENING,
116 IS_OPEN, 136 IS_OPEN,
117 IS_CLOSED, 137 IS_CLOSED,
118 IS_ERROR, 138 IS_ERROR,
119 }; 139 };
120 140
121 // Increment the counter for consecutive bytes discarded as socket is running 141 // Increment the counter for consecutive bytes discarded as socket is running
122 // out of buffer. 142 // out of buffer.
123 void IncrementDiscardCounters(size_t bytes_discarded); 143 void IncrementDiscardCounters(size_t bytes_discarded);
124 144
125 // Update trace of send throttling internal state. This should be called 145 // Update trace of send throttling internal state. This should be called
126 // immediately after any changes to |send_bytes_available_| and/or 146 // immediately after any changes to |send_bytes_available_| and/or
127 // |in_flight_packet_sizes_|. 147 // |in_flight_packet_records_|.
128 void TraceSendThrottlingState() const; 148 void TraceSendThrottlingState() const;
129 149
130 void InitAcceptedTcp(P2PSocketClient* client, 150 void InitAcceptedTcp(P2PSocketClient* client,
131 const rtc::SocketAddress& local_address, 151 const rtc::SocketAddress& local_address,
132 const rtc::SocketAddress& remote_address); 152 const rtc::SocketAddress& remote_address);
133 153
134 int DoSetOption(P2PSocketOption option, int value); 154 int DoSetOption(P2PSocketOption option, int value);
135 155
136 // Allow a finch experiment to control the initial value of 156 // Allow a finch experiment to control the initial value of
137 // send_bytes_available_; 157 // send_bytes_available_;
138 void AdjustUdpSendBufferSize(); 158 void AdjustUdpSendBufferSize();
139 159
160 // Helper function to find the matching InFlightPacketRecord by packet_id.
161 // In normal case, it should always be the first one so the perf impact should
162 // be negligible. Return value is the iterator pointing to the matching
163 // InFlightPacketRecord. Can be InFlightPacketRecord::end() if no matching one
164 // is found.
165 InFlightPacketList::iterator FindInFlightRecord(uint64 packet_id);
166
140 P2PSocketType type_; 167 P2PSocketType type_;
141 168
142 // Message loop on which this socket was created and being used. 169 // Message loop on which this socket was created and being used.
143 base::MessageLoop* message_loop_; 170 base::MessageLoop* message_loop_;
144 171
145 // Corresponding P2P socket client. 172 // Corresponding P2P socket client.
146 scoped_refptr<P2PSocketClient> client_; 173 scoped_refptr<P2PSocketClient> client_;
147 174
148 // Local address is allocated by the browser process, and the 175 // Local address is allocated by the browser process, and the
149 // renderer side doesn't know the address until it receives OnOpen() 176 // renderer side doesn't know the address until it receives OnOpen()
150 // event from the browser. 177 // event from the browser.
151 rtc::SocketAddress local_address_; 178 rtc::SocketAddress local_address_;
152 179
153 // Remote address for client TCP connections. 180 // Remote address for client TCP connections.
154 rtc::SocketAddress remote_address_; 181 rtc::SocketAddress remote_address_;
155 182
156 // Current state of the object. 183 // Current state of the object.
157 InternalState state_; 184 InternalState state_;
158 185
159 // Track the number of bytes allowed to be sent non-blocking. This is used to 186 // Track the number of bytes allowed to be sent non-blocking. This is used to
160 // throttle the sending of packets to the browser process. For each packet 187 // throttle the sending of packets to the browser process. For each packet
161 // sent, the value is decreased. As callbacks to OnSendComplete() (as IPCs 188 // sent, the value is decreased. As callbacks to OnSendComplete() (as IPCs
162 // from the browser process) are made, the value is increased back. This 189 // from the browser process) are made, the value is increased back. This
163 // allows short bursts of high-rate sending without dropping packets, but 190 // allows short bursts of high-rate sending without dropping packets, but
164 // quickly restricts the client to a sustainable steady-state rate. 191 // quickly restricts the client to a sustainable steady-state rate.
165 size_t send_bytes_available_; 192 size_t send_bytes_available_;
166 std::deque<size_t> in_flight_packet_sizes_; 193
194 // This is a sorted linked list of all InFlightPacketRecord by its packet_id.
195 // In normal case without reordering, the first packet should always be the
Sergey Ulanov 2014/12/16 00:21:43 I do not understand how reordering might happen. S
guoweis_left_chromium 2014/12/16 23:12:33 Yes, reordering probably shouldn't happen. I was c
196 // one that we're going to receive the next completion signal. However, since
197 // we can't rule out reordering happening (when system buffer is large enough
198 // on various platforms), if it happens, there could be some minor penalty
199 // looking for the right packet in the list. This list gives us a way to look
200 // for packets that we never receive its completion signal from OS.
201 InFlightPacketList in_flight_packet_records_;
167 202
168 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the 203 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the
169 // caller expects SignalWritable notification. 204 // caller expects SignalWritable notification.
170 bool writable_signal_expected_; 205 bool writable_signal_expected_;
171 206
172 // Current error code. Valid when state_ == IS_ERROR. 207 // Current error code. Valid when state_ == IS_ERROR.
173 int error_; 208 int error_;
174 int options_[P2P_SOCKET_OPT_MAX]; 209 int options_[P2P_SOCKET_OPT_MAX];
175 210
176 // Track the maximum and current consecutive bytes discarded due to not enough 211 // Track the maximum and current consecutive bytes discarded due to not enough
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
238 if (total_packets_ > 0) { 273 if (total_packets_ > 0) {
239 UMA_HISTOGRAM_PERCENTAGE("WebRTC.ApplicationPercentPacketsDiscarded", 274 UMA_HISTOGRAM_PERCENTAGE("WebRTC.ApplicationPercentPacketsDiscarded",
240 (packets_discarded_ * 100) / total_packets_); 275 (packets_discarded_ * 100) / total_packets_);
241 } 276 }
242 } 277 }
243 278
244 void IpcPacketSocket::TraceSendThrottlingState() const { 279 void IpcPacketSocket::TraceSendThrottlingState() const {
245 TRACE_COUNTER_ID1("p2p", "P2PSendBytesAvailable", local_address_.port(), 280 TRACE_COUNTER_ID1("p2p", "P2PSendBytesAvailable", local_address_.port(),
246 send_bytes_available_); 281 send_bytes_available_);
247 TRACE_COUNTER_ID1("p2p", "P2PSendPacketsInFlight", local_address_.port(), 282 TRACE_COUNTER_ID1("p2p", "P2PSendPacketsInFlight", local_address_.port(),
248 in_flight_packet_sizes_.size()); 283 in_flight_packet_records_.size());
249 } 284 }
250 285
251 void IpcPacketSocket::IncrementDiscardCounters(size_t bytes_discarded) { 286 void IpcPacketSocket::IncrementDiscardCounters(size_t bytes_discarded) {
252 current_discard_bytes_sequence_ += bytes_discarded; 287 current_discard_bytes_sequence_ += bytes_discarded;
253 packets_discarded_++; 288 packets_discarded_++;
254 289
255 if (current_discard_bytes_sequence_ > max_discard_bytes_sequence_) { 290 if (current_discard_bytes_sequence_ > max_discard_bytes_sequence_) {
256 max_discard_bytes_sequence_ = current_discard_bytes_sequence_; 291 max_discard_bytes_sequence_ = current_discard_bytes_sequence_;
257 } 292 }
258 } 293 }
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
363 return EWOULDBLOCK; 398 return EWOULDBLOCK;
364 case IS_CLOSED: 399 case IS_CLOSED:
365 return ENOTCONN; 400 return ENOTCONN;
366 case IS_ERROR: 401 case IS_ERROR:
367 return error_; 402 return error_;
368 case IS_OPEN: 403 case IS_OPEN:
369 // Continue sending the packet. 404 // Continue sending the packet.
370 break; 405 break;
371 } 406 }
372 407
408 uint64 tick_received_in_socket = base::TimeTicks::Now().ToInternalValue();
409
373 if (data_size == 0) { 410 if (data_size == 0) {
374 NOTREACHED(); 411 NOTREACHED();
375 return 0; 412 return 0;
376 } 413 }
377 414
378 total_packets_++; 415 total_packets_++;
379 416
380 if (data_size > send_bytes_available_) { 417 if (data_size > send_bytes_available_) {
381 TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock", 418 TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock",
382 TRACE_EVENT_SCOPE_THREAD, 419 TRACE_EVENT_SCOPE_THREAD,
383 "id", 420 "id",
384 client_->GetSocketID()); 421 client_->GetSocketID());
385 if (!writable_signal_expected_) { 422 if (!writable_signal_expected_) {
386 WebRtcLogMessage(base::StringPrintf( 423 WebRtcLogMessage(base::StringPrintf(
387 "IpcPacketSocket: sending is blocked. %d packets_in_flight.", 424 "IpcPacketSocket: sending is blocked. %d packets_in_flight.",
388 static_cast<int>(in_flight_packet_sizes_.size()))); 425 static_cast<int>(in_flight_packet_records_.size())));
389 426
390 writable_signal_expected_ = true; 427 writable_signal_expected_ = true;
391 } 428 }
392 429
393 error_ = EWOULDBLOCK; 430 error_ = EWOULDBLOCK;
394 IncrementDiscardCounters(data_size); 431 IncrementDiscardCounters(data_size);
395 return -1; 432 return -1;
396 } else { 433 } else {
397 current_discard_bytes_sequence_ = 0; 434 current_discard_bytes_sequence_ = 0;
398 } 435 }
399 436
400 net::IPEndPoint address_chrome; 437 net::IPEndPoint address_chrome;
401 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) { 438 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) {
402 DVLOG(1) << "Failed to convert remote address to IPEndPoint: address = " 439 DVLOG(1) << "Failed to convert remote address to IPEndPoint: address = "
403 << address.ToSensitiveString() << ", remote_address_ = " 440 << address.ToSensitiveString() << ", remote_address_ = "
404 << remote_address_.ToSensitiveString(); 441 << remote_address_.ToSensitiveString();
405 NOTREACHED(); 442 NOTREACHED();
406 error_ = EINVAL; 443 error_ = EINVAL;
407 return -1; 444 return -1;
408 } 445 }
409 446
410 send_bytes_available_ -= data_size; 447 send_bytes_available_ -= data_size;
411 in_flight_packet_sizes_.push_back(data_size);
412 TraceSendThrottlingState();
413 448
414 const char* data_char = reinterpret_cast<const char*>(data); 449 const char* data_char = reinterpret_cast<const char*>(data);
415 std::vector<char> data_vector(data_char, data_char + data_size); 450 std::vector<char> data_vector(data_char, data_char + data_size);
416 client_->SendWithDscp(address_chrome, data_vector, options); 451 uint64 packet_id =
452 client_->SendWithDscp(address_chrome, data_vector, options);
453
454 // Ensure packet_id is not 0. It can't be the case according to
455 // P2PSocketClientImpl::SendWithDscp().
456 DCHECK_NE(packet_id, 0uL);
457
458 // Since OnSendComplete happens on the same thread, there is no chance that
459 // for the same packet, OnSendComplete is called before SendWithDscp is
460 // finished with |packet_id| back.
461 in_flight_packet_records_.push_back(
462 InFlightPacketRecord(packet_id, data_size, tick_received_in_socket));
463 TraceSendThrottlingState();
417 464
418 // Fake successful send. The caller ignores result anyway. 465 // Fake successful send. The caller ignores result anyway.
419 return data_size; 466 return data_size;
420 } 467 }
421 468
422 int IpcPacketSocket::Close() { 469 int IpcPacketSocket::Close() {
423 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 470 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
424 471
425 client_->Close(); 472 client_->Close();
426 state_ = IS_CLOSED; 473 state_ = IS_CLOSED;
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after
552 599
553 rtc::SocketAddress remote_address; 600 rtc::SocketAddress remote_address;
554 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) { 601 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) {
555 // Always expect correct IPv4 address to be allocated. 602 // Always expect correct IPv4 address to be allocated.
556 NOTREACHED(); 603 NOTREACHED();
557 } 604 }
558 socket->InitAcceptedTcp(client, local_address_, remote_address); 605 socket->InitAcceptedTcp(client, local_address_, remote_address);
559 SignalNewConnection(this, socket.release()); 606 SignalNewConnection(this, socket.release());
560 } 607 }
561 608
562 void IpcPacketSocket::OnSendComplete() { 609 // For most of the case, the completion signal should be returned for the packet
610 // at the beginning of |in_flight_packet_records_| so the perf impact should be
611 // negligible.
612 IpcPacketSocket::InFlightPacketList::iterator
613 IpcPacketSocket::FindInFlightRecord(uint64 packet_id) {
614 CHECK(!in_flight_packet_records_.empty());
615 InFlightPacketList::iterator it = in_flight_packet_records_.begin();
616 for (; it != in_flight_packet_records_.end(); ++it) {
617 if (it->packet_id == packet_id) {
618 break;
619 }
620 }
621
622 // We should always be able to find matching packet_id in the
623 // in_flight_packet_records_.
624 DCHECK(it != in_flight_packet_records_.end());
625 return it;
626 }
627
628 void IpcPacketSocket::OnSendComplete(const P2PSendPacketMetrics& send_metrics) {
563 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 629 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
564 630
565 CHECK(!in_flight_packet_sizes_.empty()); 631 CHECK(!in_flight_packet_records_.empty());
566 send_bytes_available_ += in_flight_packet_sizes_.front(); 632
633 InFlightPacketList::iterator it;
634 if (send_metrics.packet_id == 0) {
635 // If send_metrics doesn't carry valid packet_id, it means that we should
636 // not try to detect mismatch packets.
637 it = in_flight_packet_records_.begin();
638 } else {
639 it = FindInFlightRecord(send_metrics.packet_id);
640
641 // If we can't find the record, something has gone very wrong at this point.
642 if (it == in_flight_packet_records_.end()) {
643 VLOG(1) << "Failed to find in-flight record with packet_id = "
644 << send_metrics.packet_id;
645 NOTREACHED();
646 return;
647 }
648 }
649
650 send_bytes_available_ += it->packet_size;
651
652 // Here is a heuristic to detect a completion signal is not returned. We can't
653 // rely on the destructor since if a user closes the tab, the destructor is
654 // not invoked. Or if a user inputs another url, the destructor will be called
655 // before all completion signals return.
656 if (send_metrics.packet_id != 0 &&
657 in_flight_packet_records_.front().packet_id + 100 <
658 send_metrics.packet_id) {
659 // Report an instance that a packet didn't have its completion
660 // signal returned.
661 UMA_HISTOGRAM_BOOLEAN("WebRTC.ApplicationSocketMismatchPacketsDetected_UDP",
662 true);
663 }
567 664
568 DCHECK_LE(send_bytes_available_, kMaximumInFlightBytes); 665 DCHECK_LE(send_bytes_available_, kMaximumInFlightBytes);
569 666
570 in_flight_packet_sizes_.pop_front(); 667 in_flight_packet_records_.erase(it);
571 TraceSendThrottlingState(); 668 TraceSendThrottlingState();
572 669
573 if (writable_signal_expected_ && send_bytes_available_ > 0) { 670 if (writable_signal_expected_ && send_bytes_available_ > 0) {
574 WebRtcLogMessage(base::StringPrintf( 671 WebRtcLogMessage(base::StringPrintf(
575 "IpcPacketSocket: sending is unblocked. %d packets in flight.", 672 "IpcPacketSocket: sending is unblocked. %d packets in flight.",
576 static_cast<int>(in_flight_packet_sizes_.size()))); 673 static_cast<int>(in_flight_packet_records_.size())));
577 674
578 SignalReadyToSend(this); 675 SignalReadyToSend(this);
579 writable_signal_expected_ = false; 676 writable_signal_expected_ = false;
580 } 677 }
581 } 678 }
582 679
583 void IpcPacketSocket::OnError() { 680 void IpcPacketSocket::OnError() {
584 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 681 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
585 bool was_closed = (state_ == IS_ERROR || state_ == IS_CLOSED); 682 bool was_closed = (state_ == IS_ERROR || state_ == IS_CLOSED);
586 state_ = IS_ERROR; 683 state_ = IS_ERROR;
(...skipping 153 matching lines...) Expand 10 before | Expand all | Expand 10 after
740 } 837 }
741 838
742 rtc::AsyncResolverInterface* 839 rtc::AsyncResolverInterface*
743 IpcPacketSocketFactory::CreateAsyncResolver() { 840 IpcPacketSocketFactory::CreateAsyncResolver() {
744 scoped_ptr<AsyncAddressResolverImpl> resolver( 841 scoped_ptr<AsyncAddressResolverImpl> resolver(
745 new AsyncAddressResolverImpl(socket_dispatcher_)); 842 new AsyncAddressResolverImpl(socket_dispatcher_));
746 return resolver.release(); 843 return resolver.release();
747 } 844 }
748 845
749 } // namespace content 846 } // namespace content
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698