Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(107)

Unified Diff: net/quic/quic_server.cc

Issue 340433002: Port QuicServer to Chrome network stack (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fix invalid memory access + blocked writers Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: net/quic/quic_server.cc
diff --git a/net/tools/quic/quic_server.cc b/net/quic/quic_server.cc
similarity index 26%
copy from net/tools/quic/quic_server.cc
copy to net/quic/quic_server.cc
index 2d1a52d3f0dc010f5c60e19eb1147dbedd6a0e71..5e6d95f168850f4266847a218607990910459aed 100644
--- a/net/tools/quic/quic_server.cc
+++ b/net/quic/quic_server.cc
@@ -2,181 +2,113 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include "net/tools/quic/quic_server.h"
+#include "net/quic/quic_server.h"
-#include <errno.h>
-#include <features.h>
-#include <netinet/in.h>
#include <string.h>
-#include <sys/epoll.h>
-#include <sys/socket.h>
#include "net/base/ip_endpoint.h"
+#include "net/base/net_errors.h"
#include "net/quic/congestion_control/tcp_receiver.h"
#include "net/quic/crypto/crypto_handshake.h"
#include "net/quic/crypto/quic_random.h"
-#include "net/quic/quic_clock.h"
#include "net/quic/quic_crypto_stream.h"
#include "net/quic/quic_data_reader.h"
+#include "net/quic/quic_dispatcher.h"
+#include "net/quic/quic_in_memory_cache.h"
#include "net/quic/quic_protocol.h"
-#include "net/tools/quic/quic_in_memory_cache.h"
+#include "net/quic/quic_server_packet_writer.h"
#include "net/tools/quic/quic_socket_utils.h"
-#define MMSG_MORE 0
-#ifndef SO_RXQ_OVFL
-#define SO_RXQ_OVFL 40
-#endif
+namespace net {
-const int kEpollFlags = EPOLLIN | EPOLLOUT | EPOLLET;
static const char kSourceAddressTokenSecret[] = "secret";
-const uint32 kServerInitialFlowControlWindow = 100 * net::kMaxPacketSize;
-namespace net {
-namespace tools {
-
-QuicServer::QuicServer()
- : port_(0),
- fd_(-1),
- packets_dropped_(0),
- overflow_supported_(false),
- use_recvmmsg_(false),
- crypto_config_(kSourceAddressTokenSecret, QuicRandom::GetInstance()),
- supported_versions_(QuicSupportedVersions()),
- server_initial_flow_control_receive_window_(
- kServerInitialFlowControlWindow) {
- // Use hardcoded crypto parameters for now.
- config_.SetDefaults();
- Initialize();
-}
+// Allocate some extra space so we can send an error if the client goes over
+// the limit.
+static const int kReadBufferSize = 2 * net::kMaxPacketSize;
QuicServer::QuicServer(const QuicConfig& config,
const QuicVersionVector& supported_versions,
uint32 server_initial_flow_control_receive_window)
- : port_(0),
- fd_(-1),
- packets_dropped_(0),
- overflow_supported_(false),
- use_recvmmsg_(false),
+ : helper_(new QuicConnectionHelper(
+ base::MessageLoop::current()->message_loop_proxy().get(),
+ &clock_,
+ QuicRandom::GetInstance())),
config_(config),
crypto_config_(kSourceAddressTokenSecret, QuicRandom::GetInstance()),
supported_versions_(supported_versions),
server_initial_flow_control_receive_window_(
- server_initial_flow_control_receive_window) {
+ server_initial_flow_control_receive_window),
+ read_pending_(false),
+ synchronous_read_count_(0),
+ read_buffer_(new IOBufferWithSize(kReadBufferSize)),
+ weak_factory_(this) {
Initialize();
}
void QuicServer::Initialize() {
-#if MMSG_MORE
- use_recvmmsg_ = true;
-#endif
- epoll_server_.set_timeout_in_us(50 * 1000);
// Initialize the in memory cache now.
QuicInMemoryCache::GetInstance();
- QuicEpollClock clock(&epoll_server_);
-
scoped_ptr<CryptoHandshakeMessage> scfg(
- crypto_config_.AddDefaultConfig(
- QuicRandom::GetInstance(), &clock,
- QuicCryptoServerConfig::ConfigOptions()));
+ crypto_config_.AddDefaultConfig(QuicRandom::GetInstance(),
+ helper_->GetClock(),
+ QuicCryptoServerConfig::ConfigOptions()));
}
QuicServer::~QuicServer() {
}
+// TODO(dmz): return an error code
bool QuicServer::Listen(const IPEndPoint& address) {
- port_ = address.port();
- int address_family = address.GetSockAddrFamily();
- fd_ = socket(address_family, SOCK_DGRAM | SOCK_NONBLOCK, IPPROTO_UDP);
- if (fd_ < 0) {
- LOG(ERROR) << "CreateSocket() failed: " << strerror(errno);
- return false;
- }
+ scoped_ptr<UDPServerSocket> socket(
+ new UDPServerSocket(NULL, NetLog::Source()));
- int rc = QuicSocketUtils::SetGetAddressInfo(fd_, address_family);
+ // TODO(dmz): socket->AllowBroadcast()? socket->SetMulticastInterface?
+ socket->AllowAddressReuse();
+ int rc = socket->Listen(address);
if (rc < 0) {
- LOG(ERROR) << "IP detection not supported" << strerror(errno);
+ LOG(ERROR) << "Listen() failed: " << strerror(errno);
return false;
}
- int get_overflow = 1;
- rc = setsockopt(
- fd_, SOL_SOCKET, SO_RXQ_OVFL, &get_overflow, sizeof(get_overflow));
-
- if (rc < 0) {
- DLOG(WARNING) << "Socket overflow detection not supported";
- } else {
- overflow_supported_ = true;
- }
+ // TODO(dmz): UDPServerSocket handles this?
+ // rc = QuicSocketUtils::SetGetAddressInfo(fd_, address_family);
// These send and receive buffer sizes are sized for a single connection,
// because the default usage of QuicServer is as a test server with one or
// two clients. Adjust higher for use with many clients.
- if (!QuicSocketUtils::SetReceiveBufferSize(fd_,
- TcpReceiver::kReceiveWindowTCP)) {
+ rc = socket->SetReceiveBufferSize(TcpReceiver::kReceiveWindowTCP);
+ if (rc < 0) {
return false;
}
- if (!QuicSocketUtils::SetSendBufferSize(fd_,
- TcpReceiver::kReceiveWindowTCP)) {
+ rc = socket->SetSendBufferSize(TcpReceiver::kReceiveWindowTCP);
+ if (rc < 0) {
return false;
}
- // Enable the socket option that allows the local address to be
- // returned if the socket is bound to more than on address.
- int get_local_ip = 1;
- rc = setsockopt(fd_, IPPROTO_IP, IP_PKTINFO,
- &get_local_ip, sizeof(get_local_ip));
- if (rc == 0 && address_family == AF_INET6) {
- rc = setsockopt(fd_, IPPROTO_IPV6, IPV6_RECVPKTINFO,
- &get_local_ip, sizeof(get_local_ip));
- }
- if (rc != 0) {
- LOG(ERROR) << "Failed to set required socket options";
- return false;
- }
- sockaddr_storage raw_addr;
- socklen_t raw_addr_len = sizeof(raw_addr);
- CHECK(address.ToSockAddr(reinterpret_cast<sockaddr*>(&raw_addr),
- &raw_addr_len));
- rc = bind(fd_,
- reinterpret_cast<const sockaddr*>(&raw_addr),
- sizeof(raw_addr));
- if (rc < 0) {
- LOG(ERROR) << "Bind failed: " << strerror(errno);
- return false;
- }
+ socket->GetLocalAddress(&server_address_);
- DVLOG(1) << "Listening on " << address.ToString();
- if (port_ == 0) {
- SockaddrStorage storage;
- IPEndPoint server_address;
- if (getsockname(fd_, storage.addr, &storage.addr_len) != 0 ||
- !server_address.FromSockAddr(storage.addr, storage.addr_len)) {
- LOG(ERROR) << "Unable to get self address. Error: " << strerror(errno);
- return false;
- }
- port_ = server_address.port();
- DVLOG(1) << "Kernel assigned port is " << port_;
- }
+ DVLOG(1) << "Listening on " << server_address_.ToString();
- epoll_server_.RegisterFD(fd_, this, kEpollFlags);
- dispatcher_.reset(new QuicDispatcher(
- config_,
- crypto_config_,
- supported_versions_,
- &epoll_server_,
- server_initial_flow_control_receive_window_));
- dispatcher_->Initialize(fd_);
+ socket_.swap(socket);
- return true;
-}
+ dispatcher_.reset(
+ new QuicDispatcher(config_,
+ crypto_config_,
+ supported_versions_,
+ helper_,
+ server_initial_flow_control_receive_window_));
+ dispatcher_->Initialize(new QuicServerPacketWriter(dispatcher_.get(),
+ socket_.get()));
+
+ StartReading();
-void QuicServer::WaitForEvents() {
- epoll_server_.WaitForEventsAndExecuteCallbacks();
+ return true;
}
void QuicServer::Shutdown() {
@@ -184,60 +116,53 @@ void QuicServer::Shutdown() {
// notify clients that they're closing.
dispatcher_->Shutdown();
- close(fd_);
- fd_ = -1;
+ socket_->Close();
}
-void QuicServer::OnEvent(int fd, EpollEvent* event) {
- DCHECK_EQ(fd, fd_);
- event->out_ready_mask = 0;
-
- if (event->in_events & EPOLLIN) {
- DVLOG(1) << "EPOLLIN";
- bool read = true;
- while (read) {
- read = ReadAndDispatchSinglePacket(
- fd_, port_, dispatcher_.get(),
- overflow_supported_ ? &packets_dropped_ : NULL);
- }
- }
- if (event->in_events & EPOLLOUT) {
- dispatcher_->OnCanWrite();
- if (dispatcher_->HasPendingWrites()) {
- event->out_ready_mask |= EPOLLOUT;
- }
- }
- if (event->in_events & EPOLLERR) {
+void QuicServer::OnReadComplete(int result) {
+ read_pending_ = false;
+ if (result == 0)
+ result = ERR_CONNECTION_CLOSED; // TODO(dmz): correct for server?
+
+ if (result < 0) {
+ // TODO(dmz) how to handle error? probably just DVLOG, ignore, & continue?
+ return;
}
+
+ QuicEncryptedPacket packet(read_buffer_->data(), result, false);
+ dispatcher_->ProcessPacket(server_address_, read_addr_, packet);
+
+ StartReading();
}
-/* static */
-bool QuicServer::ReadAndDispatchSinglePacket(int fd,
- int port,
- QuicDispatcher* dispatcher,
- uint32* packets_dropped) {
- // Allocate some extra space so we can send an error if the client goes over
- // the limit.
- char buf[2 * kMaxPacketSize];
-
- IPEndPoint client_address;
- IPAddressNumber server_ip;
- int bytes_read =
- QuicSocketUtils::ReadPacket(fd, buf, arraysize(buf),
- packets_dropped,
- &server_ip, &client_address);
-
- if (bytes_read < 0) {
- return false; // We failed to read.
+void QuicServer::StartReading() {
+ if (read_pending_) {
+ return;
}
- QuicEncryptedPacket packet(buf, bytes_read, false);
+ int result = socket_->RecvFrom(
+ read_buffer_.get(),
+ read_buffer_->size(),
+ &read_addr_,
+ base::Bind(&QuicServer::OnReadComplete, base::Unretained(this)));
- IPEndPoint server_address(server_ip, port);
- dispatcher->ProcessPacket(server_address, client_address, packet);
+ if (result == ERR_IO_PENDING) {
+ read_pending_ = true;
+ return;
+ }
- return true;
+ if (++synchronous_read_count_ > 32) {
+ synchronous_read_count_ = 0;
+ // Schedule the processing through the message loop to 1) prevent infinite
+ // recursion and 2) avoid blocking the thread for too long.
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&QuicServer::OnReadComplete,
+ weak_factory_.GetWeakPtr(),
+ result));
+ } else {
+ OnReadComplete(result);
+ }
}
-} // namespace tools
} // namespace net

Powered by Google App Engine
This is Rietveld 408576698