Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(52)

Side by Side Diff: net/quic/quic_server.cc

Issue 340433002: Port QuicServer to Chrome network stack (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Indentation fix Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/tools/quic/quic_server.h" 5 #include "net/quic/quic_server.h"
6 6
7 #include <errno.h>
8 #include <features.h>
9 #include <netinet/in.h>
10 #include <string.h> 7 #include <string.h>
11 #include <sys/epoll.h>
12 #include <sys/socket.h>
13 8
14 #include "net/base/ip_endpoint.h" 9 #include "net/base/ip_endpoint.h"
10 #include "net/base/net_errors.h"
15 #include "net/quic/congestion_control/tcp_receiver.h" 11 #include "net/quic/congestion_control/tcp_receiver.h"
16 #include "net/quic/crypto/crypto_handshake.h" 12 #include "net/quic/crypto/crypto_handshake.h"
17 #include "net/quic/crypto/quic_random.h" 13 #include "net/quic/crypto/quic_random.h"
18 #include "net/quic/quic_clock.h"
19 #include "net/quic/quic_crypto_stream.h" 14 #include "net/quic/quic_crypto_stream.h"
20 #include "net/quic/quic_data_reader.h" 15 #include "net/quic/quic_data_reader.h"
16 #include "net/quic/quic_dispatcher.h"
17 #include "net/quic/quic_in_memory_cache.h"
21 #include "net/quic/quic_protocol.h" 18 #include "net/quic/quic_protocol.h"
22 #include "net/tools/quic/quic_in_memory_cache.h" 19 #include "net/quic/quic_server_packet_writer.h"
23 #include "net/tools/quic/quic_socket_utils.h" 20 #include "net/udp/udp_server_socket.h"
24
25 #define MMSG_MORE 0
26
27 #ifndef SO_RXQ_OVFL
28 #define SO_RXQ_OVFL 40
29 #endif
30
31 const int kEpollFlags = EPOLLIN | EPOLLOUT | EPOLLET;
32 static const char kSourceAddressTokenSecret[] = "secret";
33 const uint32 kServerInitialFlowControlWindow = 100 * net::kMaxPacketSize;
34 21
35 namespace net { 22 namespace net {
36 namespace tools {
37 23
38 QuicServer::QuicServer() 24 namespace {
39 : port_(0), 25
40 fd_(-1), 26 const char kSourceAddressTokenSecret[] = "secret";
41 packets_dropped_(0), 27
42 overflow_supported_(false), 28 // Allocate some extra space so we can send an error if the client goes over
43 use_recvmmsg_(false), 29 // the limit.
44 crypto_config_(kSourceAddressTokenSecret, QuicRandom::GetInstance()), 30 const int kReadBufferSize = 2 * kMaxPacketSize;
45 supported_versions_(QuicSupportedVersions()) { 31
46 // Use hardcoded crypto parameters for now. 32 const uint32 kServerInitialFlowControlWindow = 100 * kMaxPacketSize;
47 config_.SetDefaults(); 33
48 Initialize(); 34 } // namespace
49 }
50 35
51 QuicServer::QuicServer(const QuicConfig& config, 36 QuicServer::QuicServer(const QuicConfig& config,
52 const QuicVersionVector& supported_versions) 37 const QuicVersionVector& supported_versions)
53 : port_(0), 38 : helper_(base::MessageLoop::current()->message_loop_proxy().get(),
54 fd_(-1), 39 &clock_,
55 packets_dropped_(0), 40 QuicRandom::GetInstance()),
56 overflow_supported_(false),
57 use_recvmmsg_(false),
58 config_(config), 41 config_(config),
59 crypto_config_(kSourceAddressTokenSecret, QuicRandom::GetInstance()), 42 crypto_config_(kSourceAddressTokenSecret, QuicRandom::GetInstance()),
60 supported_versions_(supported_versions) { 43 supported_versions_(supported_versions),
44 read_pending_(false),
45 synchronous_read_count_(0),
46 read_buffer_(new IOBufferWithSize(kReadBufferSize)),
47 weak_factory_(this) {
61 Initialize(); 48 Initialize();
62 } 49 }
63 50
64 void QuicServer::Initialize() { 51 void QuicServer::Initialize() {
65 #if MMSG_MORE
66 use_recvmmsg_ = true;
67 #endif
68 epoll_server_.set_timeout_in_us(50 * 1000);
69 // Initialize the in memory cache now. 52 // Initialize the in memory cache now.
70 QuicInMemoryCache::GetInstance(); 53 QuicInMemoryCache::GetInstance();
71 54
72 QuicEpollClock clock(&epoll_server_); 55 scoped_ptr<CryptoHandshakeMessage> scfg(
56 crypto_config_.AddDefaultConfig(helper_.GetRandomGenerator(),
57 helper_.GetClock(),
58 QuicCryptoServerConfig::ConfigOptions()));
73 59
74 scoped_ptr<CryptoHandshakeMessage> scfg(
75 crypto_config_.AddDefaultConfig(
76 QuicRandom::GetInstance(), &clock,
77 QuicCryptoServerConfig::ConfigOptions()));
78
79 // Set flow control options in the config.
80 config_.SetInitialCongestionWindowToSend(kServerInitialFlowControlWindow); 60 config_.SetInitialCongestionWindowToSend(kServerInitialFlowControlWindow);
81 } 61 }
82 62
83 QuicServer::~QuicServer() { 63 QuicServer::~QuicServer() {
84 } 64 }
85 65
86 bool QuicServer::Listen(const IPEndPoint& address) { 66 int QuicServer::Listen(const IPEndPoint& address) {
87 port_ = address.port(); 67 scoped_ptr<UDPServerSocket> socket(
88 int address_family = address.GetSockAddrFamily(); 68 new UDPServerSocket(&net_log_, NetLog::Source()));
89 fd_ = socket(address_family, SOCK_DGRAM | SOCK_NONBLOCK, IPPROTO_UDP);
90 if (fd_ < 0) {
91 LOG(ERROR) << "CreateSocket() failed: " << strerror(errno);
92 return false;
93 }
94 69
95 int rc = QuicSocketUtils::SetGetAddressInfo(fd_, address_family); 70 socket->AllowAddressReuse();
96 71
72 int rc = socket->Listen(address);
97 if (rc < 0) { 73 if (rc < 0) {
98 LOG(ERROR) << "IP detection not supported" << strerror(errno); 74 LOG(ERROR) << "Listen() failed: " << ErrorToString(rc);
99 return false; 75 return rc;
100 }
101
102 int get_overflow = 1;
103 rc = setsockopt(
104 fd_, SOL_SOCKET, SO_RXQ_OVFL, &get_overflow, sizeof(get_overflow));
105
106 if (rc < 0) {
107 DLOG(WARNING) << "Socket overflow detection not supported";
108 } else {
109 overflow_supported_ = true;
110 } 76 }
111 77
112 // These send and receive buffer sizes are sized for a single connection, 78 // These send and receive buffer sizes are sized for a single connection,
113 // because the default usage of QuicServer is as a test server with one or 79 // because the default usage of QuicServer is as a test server with one or
114 // two clients. Adjust higher for use with many clients. 80 // two clients. Adjust higher for use with many clients.
115 if (!QuicSocketUtils::SetReceiveBufferSize(fd_, 81 rc = socket->SetReceiveBufferSize(TcpReceiver::kReceiveWindowTCP);
116 TcpReceiver::kReceiveWindowTCP)) { 82 if (rc < 0) {
117 return false; 83 LOG(ERROR) << "SetReceiveBufferSize() failed: " << ErrorToString(rc);
84 return rc;
118 } 85 }
119 86
120 if (!QuicSocketUtils::SetSendBufferSize(fd_, 87 rc = socket->SetSendBufferSize(20 * kMaxPacketSize);
121 TcpReceiver::kReceiveWindowTCP)) { 88 if (rc < 0) {
122 return false; 89 LOG(ERROR) << "SetSendBufferSize() failed: " << ErrorToString(rc);
90 return rc;
123 } 91 }
124 92
125 // Enable the socket option that allows the local address to be 93 rc = socket->GetLocalAddress(&server_address_);
126 // returned if the socket is bound to more than on address. 94 if (rc < 0) {
127 int get_local_ip = 1; 95 LOG(ERROR) << "GetLocalAddress() failed: " << ErrorToString(rc);
128 rc = setsockopt(fd_, IPPROTO_IP, IP_PKTINFO, 96 return rc;
129 &get_local_ip, sizeof(get_local_ip));
130 if (rc == 0 && address_family == AF_INET6) {
131 rc = setsockopt(fd_, IPPROTO_IPV6, IPV6_RECVPKTINFO,
132 &get_local_ip, sizeof(get_local_ip));
133 }
134 if (rc != 0) {
135 LOG(ERROR) << "Failed to set required socket options";
136 return false;
137 } 97 }
138 98
139 sockaddr_storage raw_addr; 99 DVLOG(1) << "Listening on " << server_address_.ToString();
140 socklen_t raw_addr_len = sizeof(raw_addr);
141 CHECK(address.ToSockAddr(reinterpret_cast<sockaddr*>(&raw_addr),
142 &raw_addr_len));
143 rc = bind(fd_,
144 reinterpret_cast<const sockaddr*>(&raw_addr),
145 sizeof(raw_addr));
146 if (rc < 0) {
147 LOG(ERROR) << "Bind failed: " << strerror(errno);
148 return false;
149 }
150 100
151 DVLOG(1) << "Listening on " << address.ToString(); 101 socket_.swap(socket);
152 if (port_ == 0) {
153 SockaddrStorage storage;
154 IPEndPoint server_address;
155 if (getsockname(fd_, storage.addr, &storage.addr_len) != 0 ||
156 !server_address.FromSockAddr(storage.addr, storage.addr_len)) {
157 LOG(ERROR) << "Unable to get self address. Error: " << strerror(errno);
158 return false;
159 }
160 port_ = server_address.port();
161 DVLOG(1) << "Kernel assigned port is " << port_;
162 }
163 102
164 epoll_server_.RegisterFD(fd_, this, kEpollFlags); 103 dispatcher_.reset(
165 dispatcher_.reset(CreateQuicDispatcher()); 104 new QuicDispatcher(config_,
166 dispatcher_->Initialize(fd_); 105 crypto_config_,
106 supported_versions_,
107 &helper_));
108 QuicServerPacketWriter* writer = new QuicServerPacketWriter(
109 socket_.get(),
110 dispatcher_.get());
111 dispatcher_->Initialize(writer);
167 112
168 return true; 113 StartReading();
169 }
170 114
171 QuicDispatcher* QuicServer::CreateQuicDispatcher() { 115 return OK;
172 return new QuicDispatcher(
173 config_,
174 crypto_config_,
175 supported_versions_,
176 &epoll_server_);
177 }
178
179 void QuicServer::WaitForEvents() {
180 epoll_server_.WaitForEventsAndExecuteCallbacks();
181 } 116 }
182 117
183 void QuicServer::Shutdown() { 118 void QuicServer::Shutdown() {
184 // Before we shut down the epoll server, give all active sessions a chance to 119 // Before we shut down the epoll server, give all active sessions a chance to
185 // notify clients that they're closing. 120 // notify clients that they're closing.
186 dispatcher_->Shutdown(); 121 dispatcher_->Shutdown();
187 122
188 close(fd_); 123 socket_->Close();
189 fd_ = -1; 124 socket_.reset();
190 } 125 }
191 126
192 void QuicServer::OnEvent(int fd, EpollEvent* event) { 127 void QuicServer::StartReading() {
193 DCHECK_EQ(fd, fd_); 128 if (read_pending_) {
194 event->out_ready_mask = 0; 129 return;
130 }
131 read_pending_ = true;
195 132
196 if (event->in_events & EPOLLIN) { 133 int result = socket_->RecvFrom(
197 DVLOG(1) << "EPOLLIN"; 134 read_buffer_.get(),
198 bool read = true; 135 read_buffer_->size(),
199 while (read) { 136 &client_address_,
200 read = ReadAndDispatchSinglePacket( 137 base::Bind(&QuicServer::OnReadComplete, base::Unretained(this)));
201 fd_, port_, dispatcher_.get(), 138
202 overflow_supported_ ? &packets_dropped_ : NULL); 139 if (result == ERR_IO_PENDING) {
203 } 140 synchronous_read_count_ = 0;
141 return;
204 } 142 }
205 if (event->in_events & EPOLLOUT) { 143
206 dispatcher_->OnCanWrite(); 144 if (++synchronous_read_count_ > 32) {
207 if (dispatcher_->HasPendingWrites()) { 145 synchronous_read_count_ = 0;
208 event->out_ready_mask |= EPOLLOUT; 146 // Schedule the processing through the message loop to 1) prevent infinite
209 } 147 // recursion and 2) avoid blocking the thread for too long.
210 } 148 base::MessageLoop::current()->PostTask(
211 if (event->in_events & EPOLLERR) { 149 FROM_HERE,
150 base::Bind(&QuicServer::OnReadComplete,
151 weak_factory_.GetWeakPtr(),
152 result));
153 } else {
154 OnReadComplete(result);
212 } 155 }
213 } 156 }
214 157
215 /* static */ 158 void QuicServer::OnReadComplete(int result) {
216 bool QuicServer::ReadAndDispatchSinglePacket(int fd, 159 read_pending_ = false;
217 int port, 160 if (result == 0)
218 QuicDispatcher* dispatcher, 161 result = ERR_CONNECTION_CLOSED;
219 uint32* packets_dropped) {
220 // Allocate some extra space so we can send an error if the client goes over
221 // the limit.
222 char buf[2 * kMaxPacketSize];
223 162
224 IPEndPoint client_address; 163 if (result < 0) {
225 IPAddressNumber server_ip; 164 LOG(ERROR) << "QuicServer read failed: " << ErrorToString(result);
226 int bytes_read = 165 Shutdown();
227 QuicSocketUtils::ReadPacket(fd, buf, arraysize(buf), 166 return;
228 packets_dropped,
229 &server_ip, &client_address);
230
231 if (bytes_read < 0) {
232 return false; // We failed to read.
233 } 167 }
234 168
235 QuicEncryptedPacket packet(buf, bytes_read, false); 169 QuicEncryptedPacket packet(read_buffer_->data(), result, false);
170 dispatcher_->ProcessPacket(server_address_, client_address_, packet);
236 171
237 IPEndPoint server_address(server_ip, port); 172 StartReading();
238 dispatcher->ProcessPacket(server_address, client_address, packet);
239
240 return true;
241 } 173 }
242 174
243 } // namespace tools
244 } // namespace net 175 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698