Chromium Code Reviews| Index: webrtc/p2p/stunprober/stunprober.cc |
| diff --git a/webrtc/p2p/stunprober/stunprober.cc b/webrtc/p2p/stunprober/stunprober.cc |
| index 8efe069b0b52f1841e59978a24023d087415e881..b9d8fb880234abdcc3d068c3827173d626e291ab 100644 |
| --- a/webrtc/p2p/stunprober/stunprober.cc |
| +++ b/webrtc/p2p/stunprober/stunprober.cc |
| @@ -13,10 +13,15 @@ |
| #include <set> |
| #include <string> |
| +#include "webrtc/base/asyncpacketsocket.h" |
| +#include "webrtc/base/asyncresolverinterface.h" |
| #include "webrtc/base/bind.h" |
| #include "webrtc/base/checks.h" |
| #include "webrtc/base/helpers.h" |
| +#include "webrtc/base/logging.h" |
| #include "webrtc/base/timeutils.h" |
| +#include "webrtc/base/thread.h" |
| +#include "webrtc/p2p/base/packetsocketfactory.h" |
| #include "webrtc/p2p/base/stun.h" |
| #include "webrtc/p2p/stunprober/stunprober.h" |
| @@ -29,15 +34,11 @@ void IncrementCounterByAddress(std::map<T, int>* counter_per_ip, const T& ip) { |
| counter_per_ip->insert(std::make_pair(ip, 0)).first->second++; |
| } |
| -bool behind_nat(NatType nat_type) { |
| - return nat_type > stunprober::NATTYPE_NONE; |
| -} |
| - |
| } // namespace |
| // A requester tracks the requests and responses from a single socket to many |
| // STUN servers |
| -class StunProber::Requester { |
| +class StunProber::Requester : public sigslot::has_slots<> { |
| public: |
| // Each Request maps to a request and response. |
| struct Request { |
| @@ -46,26 +47,20 @@ class StunProber::Requester { |
| // Time the response was received. |
| int64 received_time_ms = 0; |
| - // See whether the observed address returned matches the |
| - // local address as in StunProber.local_addr_. |
| - bool behind_nat = false; |
| - |
| // Server reflexive address from STUN response for this given request. |
| rtc::SocketAddress srflx_addr; |
| rtc::IPAddress server_addr; |
| int64 rtt() { return received_time_ms - sent_time_ms; } |
| - void ProcessResponse(rtc::ByteBuffer* message, |
| - int buf_len, |
| - const rtc::IPAddress& local_addr); |
| + void ProcessResponse(const char* buf, size_t buf_len); |
| }; |
| // StunProber provides |server_ips| for Requester to probe. For shared |
| // socket mode, it'll be all the resolved IP addresses. For non-shared mode, |
| // it'll just be a single address. |
| Requester(StunProber* prober, |
| - ServerSocketInterface* socket, |
| + rtc::AsyncPacketSocket* socket, |
| const std::vector<rtc::SocketAddress>& server_ips); |
| virtual ~Requester(); |
| @@ -74,11 +69,11 @@ class StunProber::Requester { |
| // and move to the next one. |
| void SendStunRequest(); |
| - void ReadStunResponse(); |
| - |
| - // |result| is the positive return value from RecvFrom when data is |
| - // available. |
| - void OnStunResponseReceived(int result); |
| + void OnStunResponseReceived(rtc::AsyncPacketSocket* socket, |
| + const char* buf, |
| + size_t size, |
| + const rtc::SocketAddress& addr, |
| + const rtc::PacketTime& time); |
| const std::vector<Request*>& requests() { return requests_; } |
| @@ -93,7 +88,7 @@ class StunProber::Requester { |
| StunProber* prober_; |
| // The socket for this session. |
| - rtc::scoped_ptr<ServerSocketInterface> socket_; |
| + rtc::scoped_ptr<rtc::AsyncPacketSocket> socket_; |
| // Temporary SocketAddress and buffer for RecvFrom. |
| rtc::SocketAddress addr_; |
| @@ -111,13 +106,15 @@ class StunProber::Requester { |
| StunProber::Requester::Requester( |
| StunProber* prober, |
| - ServerSocketInterface* socket, |
| + rtc::AsyncPacketSocket* socket, |
| const std::vector<rtc::SocketAddress>& server_ips) |
| : prober_(prober), |
| socket_(socket), |
| response_packet_(new rtc::ByteBuffer(nullptr, kMaxUdpBufferSize)), |
| server_ips_(server_ips), |
| thread_checker_(prober->thread_checker_) { |
| + socket_->SignalReadPacket.connect( |
| + this, &StunProber::Requester::OnStunResponseReceived); |
| } |
| StunProber::Requester::~Requester() { |
| @@ -145,7 +142,7 @@ void StunProber::Requester::SendStunRequest() { |
| rtc::scoped_ptr<rtc::ByteBuffer> request_packet( |
| new rtc::ByteBuffer(nullptr, kMaxUdpBufferSize)); |
| if (!message.Write(request_packet.get())) { |
| - prober_->End(WRITE_FAILED, 0); |
| + prober_->End(WRITE_FAILED); |
| return; |
| } |
| @@ -155,48 +152,26 @@ void StunProber::Requester::SendStunRequest() { |
| // The write must succeed immediately. Otherwise, the calculating of the STUN |
| // request timing could become too complicated. Callback is ignored by passing |
| // empty AsyncCallback. |
| - int rv = socket_->SendTo(addr, const_cast<char*>(request_packet->Data()), |
| - request_packet->Length(), AsyncCallback()); |
| + rtc::PacketOptions options; |
| + int rv = socket_->SendTo(const_cast<char*>(request_packet->Data()), |
| + request_packet->Length(), addr, options); |
| if (rv < 0) { |
| - prober_->End(WRITE_FAILED, rv); |
| + prober_->End(WRITE_FAILED); |
| return; |
| } |
| request.sent_time_ms = rtc::Time(); |
| - // Post a read waiting for response. For share mode, the subsequent read will |
| - // be posted inside OnStunResponseReceived. |
| - if (num_request_sent_ == 0) { |
| - ReadStunResponse(); |
| - } |
| - |
| num_request_sent_++; |
| DCHECK(static_cast<size_t>(num_request_sent_) <= server_ips_.size()); |
| } |
| -void StunProber::Requester::ReadStunResponse() { |
| - DCHECK(thread_checker_.CalledOnValidThread()); |
| - if (!socket_) { |
| - return; |
| - } |
| - |
| - int rv = socket_->RecvFrom( |
| - response_packet_->ReserveWriteBuffer(kMaxUdpBufferSize), |
| - kMaxUdpBufferSize, &addr_, |
| - [this](int result) { this->OnStunResponseReceived(result); }); |
| - if (rv != SocketInterface::IO_PENDING) { |
| - OnStunResponseReceived(rv); |
| - } |
| -} |
| - |
| -void StunProber::Requester::Request::ProcessResponse( |
| - rtc::ByteBuffer* message, |
| - int buf_len, |
| - const rtc::IPAddress& local_addr) { |
| +void StunProber::Requester::Request::ProcessResponse(const char* buf, |
| + size_t buf_len) { |
| int64 now = rtc::Time(); |
| - |
| + rtc::ByteBuffer message(buf, buf_len); |
| cricket::StunMessage stun_response; |
| - if (!stun_response.Read(message)) { |
| + if (!stun_response.Read(&message)) { |
| // Invalid or incomplete STUN packet. |
| received_time_ms = 0; |
| return; |
| @@ -218,40 +193,25 @@ void StunProber::Requester::Request::ProcessResponse( |
| received_time_ms = now; |
| srflx_addr = addr_attr->GetAddress(); |
| - |
| - // Calculate behind_nat. |
| - behind_nat = (srflx_addr.ipaddr() != local_addr); |
| } |
| -void StunProber::Requester::OnStunResponseReceived(int result) { |
| +void StunProber::Requester::OnStunResponseReceived( |
| + rtc::AsyncPacketSocket* socket, |
| + const char* buf, |
| + size_t size, |
| + const rtc::SocketAddress& addr, |
| + const rtc::PacketTime& time) { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| DCHECK(socket_); |
| - |
| - if (result < 0) { |
| - // Something is wrong, finish the test. |
| - prober_->End(READ_FAILED, result); |
| - return; |
| - } |
| - |
| - Request* request = GetRequestByAddress(addr_.ipaddr()); |
| + Request* request = GetRequestByAddress(addr.ipaddr()); |
| if (!request) { |
| // Something is wrong, finish the test. |
| - prober_->End(GENERIC_FAILURE, result); |
| + prober_->End(GENERIC_FAILURE); |
| return; |
| } |
| num_response_received_++; |
| - |
| - // Resize will set the end_ to indicate that there are data available in this |
| - // ByteBuffer. |
| - response_packet_->Resize(result); |
| - request->ProcessResponse(response_packet_.get(), result, |
| - prober_->local_addr_); |
| - |
| - if (static_cast<size_t>(num_response_received_) < server_ips_.size()) { |
| - // Post another read response. |
| - ReadStunResponse(); |
| - } |
| + request->ProcessResponse(buf, size); |
| } |
| StunProber::Requester::Request* StunProber::Requester::GetRequestByAddress( |
| @@ -266,13 +226,13 @@ StunProber::Requester::Request* StunProber::Requester::GetRequestByAddress( |
| return nullptr; |
| } |
| -StunProber::StunProber(HostNameResolverInterface* host_name_resolver, |
| - SocketFactoryInterface* socket_factory, |
| - TaskRunnerInterface* task_runner) |
| +StunProber::StunProber(rtc::PacketSocketFactory* socket_factory, |
| + rtc::Thread* thread, |
| + const rtc::NetworkManager::NetworkList& networks) |
| : interval_ms_(0), |
| socket_factory_(socket_factory), |
| - resolver_(host_name_resolver), |
| - task_runner_(task_runner) { |
| + thread_(thread), |
| + networks_(networks) { |
| } |
| StunProber::~StunProber() { |
| @@ -281,6 +241,11 @@ StunProber::~StunProber() { |
| delete req; |
| } |
| } |
| + for (auto s : sockets_) { |
| + if (s) { |
| + delete s; |
| + } |
| + } |
| } |
| bool StunProber::Start(const std::vector<rtc::SocketAddress>& servers, |
| @@ -301,92 +266,98 @@ bool StunProber::Start(const std::vector<rtc::SocketAddress>& servers, |
| timeout_ms_ = timeout_ms; |
| servers_ = servers; |
| finished_callback_ = callback; |
| - resolver_->Resolve(servers_[0], &resolved_ips_, |
| - [this](int result) { this->OnServerResolved(0, result); }); |
| + return ResolveServerName(servers_.back()); |
| +} |
| + |
| +bool StunProber::ResolveServerName(const rtc::SocketAddress& addr) { |
| + rtc::AsyncResolverInterface* resolver = |
| + socket_factory_->CreateAsyncResolver(); |
| + if (!resolver) { |
| + return false; |
| + } |
| + resolver->SignalDone.connect(this, &StunProber::OnServerResolved); |
| + resolver->Start(addr); |
| return true; |
| } |
| -void StunProber::OnServerResolved(int index, int result) { |
| +void StunProber::OnSocketReady(rtc::AsyncPacketSocket* socket, |
| + const rtc::SocketAddress& addr) { |
| + total_ready_sockets_++; |
| + if (total_ready_sockets_ == total_socket_required()) { |
| + MaybeScheduleStunRequests(); |
| + } |
|
pthatcher1
2015/06/18 20:26:59
What if not all of the sockets get ready? Do we s
guoweis_left_chromium
2015/06/18 20:39:38
yes.
|
| +} |
| + |
| +void StunProber::OnServerResolved(rtc::AsyncResolverInterface* resolver) { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| - if (result == 0) { |
| - all_servers_ips_.insert(all_servers_ips_.end(), resolved_ips_.begin(), |
| - resolved_ips_.end()); |
| - resolved_ips_.clear(); |
| + if (resolver->GetError() == 0) { |
| + rtc::SocketAddress addr(resolver->address().ipaddr(), |
| + resolver->address().port()); |
| + all_servers_addrs_.push_back(addr); |
| } |
| - index++; |
| + // Deletion of AsyncResolverInterface can't be done in OnResolveResult which |
| + // handles SignalDone. |
| + invoker_.AsyncInvoke<void>( |
| + thread_, |
| + rtc::Bind(&rtc::AsyncResolverInterface::Destroy, resolver, false)); |
| + servers_.pop_back(); |
| - if (static_cast<size_t>(index) < servers_.size()) { |
| - resolver_->Resolve( |
| - servers_[index], &resolved_ips_, |
| - [this, index](int result) { this->OnServerResolved(index, result); }); |
| + if (servers_.size()) { |
| + if (!ResolveServerName(servers_.back())) { |
| + End(RESOLVE_FAILED); |
| + } |
| return; |
| } |
| - if (all_servers_ips_.size() == 0) { |
| - End(RESOLVE_FAILED, result); |
| + if (all_servers_addrs_.size() == 0) { |
| + End(RESOLVE_FAILED); |
| return; |
| } |
| // Dedupe. |
| - std::set<rtc::SocketAddress> addrs(all_servers_ips_.begin(), |
| - all_servers_ips_.end()); |
| - all_servers_ips_.assign(addrs.begin(), addrs.end()); |
| - |
| - rtc::IPAddress addr; |
| - if (GetLocalAddress(&addr) != 0) { |
| - End(GENERIC_FAILURE, result); |
| - return; |
| - } |
| - |
| - socket_factory_->Prepare(GetTotalClientSockets(), GetTotalServerSockets(), |
| - [this](int result) { |
| - if (result == 0) { |
| - this->MaybeScheduleStunRequests(); |
| - } |
| - }); |
| -} |
| - |
| -int StunProber::GetLocalAddress(rtc::IPAddress* addr) { |
| - DCHECK(thread_checker_.CalledOnValidThread()); |
| - if (local_addr_.family() == AF_UNSPEC) { |
| - rtc::SocketAddress sock_addr; |
| - rtc::scoped_ptr<ClientSocketInterface> socket( |
| - socket_factory_->CreateClientSocket()); |
| - int rv = socket->Connect(all_servers_ips_[0]); |
| - if (rv != SUCCESS) { |
| - End(GENERIC_FAILURE, rv); |
| - return rv; |
| + std::set<rtc::SocketAddress> addrs(all_servers_addrs_.begin(), |
| + all_servers_addrs_.end()); |
| + all_servers_addrs_.assign(addrs.begin(), addrs.end()); |
| + |
| + // Prepare all the sockets beforehand. All of them will bind to "any" address. |
| + while (sockets_.size() < total_socket_required()) { |
| + rtc::scoped_ptr<rtc::AsyncPacketSocket> socket( |
| + socket_factory_->CreateUdpSocket(rtc::SocketAddress(INADDR_ANY, 0), 0, |
| + 0)); |
| + if (!socket) { |
| + End(GENERIC_FAILURE); |
| + return; |
| } |
| - rv = socket->GetLocalAddress(&sock_addr); |
| - if (rv != SUCCESS) { |
| - End(GENERIC_FAILURE, rv); |
| - return rv; |
| + // Chrome and WebRTC behave differently in terms of the state of a socket |
| + // once returned from PacketSocketFactory::CreateUdpSocket. |
| + if (socket->GetState() == rtc::AsyncPacketSocket::STATE_BINDING) { |
| + socket->SignalAddressReady.connect(this, &StunProber::OnSocketReady); |
| + } else { |
| + OnSocketReady(socket.get(), rtc::SocketAddress(INADDR_ANY, 0)); |
| } |
| - local_addr_ = sock_addr.ipaddr(); |
| - socket->Close(); |
| + sockets_.push_back(socket.release()); |
| } |
| - *addr = local_addr_; |
| - return 0; |
| } |
| StunProber::Requester* StunProber::CreateRequester() { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| - rtc::scoped_ptr<ServerSocketInterface> socket( |
| - socket_factory_->CreateServerSocket(kMaxUdpBufferSize, |
| - kMaxUdpBufferSize)); |
| - if (!socket) { |
| + if (!sockets_.size()) { |
| return nullptr; |
| } |
| + StunProber::Requester* requester; |
| if (shared_socket_mode_) { |
| - return new Requester(this, socket.release(), all_servers_ips_); |
| + requester = new Requester(this, sockets_.back(), all_servers_addrs_); |
| } else { |
| std::vector<rtc::SocketAddress> server_ip; |
| server_ip.push_back( |
| - all_servers_ips_[(num_request_sent_ % all_servers_ips_.size())]); |
| - return new Requester(this, socket.release(), server_ip); |
| + all_servers_addrs_[(num_request_sent_ % all_servers_addrs_.size())]); |
| + requester = new Requester(this, sockets_.back(), server_ip); |
| } |
| + |
| + sockets_.pop_back(); |
|
pthatcher1
2015/06/18 20:26:59
Does the Requester take ownership of the socket an
guoweis_left_chromium
2015/06/18 20:39:38
yes.
|
| + return requester; |
| } |
| bool StunProber::SendNextRequest() { |
| @@ -407,19 +378,20 @@ void StunProber::MaybeScheduleStunRequests() { |
| uint32 now = rtc::Time(); |
| if (Done()) { |
| - task_runner_->PostTask(rtc::Bind(&StunProber::End, this, SUCCESS, 0), |
| - timeout_ms_); |
| + invoker_.AsyncInvokeDelayed<void>( |
| + thread_, rtc::Bind(&StunProber::End, this, SUCCESS), timeout_ms_); |
| return; |
| } |
| if (now >= next_request_time_ms_) { |
| if (!SendNextRequest()) { |
| - End(GENERIC_FAILURE, 0); |
| + End(GENERIC_FAILURE); |
| return; |
| } |
| next_request_time_ms_ = now + interval_ms_; |
| } |
| - task_runner_->PostTask( |
| - rtc::Bind(&StunProber::MaybeScheduleStunRequests, this), 1 /* ms */); |
| + invoker_.AsyncInvokeDelayed<void>( |
| + thread_, rtc::Bind(&StunProber::MaybeScheduleStunRequests, this), |
| + 1 /* ms */); |
|
Sergey Ulanov
2015/06/12 19:28:27
Why does the task need to be scheduled every 1 ms?
guoweis_left_chromium
2015/06/18 20:39:38
changed to 5ms.
|
| } |
| bool StunProber::GetStats(StunProber::Stats* prob_stats) const { |
| @@ -464,14 +436,7 @@ bool StunProber::GetStats(StunProber::Stats* prob_stats) const { |
| IncrementCounterByAddress(&num_response_per_server, request->server_addr); |
| IncrementCounterByAddress(&num_response_per_srflx_addr, |
| request->srflx_addr); |
| - |
| rtt_sum += request->rtt(); |
| - if (nat_type == NATTYPE_INVALID) { |
| - nat_type = request->behind_nat ? NATTYPE_UNKNOWN : NATTYPE_NONE; |
| - } else if (behind_nat(nat_type) != request->behind_nat) { |
| - // Detect the inconsistency in NAT presence. |
| - return false; |
| - } |
| stats.srflx_addrs.insert(request->srflx_addr.ToString()); |
| srflx_ips.insert(request->srflx_addr.ipaddr()); |
| } |
| @@ -505,18 +470,34 @@ bool StunProber::GetStats(StunProber::Stats* prob_stats) const { |
| return false; |
| } |
| - stats.nat_type = nat_type; |
| - |
| // Shared mode is only true if we use the shared socket and there are more |
| // than 1 responding servers. |
| stats.shared_socket_mode = |
| shared_socket_mode_ && (num_server_ip_with_response > 1); |
| - if (stats.shared_socket_mode && nat_type == NATTYPE_UNKNOWN) { |
| - stats.nat_type = NATTYPE_NON_SYMMETRIC; |
| + if (stats.shared_socket_mode && nat_type == NATTYPE_INVALID) { |
| + nat_type = NATTYPE_NON_SYMMETRIC; |
| } |
| - stats.host_ip = local_addr_.ToString(); |
| + // If we could find a local IP matching srflx, we're not behind a NAT. |
| + rtc::SocketAddress srflx_addr; |
| + if (!srflx_addr.FromString(*(stats.srflx_addrs.begin()))) { |
| + return false; |
| + } |
| + for (const auto& net : networks_) { |
| + if (srflx_addr.ipaddr() == net->GetBestIP()) { |
| + nat_type = stunprober::NATTYPE_NONE; |
| + stats.host_ip = net->GetBestIP().ToString(); |
| + break; |
| + } |
| + } |
| + |
| + // Finally, we know we're behind a NAT but can't determine which type it is. |
| + if (nat_type == NATTYPE_INVALID) { |
| + nat_type = NATTYPE_UNKNOWN; |
| + } |
| + |
| + stats.nat_type = nat_type; |
| stats.num_request_sent = num_sent; |
| stats.num_response_received = num_received; |
| stats.target_request_interval_ns = interval_ms_ * 1000; |
| @@ -538,14 +519,14 @@ bool StunProber::GetStats(StunProber::Stats* prob_stats) const { |
| return true; |
| } |
| -void StunProber::End(StunProber::Status status, int result) { |
| +void StunProber::End(StunProber::Status status) { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| if (!finished_callback_.empty()) { |
| AsyncCallback callback = finished_callback_; |
| finished_callback_ = AsyncCallback(); |
| // Callback at the last since the prober might be deleted in the callback. |
| - callback(status); |
| + callback(this, status); |
| } |
| } |