Chromium Code Reviews| Index: net/quic/quic_server.cc |
| diff --git a/net/tools/quic/quic_server.cc b/net/quic/quic_server.cc |
| similarity index 24% |
| copy from net/tools/quic/quic_server.cc |
| copy to net/quic/quic_server.cc |
| index 2d1a52d3f0dc010f5c60e19eb1147dbedd6a0e71..3cfc12099d79a225ce2a4e56f65a11b27484d517 100644 |
| --- a/net/tools/quic/quic_server.cc |
| +++ b/net/quic/quic_server.cc |
| @@ -2,181 +2,112 @@ |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| -#include "net/tools/quic/quic_server.h" |
| +#include "net/quic/quic_server.h" |
| -#include <errno.h> |
| -#include <features.h> |
| -#include <netinet/in.h> |
| #include <string.h> |
| -#include <sys/epoll.h> |
| -#include <sys/socket.h> |
| #include "net/base/ip_endpoint.h" |
| +#include "net/base/net_errors.h" |
| #include "net/quic/congestion_control/tcp_receiver.h" |
| #include "net/quic/crypto/crypto_handshake.h" |
| #include "net/quic/crypto/quic_random.h" |
| -#include "net/quic/quic_clock.h" |
| #include "net/quic/quic_crypto_stream.h" |
| #include "net/quic/quic_data_reader.h" |
| +#include "net/quic/quic_dispatcher.h" |
| +#include "net/quic/quic_in_memory_cache.h" |
| #include "net/quic/quic_protocol.h" |
| -#include "net/tools/quic/quic_in_memory_cache.h" |
| +#include "net/quic/quic_server_packet_writer.h" |
| #include "net/tools/quic/quic_socket_utils.h" |
| -#define MMSG_MORE 0 |
| -#ifndef SO_RXQ_OVFL |
| -#define SO_RXQ_OVFL 40 |
| -#endif |
| +namespace net { |
| -const int kEpollFlags = EPOLLIN | EPOLLOUT | EPOLLET; |
| static const char kSourceAddressTokenSecret[] = "secret"; |
| -const uint32 kServerInitialFlowControlWindow = 100 * net::kMaxPacketSize; |
| -namespace net { |
| -namespace tools { |
| - |
| -QuicServer::QuicServer() |
| - : port_(0), |
| - fd_(-1), |
| - packets_dropped_(0), |
| - overflow_supported_(false), |
| - use_recvmmsg_(false), |
| - crypto_config_(kSourceAddressTokenSecret, QuicRandom::GetInstance()), |
| - supported_versions_(QuicSupportedVersions()), |
| - server_initial_flow_control_receive_window_( |
| - kServerInitialFlowControlWindow) { |
| - // Use hardcoded crypto parameters for now. |
| - config_.SetDefaults(); |
| - Initialize(); |
| -} |
| +// Allocate some extra space so we can send an error if the client goes over |
| +// the limit. |
| +static const int kReadBufferSize = 2 * net::kMaxPacketSize; |
|
Ryan Hamilton
2014/06/18 23:59:51
This seems too small. Why 2 (vs, say 100 from the
dmz
2014/06/19 17:46:24
This is actually a different constant (the size of
|
| QuicServer::QuicServer(const QuicConfig& config, |
| const QuicVersionVector& supported_versions, |
| uint32 server_initial_flow_control_receive_window) |
| - : port_(0), |
| - fd_(-1), |
| - packets_dropped_(0), |
| - overflow_supported_(false), |
| - use_recvmmsg_(false), |
| + : helper_(new QuicConnectionHelper( |
| + base::MessageLoop::current()->message_loop_proxy().get(), |
| + &clock_, |
| + QuicRandom::GetInstance())), |
| config_(config), |
| crypto_config_(kSourceAddressTokenSecret, QuicRandom::GetInstance()), |
| supported_versions_(supported_versions), |
| server_initial_flow_control_receive_window_( |
| - server_initial_flow_control_receive_window) { |
| + server_initial_flow_control_receive_window), |
| + read_pending_(false), |
| + synchronous_read_count_(0), |
| + read_buffer_(new IOBufferWithSize(kReadBufferSize)), |
| + weak_factory_(this) { |
| Initialize(); |
| } |
| void QuicServer::Initialize() { |
| -#if MMSG_MORE |
| - use_recvmmsg_ = true; |
| -#endif |
| - epoll_server_.set_timeout_in_us(50 * 1000); |
| // Initialize the in memory cache now. |
| QuicInMemoryCache::GetInstance(); |
| - QuicEpollClock clock(&epoll_server_); |
| - |
| scoped_ptr<CryptoHandshakeMessage> scfg( |
| - crypto_config_.AddDefaultConfig( |
| - QuicRandom::GetInstance(), &clock, |
| - QuicCryptoServerConfig::ConfigOptions())); |
| + crypto_config_.AddDefaultConfig(QuicRandom::GetInstance(), |
| + helper_->GetClock(), |
| + QuicCryptoServerConfig::ConfigOptions())); |
| } |
| QuicServer::~QuicServer() { |
| } |
| -bool QuicServer::Listen(const IPEndPoint& address) { |
| - port_ = address.port(); |
| - int address_family = address.GetSockAddrFamily(); |
| - fd_ = socket(address_family, SOCK_DGRAM | SOCK_NONBLOCK, IPPROTO_UDP); |
| - if (fd_ < 0) { |
| - LOG(ERROR) << "CreateSocket() failed: " << strerror(errno); |
| - return false; |
| - } |
| +int QuicServer::Listen(const IPEndPoint& address) { |
| + scoped_ptr<UDPServerSocket> socket( |
| + new UDPServerSocket(NULL, NetLog::Source())); |
| - int rc = QuicSocketUtils::SetGetAddressInfo(fd_, address_family); |
| + // TODO(dmz): socket->AllowBroadcast()? socket->SetMulticastInterface? |
| + socket->AllowAddressReuse(); |
| + int rc = socket->Listen(address); |
| if (rc < 0) { |
| - LOG(ERROR) << "IP detection not supported" << strerror(errno); |
| - return false; |
| - } |
| - |
| - int get_overflow = 1; |
| - rc = setsockopt( |
| - fd_, SOL_SOCKET, SO_RXQ_OVFL, &get_overflow, sizeof(get_overflow)); |
| - |
| - if (rc < 0) { |
| - DLOG(WARNING) << "Socket overflow detection not supported"; |
| - } else { |
| - overflow_supported_ = true; |
| + LOG(ERROR) << "Listen() failed: " << strerror(errno); |
| + return rc; |
| } |
| // These send and receive buffer sizes are sized for a single connection, |
| // because the default usage of QuicServer is as a test server with one or |
| // two clients. Adjust higher for use with many clients. |
| - if (!QuicSocketUtils::SetReceiveBufferSize(fd_, |
| - TcpReceiver::kReceiveWindowTCP)) { |
| - return false; |
| + rc = socket->SetReceiveBufferSize(TcpReceiver::kReceiveWindowTCP); |
| + if (rc < 0) { |
| + LOG(ERROR) << "SetReceiveBufferSize() failed: " << strerror(errno); |
| + return rc; |
| } |
| - if (!QuicSocketUtils::SetSendBufferSize(fd_, |
| - TcpReceiver::kReceiveWindowTCP)) { |
| - return false; |
| + rc = socket->SetSendBufferSize(TcpReceiver::kReceiveWindowTCP); |
|
Ryan Hamilton
2014/06/18 23:59:51
Seems surprising to set the send buffer to the *re
dmz
2014/06/19 17:46:24
I agree -- although this is unchanged from the ori
Ryan Hamilton
2014/06/19 19:21:29
Let's do what we do in the QuicStreamFactory:
net
dmz
2014/06/19 20:06:28
Done.
|
| + if (rc < 0) { |
| + LOG(ERROR) << "SetSendBufferSize() failed: " << strerror(errno); |
| + return rc; |
| } |
| - // Enable the socket option that allows the local address to be |
| - // returned if the socket is bound to more than on address. |
| - int get_local_ip = 1; |
| - rc = setsockopt(fd_, IPPROTO_IP, IP_PKTINFO, |
| - &get_local_ip, sizeof(get_local_ip)); |
| - if (rc == 0 && address_family == AF_INET6) { |
| - rc = setsockopt(fd_, IPPROTO_IPV6, IPV6_RECVPKTINFO, |
| - &get_local_ip, sizeof(get_local_ip)); |
| - } |
| - if (rc != 0) { |
| - LOG(ERROR) << "Failed to set required socket options"; |
| - return false; |
| - } |
| - sockaddr_storage raw_addr; |
| - socklen_t raw_addr_len = sizeof(raw_addr); |
| - CHECK(address.ToSockAddr(reinterpret_cast<sockaddr*>(&raw_addr), |
| - &raw_addr_len)); |
| - rc = bind(fd_, |
| - reinterpret_cast<const sockaddr*>(&raw_addr), |
| - sizeof(raw_addr)); |
| - if (rc < 0) { |
| - LOG(ERROR) << "Bind failed: " << strerror(errno); |
| - return false; |
| - } |
| + socket->GetLocalAddress(&server_address_); |
| - DVLOG(1) << "Listening on " << address.ToString(); |
| - if (port_ == 0) { |
| - SockaddrStorage storage; |
| - IPEndPoint server_address; |
| - if (getsockname(fd_, storage.addr, &storage.addr_len) != 0 || |
| - !server_address.FromSockAddr(storage.addr, storage.addr_len)) { |
| - LOG(ERROR) << "Unable to get self address. Error: " << strerror(errno); |
| - return false; |
| - } |
| - port_ = server_address.port(); |
| - DVLOG(1) << "Kernel assigned port is " << port_; |
| - } |
| + DVLOG(1) << "Listening on " << server_address_.ToString(); |
| - epoll_server_.RegisterFD(fd_, this, kEpollFlags); |
| - dispatcher_.reset(new QuicDispatcher( |
| - config_, |
| - crypto_config_, |
| - supported_versions_, |
| - &epoll_server_, |
| - server_initial_flow_control_receive_window_)); |
| - dispatcher_->Initialize(fd_); |
| + socket_.swap(socket); |
| - return true; |
| -} |
| + dispatcher_.reset( |
| + new QuicDispatcher(config_, |
| + crypto_config_, |
| + supported_versions_, |
| + helper_, |
| + server_initial_flow_control_receive_window_)); |
| + QuicServerPacketWriter* writer = new QuicServerPacketWriter(socket_.get()); |
| + writer->SetWriterToUnblock(dispatcher_.get()); |
| + dispatcher_->Initialize(writer); |
| -void QuicServer::WaitForEvents() { |
| - epoll_server_.WaitForEventsAndExecuteCallbacks(); |
| + StartReading(); |
| + |
| + return OK; |
| } |
| void QuicServer::Shutdown() { |
| @@ -184,60 +115,53 @@ void QuicServer::Shutdown() { |
| // notify clients that they're closing. |
| dispatcher_->Shutdown(); |
| - close(fd_); |
| - fd_ = -1; |
| + socket_->Close(); |
| } |
| -void QuicServer::OnEvent(int fd, EpollEvent* event) { |
| - DCHECK_EQ(fd, fd_); |
| - event->out_ready_mask = 0; |
| - |
| - if (event->in_events & EPOLLIN) { |
| - DVLOG(1) << "EPOLLIN"; |
| - bool read = true; |
| - while (read) { |
| - read = ReadAndDispatchSinglePacket( |
| - fd_, port_, dispatcher_.get(), |
| - overflow_supported_ ? &packets_dropped_ : NULL); |
| - } |
| - } |
| - if (event->in_events & EPOLLOUT) { |
| - dispatcher_->OnCanWrite(); |
| - if (dispatcher_->HasPendingWrites()) { |
| - event->out_ready_mask |= EPOLLOUT; |
| - } |
| - } |
| - if (event->in_events & EPOLLERR) { |
| +void QuicServer::OnReadComplete(int result) { |
|
Ryan Hamilton
2014/06/18 23:59:51
Please move this method below StartReading();
dmz
2014/06/19 17:46:24
Done.
Ryan Hamilton
2014/06/19 19:21:29
Still looks to be above StartReading()?
dmz
2014/06/19 20:06:28
Not in the newest patch set -- looks like you stil
|
| + read_pending_ = false; |
| + if (result == 0) |
| + result = ERR_CONNECTION_CLOSED; // TODO(dmz): correct for server? |
| + |
| + if (result < 0) { |
| + // TODO(dmz) how to handle error? probably just DVLOG, ignore, & continue? |
| + return; |
|
Ryan Hamilton
2014/06/19 19:21:29
I'd recommend closing connections and exiting.
dmz
2014/06/19 20:06:28
Done.
|
| } |
| + |
| + QuicEncryptedPacket packet(read_buffer_->data(), result, false); |
| + dispatcher_->ProcessPacket(server_address_, read_addr_, packet); |
| + |
| + StartReading(); |
| } |
| -/* static */ |
| -bool QuicServer::ReadAndDispatchSinglePacket(int fd, |
| - int port, |
| - QuicDispatcher* dispatcher, |
| - uint32* packets_dropped) { |
| - // Allocate some extra space so we can send an error if the client goes over |
| - // the limit. |
| - char buf[2 * kMaxPacketSize]; |
| - |
| - IPEndPoint client_address; |
| - IPAddressNumber server_ip; |
| - int bytes_read = |
| - QuicSocketUtils::ReadPacket(fd, buf, arraysize(buf), |
| - packets_dropped, |
| - &server_ip, &client_address); |
| - |
| - if (bytes_read < 0) { |
| - return false; // We failed to read. |
| +void QuicServer::StartReading() { |
| + if (read_pending_) { |
| + return; |
| } |
| - QuicEncryptedPacket packet(buf, bytes_read, false); |
| + int result = socket_->RecvFrom( |
| + read_buffer_.get(), |
| + read_buffer_->size(), |
| + &read_addr_, |
| + base::Bind(&QuicServer::OnReadComplete, base::Unretained(this))); |
| - IPEndPoint server_address(server_ip, port); |
| - dispatcher->ProcessPacket(server_address, client_address, packet); |
| + if (result == ERR_IO_PENDING) { |
| + read_pending_ = true; |
| + return; |
| + } |
| - return true; |
| + if (++synchronous_read_count_ > 32) { |
| + synchronous_read_count_ = 0; |
| + // Schedule the processing through the message loop to 1) prevent infinite |
| + // recursion and 2) avoid blocking the thread for too long. |
| + base::MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&QuicServer::OnReadComplete, |
| + weak_factory_.GetWeakPtr(), |
| + result)); |
| + } else { |
| + OnReadComplete(result); |
| + } |
| } |
| -} // namespace tools |
| } // namespace net |