Chromium Code Reviews| Index: net/tools/quic/quic_simple_client.cc |
| diff --git a/net/tools/quic/quic_simple_client.cc b/net/tools/quic/quic_simple_client.cc |
| index dccec95aa663d4d6a31574f80075c4cf455dda97..aef81a9a25fe00b590737b9915f34fd997e6b4d8 100644 |
| --- a/net/tools/quic/quic_simple_client.cc |
| +++ b/net/tools/quic/quic_simple_client.cc |
| @@ -2,154 +2,99 @@ |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| -#include "net/tools/quic/quic_client.h" |
| - |
| -#include <errno.h> |
| -#include <netinet/in.h> |
| -#include <string.h> |
| -#include <sys/socket.h> |
| -#include <unistd.h> |
| +#include "net/tools/quic/quic_simple_client.h" |
| #include "base/logging.h" |
| +#include "base/run_loop.h" |
| +#include "net/base/net_errors.h" |
| +#include "net/http/http_request_info.h" |
| #include "net/quic/crypto/quic_random.h" |
| #include "net/quic/quic_connection.h" |
| -#include "net/quic/quic_data_reader.h" |
| +#include "net/quic/quic_connection_helper.h" |
| +#include "net/quic/quic_default_packet_writer.h" |
| #include "net/quic/quic_protocol.h" |
| #include "net/quic/quic_server_id.h" |
| -#include "net/tools/balsa/balsa_headers.h" |
| -#include "net/tools/epoll_server/epoll_server.h" |
| -#include "net/tools/quic/quic_epoll_connection_helper.h" |
| -#include "net/tools/quic/quic_socket_utils.h" |
| -#include "net/tools/quic/quic_spdy_client_stream.h" |
| - |
| -#ifndef SO_RXQ_OVFL |
| -#define SO_RXQ_OVFL 40 |
| -#endif |
| - |
| -using std::string; |
| +#include "net/udp/udp_client_socket.h" |
| namespace net { |
| namespace tools { |
| +namespace { |
| -const PollBits kEpollFlags = PollBits(NET_POLLIN | NET_POLLOUT | NET_POLLET); |
| +// Allocate some extra space so we can send an error if the server goes over |
| +// the limit. |
| +const int kReadBufferSize = 2 * kMaxPacketSize; |
| -QuicClient::QuicClient(IPEndPoint server_address, |
| - const QuicServerId& server_id, |
| - const QuicVersionVector& supported_versions, |
| - EpollServer* epoll_server) |
| +} // namespace |
|
ramant (doing other things)
2015/03/18 05:21:36
nit: an extra space before "//".
Ryan Hamilton
2015/03/18 22:35:57
Done.
|
| + |
| +QuicSimpleClient::QuicSimpleClient(IPEndPoint server_address, |
| + const QuicServerId& server_id, |
| + const QuicVersionVector& supported_versions) |
| : server_address_(server_address), |
| server_id_(server_id), |
| local_port_(0), |
| - epoll_server_(epoll_server), |
| - fd_(-1), |
| helper_(CreateQuicConnectionHelper()), |
| initialized_(false), |
| - packets_dropped_(0), |
| - overflow_supported_(false), |
| supported_versions_(supported_versions), |
| - store_response_(false), |
| - latest_response_code_(-1) { |
| + read_pending_(false), |
| + synchronous_read_count_(0), |
| + read_buffer_(new IOBufferWithSize(kReadBufferSize)), |
| + weak_factory_(this) { |
| } |
| -QuicClient::QuicClient(IPEndPoint server_address, |
| - const QuicServerId& server_id, |
| - const QuicVersionVector& supported_versions, |
| - const QuicConfig& config, |
| - EpollServer* epoll_server) |
| +QuicSimpleClient::QuicSimpleClient(IPEndPoint server_address, |
| + const QuicServerId& server_id, |
| + const QuicVersionVector& supported_versions, |
| + const QuicConfig& config) |
| : server_address_(server_address), |
| server_id_(server_id), |
| config_(config), |
| local_port_(0), |
| - epoll_server_(epoll_server), |
| - fd_(-1), |
| helper_(CreateQuicConnectionHelper()), |
| initialized_(false), |
| - packets_dropped_(0), |
| - overflow_supported_(false), |
| supported_versions_(supported_versions), |
| - store_response_(false), |
| - latest_response_code_(-1) { |
| + read_pending_(false), |
| + synchronous_read_count_(0), |
| + read_buffer_(new IOBufferWithSize(kReadBufferSize)), |
| + weak_factory_(this) { |
| } |
| -QuicClient::~QuicClient() { |
| +QuicSimpleClient::~QuicSimpleClient() { |
| if (connected()) { |
| session()->connection()->SendConnectionClosePacket( |
| QUIC_PEER_GOING_AWAY, ""); |
| } |
| - |
| - CleanUpUDPSocket(); |
| } |
| -bool QuicClient::Initialize() { |
| +bool QuicSimpleClient::Initialize() { |
| DCHECK(!initialized_); |
| - // If an initial flow control window has not explicitly been set, then use the |
| - // same value that Chrome uses: 10 Mb. |
| - const uint32 kInitialFlowControlWindow = 10 * 1024 * 1024; // 10 Mb |
| - if (config_.GetInitialStreamFlowControlWindowToSend() == |
| - kMinimumFlowControlSendWindow) { |
| - config_.SetInitialStreamFlowControlWindowToSend(kInitialFlowControlWindow); |
| - } |
| - if (config_.GetInitialSessionFlowControlWindowToSend() == |
| - kMinimumFlowControlSendWindow) { |
| - config_.SetInitialSessionFlowControlWindowToSend(kInitialFlowControlWindow); |
| - } |
| - |
| - epoll_server_->set_timeout_in_us(50 * 1000); |
| - |
| if (!CreateUDPSocket()) { |
| return false; |
| } |
| - epoll_server_->RegisterFD(fd_, this, kEpollFlags); |
| initialized_ = true; |
| return true; |
| } |
| -QuicClient::DummyPacketWriterFactory::DummyPacketWriterFactory( |
| +QuicSimpleClient::DummyPacketWriterFactory::DummyPacketWriterFactory( |
| QuicPacketWriter* writer) |
| : writer_(writer) {} |
| -QuicClient::DummyPacketWriterFactory::~DummyPacketWriterFactory() {} |
| +QuicSimpleClient::DummyPacketWriterFactory::~DummyPacketWriterFactory() {} |
| -QuicPacketWriter* QuicClient::DummyPacketWriterFactory::Create( |
| +QuicPacketWriter* QuicSimpleClient::DummyPacketWriterFactory::Create( |
| QuicConnection* /*connection*/) const { |
| return writer_; |
| } |
| +bool QuicSimpleClient::CreateUDPSocket() { |
| + scoped_ptr<UDPClientSocket> socket( |
| + new UDPClientSocket(DatagramSocket::DEFAULT_BIND, |
| + RandIntCallback(), |
| + &net_log_, |
| + NetLog::Source())); |
| -bool QuicClient::CreateUDPSocket() { |
| int address_family = server_address_.GetSockAddrFamily(); |
| - fd_ = QuicSocketUtils::CreateNonBlockingSocket(address_family, SOCK_DGRAM, |
| - IPPROTO_UDP); |
| - if (fd_ < 0) { |
| - return false; // failure already logged |
| - } |
| - |
| - int get_overflow = 1; |
| - int rc = setsockopt(fd_, SOL_SOCKET, SO_RXQ_OVFL, &get_overflow, |
| - sizeof(get_overflow)); |
| - if (rc < 0) { |
| - DLOG(WARNING) << "Socket overflow detection not supported"; |
| - } else { |
| - overflow_supported_ = true; |
| - } |
| - |
| - if (!QuicSocketUtils::SetReceiveBufferSize(fd_, |
| - kDefaultSocketReceiveBuffer)) { |
| - return false; |
| - } |
| - |
| - if (!QuicSocketUtils::SetSendBufferSize(fd_, kDefaultSocketReceiveBuffer)) { |
| - return false; |
| - } |
| - |
| - rc = QuicSocketUtils::SetGetAddressInfo(fd_, address_family); |
| - if (rc < 0) { |
| - LOG(ERROR) << "IP detection not supported" << strerror(errno); |
| - return false; |
| - } |
| - |
| if (bind_to_address_.size() != 0) { |
| client_address_ = IPEndPoint(bind_to_address_, local_port_); |
| } else if (address_family == AF_INET) { |
| @@ -162,243 +107,244 @@ bool QuicClient::CreateUDPSocket() { |
| client_address_ = IPEndPoint(any6, local_port_); |
| } |
| - sockaddr_storage raw_addr; |
| - socklen_t raw_addr_len = sizeof(raw_addr); |
| - CHECK(client_address_.ToSockAddr(reinterpret_cast<sockaddr*>(&raw_addr), |
| - &raw_addr_len)); |
| - rc = bind(fd_, |
| - reinterpret_cast<const sockaddr*>(&raw_addr), |
| - sizeof(raw_addr)); |
| - if (rc < 0) { |
| - LOG(ERROR) << "Bind failed: " << strerror(errno); |
| + int rc = socket->Connect(server_address_); |
| + if (rc != OK) { |
| + LOG(ERROR) << "Connect failed: " << ErrorToString(rc); |
| + return false; |
| + } |
| + |
| + rc = socket->SetReceiveBufferSize(kDefaultSocketReceiveBuffer); |
| + if (rc != OK) { |
| + LOG(ERROR) << "SetReceiveBufferSize() failed: " << ErrorToString(rc); |
| return false; |
| } |
| - SockaddrStorage storage; |
| - if (getsockname(fd_, storage.addr, &storage.addr_len) != 0 || |
| - !client_address_.FromSockAddr(storage.addr, storage.addr_len)) { |
| - LOG(ERROR) << "Unable to get self address. Error: " << strerror(errno); |
| + rc = socket->SetSendBufferSize(kDefaultSocketReceiveBuffer); |
| + if (rc != OK) { |
| + LOG(ERROR) << "SetSendBufferSize() failed: " << ErrorToString(rc); |
| + return false; |
| + } |
| + |
| + rc = socket->GetLocalAddress(&client_address_); |
| + if (rc != OK) { |
| + LOG(ERROR) << "GetLocalAddress failed: " << ErrorToString(rc); |
| + return false; |
| + } |
| + |
| + socket_.swap(socket); |
| + |
| + read_pending_ = false; |
| + |
| + if (socket != NULL) { |
| + // Close old socket |
|
ramant (doing other things)
2015/03/18 05:21:36
overly nit: consider punctuation. Period at the en
Ryan Hamilton
2015/03/18 22:35:57
Actually, nuking the comment completely.
|
| + socket->Close(); |
| } |
| return true; |
| } |
| -bool QuicClient::Connect() { |
| +bool QuicSimpleClient::Connect() { |
| StartConnect(); |
| + StartReading(); |
| while (EncryptionBeingEstablished()) { |
| WaitForEvents(); |
| } |
| return session_->connection()->connected(); |
| } |
| -void QuicClient::StartConnect() { |
| +void QuicSimpleClient::StartConnect() { |
| DCHECK(initialized_); |
| DCHECK(!connected()); |
| - QuicPacketWriter* writer = CreateQuicPacketWriter(); |
| - |
| - DummyPacketWriterFactory factory(writer); |
| - |
| - session_.reset(new QuicClientSession( |
| - config_, |
| - new QuicConnection(GenerateConnectionId(), server_address_, helper_.get(), |
| - factory, |
| - /* owns_writer= */ false, Perspective::IS_CLIENT, |
| - server_id_.is_https(), supported_versions_))); |
| - |
| - // Reset |writer_| after |session_| so that the old writer outlives the old |
| - // session. |
| - if (writer_.get() != writer) { |
| - writer_.reset(writer); |
| - } |
| + writer_.reset(CreateQuicPacketWriter()); |
| + connection_ = new QuicConnection(GenerateConnectionId(), |
| + server_address_, |
| + helper_.get(), |
| + DummyPacketWriterFactory(writer_.get()), |
| + /* owns_writer= */ false, |
| + Perspective::IS_CLIENT, |
| + /* is_secure= */ false, |
|
ramant (doing other things)
2015/03/18 05:21:36
nit: should we consider adding a TODO to support s
Ryan Hamilton
2015/03/18 22:35:57
Whoop,s I think we already have support for this.
|
| + supported_versions_); |
| + session_.reset(new QuicSimpleClientSession(config_, connection_)); |
| session_->InitializeSession(server_id_, &crypto_config_); |
| session_->CryptoConnect(); |
| } |
| -bool QuicClient::EncryptionBeingEstablished() { |
| +bool QuicSimpleClient::EncryptionBeingEstablished() { |
| return !session_->IsEncryptionEstablished() && |
| - session_->connection()->connected(); |
| + session_->connection()->connected(); |
| } |
| -void QuicClient::Disconnect() { |
| +void QuicSimpleClient::Disconnect() { |
| DCHECK(initialized_); |
| if (connected()) { |
| session()->connection()->SendConnectionClose(QUIC_PEER_GOING_AWAY); |
| } |
| - CleanUpUDPSocket(); |
| + writer_.reset(); |
| - initialized_ = false; |
| -} |
| + read_pending_ = false; |
| -void QuicClient::CleanUpUDPSocket() { |
| - if (fd_ > -1) { |
| - epoll_server_->UnregisterFD(fd_); |
| - close(fd_); |
| - fd_ = -1; |
| - } |
| + initialized_ = false; |
| } |
| -void QuicClient::SendRequest(const BalsaHeaders& headers, |
| - StringPiece body, |
| - bool fin) { |
| - QuicSpdyClientStream* stream = CreateReliableClientStream(); |
| - if (stream == nullptr) { |
| - LOG(DFATAL) << "stream creation failed!"; |
| - return; |
| - } |
| - stream->SendRequest(headers, body, fin); |
| +void QuicSimpleClient::SendRequestAndWaitForResponse( |
| + const HttpRequestInfo& request, |
| + base::StringPiece body, |
| + bool fin) { |
| + QuicSimpleClientStream* stream = CreateReliableClientStream(); |
| + DCHECK(stream != NULL); |
| + stream->SendRequest(request, body, fin); |
| stream->set_visitor(this); |
| -} |
| -void QuicClient::SendRequestAndWaitForResponse(const BalsaHeaders& headers, |
| - StringPiece body, |
| - bool fin) { |
| - SendRequest(headers, "", true); |
| while (WaitForEvents()) { |
| } |
| } |
| -void QuicClient::SendRequestsAndWaitForResponse( |
| +void QuicSimpleClient::SendRequestsAndWaitForResponse( |
| const base::CommandLine::StringVector& args) { |
| for (size_t i = 0; i < args.size(); ++i) { |
| - BalsaHeaders headers; |
| - headers.SetRequestFirstlineFromStringPieces("GET", args[i], "HTTP/1.1"); |
| - SendRequest(headers, "", true); |
| + HttpRequestInfo request; |
| + request.method = "GET"; |
| + request.url = GURL(args[i]); |
| + QuicSimpleClientStream* stream = CreateReliableClientStream(); |
| + DCHECK(stream != NULL); |
| + stream->SendRequest(request, "", true); |
| + stream->set_visitor(this); |
| + } |
| + |
| + while (WaitForEvents()) { |
| } |
| - while (WaitForEvents()) {} |
| } |
| -QuicSpdyClientStream* QuicClient::CreateReliableClientStream() { |
| +QuicSimpleClientStream* QuicSimpleClient::CreateReliableClientStream() { |
| if (!connected()) { |
| - return nullptr; |
| + return NULL; |
|
ramant (doing other things)
2015/03/18 05:21:36
nit: consider nullptr to be consistent with rest o
Ryan Hamilton
2015/03/18 22:35:57
Done.
|
| } |
| return session_->CreateOutgoingDataStream(); |
| } |
| -void QuicClient::WaitForStreamToClose(QuicStreamId id) { |
| +void QuicSimpleClient::WaitForStreamToClose(QuicStreamId id) { |
| DCHECK(connected()); |
| while (connected() && !session_->IsClosedStream(id)) { |
| - epoll_server_->WaitForEventsAndExecuteCallbacks(); |
| + base::RunLoop().RunUntilIdle(); |
| } |
| } |
| -void QuicClient::WaitForCryptoHandshakeConfirmed() { |
| +void QuicSimpleClient::WaitForCryptoHandshakeConfirmed() { |
| DCHECK(connected()); |
| while (connected() && !session_->IsCryptoHandshakeConfirmed()) { |
| - epoll_server_->WaitForEventsAndExecuteCallbacks(); |
| + base::RunLoop().RunUntilIdle(); |
| } |
| } |
| -bool QuicClient::WaitForEvents() { |
| +bool QuicSimpleClient::WaitForEvents() { |
| DCHECK(connected()); |
| - epoll_server_->WaitForEventsAndExecuteCallbacks(); |
| + base::RunLoop().RunUntilIdle(); |
| return session_->num_active_requests() != 0; |
| } |
| -void QuicClient::OnEvent(int fd, EpollEvent* event) { |
| - DCHECK_EQ(fd, fd_); |
| +void QuicSimpleClient::StartReading() { |
| + if (read_pending_) { |
| + return; |
| + } |
| + read_pending_ = true; |
| + |
| + int result = socket_->Read( |
| + read_buffer_.get(), |
| + read_buffer_->size(), |
| + base::Bind(&QuicSimpleClient::OnReadComplete, base::Unretained(this))); |
|
ramant (doing other things)
2015/03/18 05:21:36
nit: I am not 100% sure what to use: "weak_factory
Ryan Hamilton
2015/03/18 22:35:57
Good catch.
|
| - if (event->in_events & NET_POLLIN) { |
| - while (connected() && ReadAndProcessPacket()) { |
| - } |
| + if (result == ERR_IO_PENDING) { |
| + synchronous_read_count_ = 0; |
| + return; |
| } |
| - if (connected() && (event->in_events & NET_POLLOUT)) { |
| - writer_->SetWritable(); |
| - session_->connection()->OnCanWrite(); |
| + |
| + if (++synchronous_read_count_ > 32) { |
| + synchronous_read_count_ = 0; |
| + // Schedule the processing through the message loop to 1) prevent infinite |
| + // recursion and 2) avoid blocking the thread for too long. |
| + base::MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&QuicSimpleClient::OnReadComplete, |
| + weak_factory_.GetWeakPtr(), result)); |
| + } else { |
| + OnReadComplete(result); |
| } |
| - if (event->in_events & NET_POLLERR) { |
| - DVLOG(1) << "NET_POLLERR"; |
| +} |
| + |
| +void QuicSimpleClient::OnReadComplete(int result) { |
| + read_pending_ = false; |
| + if (result == 0) |
| + result = ERR_CONNECTION_CLOSED; |
| + |
| + if (result < 0) { |
| + LOG(ERROR) << "QuicSimpleClient read failed: " << ErrorToString(result); |
| + Disconnect(); |
| + return; |
| } |
| + |
| + QuicEncryptedPacket packet(read_buffer_->data(), result, false); |
| + session_->connection()->ProcessUdpPacket( |
|
ramant (doing other things)
2015/03/18 05:21:36
nit: Should we worry about ProcessUdpPacket() dele
Ryan Hamilton
2015/03/18 22:35:57
No, that's not possible here. That's possible in c
|
| + client_address_, server_address_, packet); |
| + |
| + StartReading(); |
| } |
| -void QuicClient::OnClose(QuicDataStream* stream) { |
| - QuicSpdyClientStream* client_stream = |
| - static_cast<QuicSpdyClientStream*>(stream); |
| - if (response_listener_.get() != nullptr) { |
| +void QuicSimpleClient::OnClose(QuicDataStream* stream) { |
| + QuicSimpleClientStream* client_stream = |
| + static_cast<QuicSimpleClientStream*>(stream); |
| + if (response_listener_.get() != NULL) { |
|
ramant (doing other things)
2015/03/18 05:21:36
nit: NULL -> nullptr?
Ryan Hamilton
2015/03/18 22:35:57
Done.
|
| response_listener_->OnCompleteResponse( |
| - stream->id(), client_stream->headers(), client_stream->data()); |
| + stream->id(), *client_stream->headers(), client_stream->data()); |
| } |
| // Store response headers and body. |
| if (store_response_) { |
| - latest_response_code_ = client_stream->headers().parsed_response_code(); |
| - client_stream->headers().DumpHeadersToString(&latest_response_headers_); |
| + latest_response_code_ = client_stream->headers()->response_code(); |
| + client_stream->headers()->GetNormalizedHeaders(&latest_response_headers_); |
| latest_response_body_ = client_stream->data(); |
| } |
| } |
| -bool QuicClient::connected() const { |
| +bool QuicSimpleClient::connected() const { |
| return session_.get() && session_->connection() && |
| - session_->connection()->connected(); |
| -} |
| - |
| -bool QuicClient::goaway_received() const { |
| - return session_ != nullptr && session_->goaway_received(); |
| + session_->connection()->connected(); |
| } |
| -size_t QuicClient::latest_response_code() const { |
| +size_t QuicSimpleClient::latest_response_code() const { |
| LOG_IF(DFATAL, !store_response_) << "Response not stored!"; |
| return latest_response_code_; |
| } |
| -const string& QuicClient::latest_response_headers() const { |
| +const std::string& QuicSimpleClient::latest_response_headers() const { |
| LOG_IF(DFATAL, !store_response_) << "Response not stored!"; |
| return latest_response_headers_; |
| } |
| -const string& QuicClient::latest_response_body() const { |
| +const std::string& QuicSimpleClient::latest_response_body() const { |
| LOG_IF(DFATAL, !store_response_) << "Response not stored!"; |
| return latest_response_body_; |
| } |
| -QuicConnectionId QuicClient::GenerateConnectionId() { |
| - return QuicRandom::GetInstance()->RandUint64(); |
| -} |
| - |
| -QuicEpollConnectionHelper* QuicClient::CreateQuicConnectionHelper() { |
| - return new QuicEpollConnectionHelper(epoll_server_); |
| -} |
| - |
| -QuicPacketWriter* QuicClient::CreateQuicPacketWriter() { |
| - return new QuicDefaultPacketWriter(fd_); |
| +QuicConnectionId QuicSimpleClient::GenerateConnectionId() { |
| + return helper_->GetRandomGenerator()->RandUint64(); |
| } |
| -int QuicClient::ReadPacket(char* buffer, |
| - int buffer_len, |
| - IPEndPoint* server_address, |
| - IPAddressNumber* client_ip) { |
| - return QuicSocketUtils::ReadPacket( |
| - fd_, buffer, buffer_len, |
| - overflow_supported_ ? &packets_dropped_ : nullptr, client_ip, |
| - server_address); |
| +QuicConnectionHelper* QuicSimpleClient::CreateQuicConnectionHelper() { |
| + return new QuicConnectionHelper( |
| + base::MessageLoop::current()->message_loop_proxy().get(), |
| + &clock_, |
| + QuicRandom::GetInstance()); |
| } |
| -bool QuicClient::ReadAndProcessPacket() { |
| - // Allocate some extra space so we can send an error if the server goes over |
| - // the limit. |
| - char buf[2 * kMaxPacketSize]; |
| - |
| - IPEndPoint server_address; |
| - IPAddressNumber client_ip; |
| - |
| - int bytes_read = ReadPacket(buf, arraysize(buf), &server_address, &client_ip); |
| - |
| - if (bytes_read < 0) { |
| - return false; |
| - } |
| - |
| - QuicEncryptedPacket packet(buf, bytes_read, false); |
| - |
| - IPEndPoint client_address(client_ip, client_address_.port()); |
| - session_->connection()->ProcessUdpPacket( |
| - client_address, server_address, packet); |
| - return true; |
| +QuicPacketWriter* QuicSimpleClient::CreateQuicPacketWriter() { |
| + return new QuicDefaultPacketWriter(socket_.get()); |
| } |
| } // namespace tools |