Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(85)

Side by Side Diff: remoting/protocol/quic_channel_factory.cc

Issue 1273233002: Implement QuicChannel and QuicChannelFactory (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "remoting/protocol/quic_channel_factory.h"
6
7 #include <vector>
8
9 #include "base/bind.h"
10 #include "base/location.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/stl_util.h"
13 #include "base/thread_task_runner_handle.h"
14 #include "net/base/io_buffer.h"
15 #include "net/base/net_errors.h"
16 #include "net/quic/crypto/crypto_framer.h"
17 #include "net/quic/crypto/crypto_handshake_message.h"
18 #include "net/quic/crypto/crypto_protocol.h"
19 #include "net/quic/crypto/quic_random.h"
20 #include "net/quic/p2p/quic_p2p_crypto_config.h"
21 #include "net/quic/p2p/quic_p2p_session.h"
22 #include "net/quic/p2p/quic_p2p_stream.h"
23 #include "net/quic/quic_clock.h"
24 #include "net/quic/quic_connection_helper.h"
25 #include "net/quic/quic_default_packet_writer.h"
26 #include "net/socket/stream_socket.h"
27 #include "remoting/base/constants.h"
28 #include "remoting/protocol/datagram_channel_factory.h"
29 #include "remoting/protocol/p2p_datagram_socket.h"
30 #include "remoting/protocol/quic_channel.h"
31
32 namespace remoting {
33 namespace protocol {
34
35 namespace {
36
37 // The maximum receive window sizes for QUIC sessions and streams. These are
38 // the same values that are used in chrome.
39 const int kQuicSessionMaxRecvWindowSize = 15 * 1024 * 1024; // 15 MB
40 const int kQuicStreamMaxRecvWindowSize = 6 * 1024 * 1024; // 6 MB
41
42 class P2PQuicPacketWriter : public net::QuicPacketWriter {
43 public:
44 P2PQuicPacketWriter(net::QuicConnection* connection,
45 P2PDatagramSocket* socket)
46 : connection_(connection), socket_(socket), weak_factory_(this) {}
47 ~P2PQuicPacketWriter() override {}
48
49 // QuicPacketWriter interface.
50 net::WriteResult WritePacket(const char* buffer,
51 size_t buf_len,
52 const net::IPAddressNumber& self_address,
53 const net::IPEndPoint& peer_address) override {
54 DCHECK(!write_blocked_);
55
56 scoped_refptr<net::StringIOBuffer> buf(
57 new net::StringIOBuffer(std::string(buffer, buf_len)));
58 int result = socket_->Send(buf, buf_len,
59 base::Bind(&P2PQuicPacketWriter::OnSendComplete,
60 weak_factory_.GetWeakPtr()));
61 net::WriteStatus status = net::WRITE_STATUS_OK;
62 if (result < 0) {
63 if (result == net::ERR_IO_PENDING) {
64 status = net::WRITE_STATUS_BLOCKED;
65 write_blocked_ = true;
66 } else {
67 status = net::WRITE_STATUS_ERROR;
68 }
69 }
70
71 return net::WriteResult(status, result);
72 }
73 bool IsWriteBlockedDataBuffered() const override {
74 // P2PDatagramSocket::Send() method buffer the data until the Send is
75 // unblocked.
76 return true;
77 }
78 bool IsWriteBlocked() const override { return write_blocked_; }
79 void SetWritable() override { write_blocked_ = false; }
80
81 private:
82 void OnSendComplete(int result){
83 DCHECK_NE(result, net::ERR_IO_PENDING);
84 write_blocked_ = false;
85 if (result < 0) {
86 connection_->OnWriteError(result);
87 }
88 connection_->OnCanWrite();
89 }
90
91 net::QuicConnection* connection_;
92 P2PDatagramSocket* socket_;
93
94 // Whether a write is currently in flight.
95 bool write_blocked_ = false;
96
97 base::WeakPtrFactory<P2PQuicPacketWriter> weak_factory_;
98
99 DISALLOW_COPY_AND_ASSIGN(P2PQuicPacketWriter);
100 };
101
102 class QuicPacketWriterFactory
103 : public net::QuicConnection::PacketWriterFactory {
104 public:
105 explicit QuicPacketWriterFactory(P2PDatagramSocket* socket)
106 : socket_(socket) {}
107 ~QuicPacketWriterFactory() override {}
108
109 net::QuicPacketWriter* Create(
110 net::QuicConnection* connection) const override {
111 return new P2PQuicPacketWriter(connection, socket_);
112 }
113
114 private:
115 P2PDatagramSocket* socket_;
116 };
117
118 class P2PDatagramSocketAdapter : public net::Socket {
119 public:
120 explicit P2PDatagramSocketAdapter(scoped_ptr<P2PDatagramSocket> socket)
121 : socket_(socket.Pass()) {}
122 ~P2PDatagramSocketAdapter() override {}
123
124 int Read(net::IOBuffer* buf, int buf_len,
125 const net::CompletionCallback& callback) override {
126 return socket_->Recv(buf, buf_len, callback);
127 }
128 int Write(net::IOBuffer* buf, int buf_len,
129 const net::CompletionCallback& callback) override {
130 return socket_->Send(buf, buf_len, callback);
131 }
132
133 int SetReceiveBufferSize(int32_t size) override {
134 NOTREACHED();
135 return net::ERR_FAILED;
136 }
137
138 int SetSendBufferSize(int32_t size) override {
139 NOTREACHED();
140 return net::ERR_FAILED;
141 }
142
143 private:
144 scoped_ptr<P2PDatagramSocket> socket_;
145 };
146
147 } // namespace
148
149 class QuicChannelFactory::Core : public net::QuicP2PSession::Delegate {
150 public:
151 Core(const std::string& session_id, bool is_server);
152 virtual ~Core();
153
154 // Called from ~QuicChannelFactory() to synchronously release underlying
155 // socket. Core is destroyed later asynchronously.
156 void Close();
157
158 // Implementation of all all methods for QuicChannelFactory.
159 const std::string& CreateSessionInitiateConfigMessage();
160 bool ProcessSessionAcceptConfigMessage(const std::string& message);
161
162 bool ProcessSessionInitiateConfigMessage(const std::string& message);
163 const std::string& CreateSessionAcceptConfigMessage();
164
165 void Start(DatagramChannelFactory* factory, const std::string& shared_secret);
166
167 void CreateChannel(const std::string& name,
168 const ChannelCreatedCallback& callback);
169 void CancelChannelCreation(const std::string& name);
170
171 private:
172 friend class QuicChannelFactory;
173
174 struct PendingChannel {
175 PendingChannel(const std::string& name,
176 const ChannelCreatedCallback& callback)
177 : name(name), callback(callback) {}
178
179 std::string name;
180 ChannelCreatedCallback callback;
181 };
182
183 // QuicP2PSession::Delegate interface.
184 void OnIncomingStream(net::QuicP2PStream* stream) override;
185 void OnConnectionClosed(net::QuicErrorCode error) override;
186
187 void OnBaseChannelReady(scoped_ptr<P2PDatagramSocket> socket);
188
189 void OnNameReceived(QuicChannel* channel);
190
191 void OnChannelDestroyed(int stream_id);
192
193 std::string session_id_;
194 bool is_server_;
195 DatagramChannelFactory* base_channel_factory_ = nullptr;
196
197 net::QuicConfig quic_config_;
198 std::string shared_secret_;
199 std::string session_initiate_quic_config_message_;
200 std::string session_accept_quic_config_message_;
201
202 net::QuicClock quic_clock_;
203 net::QuicConnectionHelper quic_helper_;
204 scoped_ptr<net::QuicP2PSession> quic_session_;
205 bool connected_ = false;
206
207 std::vector<PendingChannel*> pending_channels_;
208 std::vector<QuicChannel*> unnamed_incoming_channels_;
209
210 base::WeakPtrFactory<Core> weak_factory_;
211
212 DISALLOW_COPY_AND_ASSIGN(Core);
213 };
214
215 QuicChannelFactory::Core::Core(const std::string& session_id, bool is_server)
216 : session_id_(session_id),
217 is_server_(is_server),
218 quic_helper_(base::ThreadTaskRunnerHandle::Get().get(),
219 &quic_clock_,
220 net::QuicRandom::GetInstance()),
221 weak_factory_(this) {
222 quic_config_.SetInitialSessionFlowControlWindowToSend(
223 kQuicSessionMaxRecvWindowSize);
224 quic_config_.SetInitialStreamFlowControlWindowToSend(
225 kQuicStreamMaxRecvWindowSize);
226 }
227
228 QuicChannelFactory::Core::~Core() {
229 }
230
231 void QuicChannelFactory::Core::Close() {
232 DCHECK(pending_channels_.empty());
233
234 // Cancel creation of the base channel if it hasn't finished.
235 if (base_channel_factory_)
236 base_channel_factory_->CancelChannelCreation(kQuicChannelName);
237
238 STLDeleteElements(&unnamed_incoming_channels_);
239
240 if (quic_session_ && quic_session_->connection()->connected())
241 quic_session_->connection()->CloseConnection(net::QUIC_NO_ERROR, false);
242
243 DCHECK(unnamed_incoming_channels_.empty());
244 }
245
246 void QuicChannelFactory::Core::Start(DatagramChannelFactory* factory,
247 const std::string& shared_secret) {
248 base_channel_factory_ = factory;
249 shared_secret_ = shared_secret;
250
251 base_channel_factory_->CreateChannel(
252 kQuicChannelName,
253 base::Bind(&Core::OnBaseChannelReady, weak_factory_.GetWeakPtr()));
254 }
255
256 const std::string&
257 QuicChannelFactory::Core::CreateSessionInitiateConfigMessage() {
258 DCHECK(!is_server_);
259
260 net::CryptoHandshakeMessage handshake_message;
261 handshake_message.set_tag(net::kCHLO);
262 quic_config_.ToHandshakeMessage(&handshake_message);
263
264 session_initiate_quic_config_message_ =
265 handshake_message.GetSerialized().AsStringPiece().as_string();
266 return session_initiate_quic_config_message_;
267 }
268
269 bool QuicChannelFactory::Core::ProcessSessionAcceptConfigMessage(
270 const std::string& message) {
271 DCHECK(!is_server_);
272
273 session_accept_quic_config_message_ = message;
274
275 scoped_ptr<net::CryptoHandshakeMessage> parsed_message(
276 net::CryptoFramer::ParseMessage(message));
277 if (!parsed_message) {
278 LOG(ERROR) << "Received invalid QUIC config.";
279 return false;
280 }
281
282 if (parsed_message->tag() != net::kSHLO) {
283 LOG(ERROR) << "Received QUIC handshake message with unexpected tag "
284 << parsed_message->tag();
285 return false;
286 }
287
288 std::string error_message;
289 net::QuicErrorCode error = quic_config_.ProcessPeerHello(
290 *parsed_message, net::SERVER, &error_message);
291 if (error != net::QUIC_NO_ERROR) {
292 LOG(ERROR) << "Failed to process QUIC handshake message: "
293 << error_message;
294 return false;
295 }
296
297 return true;
298 }
299
300 bool QuicChannelFactory::Core::ProcessSessionInitiateConfigMessage(
301 const std::string& message) {
302 DCHECK(is_server_);
303
304 session_initiate_quic_config_message_ = message;
305
306 scoped_ptr<net::CryptoHandshakeMessage> parsed_message(
307 net::CryptoFramer::ParseMessage(message));
308 if (!parsed_message) {
309 LOG(ERROR) << "Received invalid QUIC config.";
310 return false;
311 }
312
313 if (parsed_message->tag() != net::kCHLO) {
314 LOG(ERROR) << "Received QUIC handshake message with unexpected tag "
315 << parsed_message->tag();
316 return false;
317 }
318
319 std::string error_message;
320 net::QuicErrorCode error = quic_config_.ProcessPeerHello(
321 *parsed_message, net::CLIENT, &error_message);
322 if (error != net::QUIC_NO_ERROR) {
323 LOG(ERROR) << "Failed to process QUIC handshake message: "
324 << error_message;
325 return false;
326 }
327
328 return true;
329 }
330
331 const std::string&
332 QuicChannelFactory::Core::CreateSessionAcceptConfigMessage() {
333 DCHECK(is_server_);
334
335 if (session_initiate_quic_config_message_.empty()) {
336 // Don't send quic-config to the client if the client didn't include the
337 // config in the session-initiate message.
338 DCHECK(session_accept_quic_config_message_.empty());
339 return session_accept_quic_config_message_;
340 }
341
342 net::CryptoHandshakeMessage handshake_message;
343 handshake_message.set_tag(net::kSHLO);
344 quic_config_.ToHandshakeMessage(&handshake_message);
345
346 session_accept_quic_config_message_ =
347 handshake_message.GetSerialized().AsStringPiece().as_string();
348 return session_accept_quic_config_message_;
349 }
350
351 // StreamChannelFactory interface.
352 void QuicChannelFactory::Core::CreateChannel(
353 const std::string& name,
354 const ChannelCreatedCallback& callback) {
355 if (quic_session_ && quic_session_->connection()->connected()) {
356 if (!is_server_) {
357 net::QuicP2PStream* stream = quic_session_->CreateOutgoingDynamicStream();
358 scoped_ptr<QuicChannel> channel(new QuicClientChannel(
359 stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this),
360 stream->id()),
361 name));
362 callback.Run(channel.Pass());
363 } else {
364 // On the server side wait for the client to create a QUIC stream and
365 // send the name. The channel will be connected in OnNameReceived().
366 pending_channels_.push_back(new PendingChannel(name, callback));
367 }
368 } else if (!base_channel_factory_) {
369 // Fail synchronously if we failed to connect transport.
370 callback.Run(nullptr);
371 } else {
372 // Still waiting for the transport.
373 pending_channels_.push_back(new PendingChannel(name, callback));
374 }
375 }
376
377 void QuicChannelFactory::Core::CancelChannelCreation(const std::string& name) {
378 for (auto it = pending_channels_.begin(); it != pending_channels_.end();
379 ++it) {
380 if ((*it)->name == name) {
381 pending_channels_.erase(it);
382 return;
383 }
384 }
385 }
386
387 void QuicChannelFactory::Core::OnBaseChannelReady(
388 scoped_ptr<P2PDatagramSocket> socket) {
389 base_channel_factory_ = nullptr;
390
391 // Failed to connect underlying transport connection. Fail all pending
392 // channel.
393 if (!socket) {
394 while (!pending_channels_.empty()) {
395 scoped_ptr<PendingChannel> pending_channel(pending_channels_.front());
396 pending_channels_.erase(pending_channels_.begin());
397 pending_channel->callback.Run(nullptr);
398 }
399 return;
400 }
401
402 QuicPacketWriterFactory writer_factory(socket.get());
403 net::IPAddressNumber ip(net::kIPv4AddressSize, 0);
404 scoped_ptr<net::QuicConnection> quic_connection(new net::QuicConnection(
405 0, net::IPEndPoint(ip, 0), &quic_helper_, writer_factory,
406 true /* owns_writer */,
407 is_server_ ? net::Perspective::IS_SERVER : net::Perspective::IS_CLIENT,
408 true /* is_secure */, net::QuicSupportedVersions()));
409
410 net::QuicP2PCryptoConfig quic_crypto_config(shared_secret_);
411 quic_crypto_config.set_hkdf_input_suffix(
412 session_id_ + "\0" + kQuicChannelName +
413 session_initiate_quic_config_message_ +
414 session_accept_quic_config_message_);
415
416 quic_session_.reset(new net::QuicP2PSession(
417 quic_config_, quic_crypto_config, quic_connection.Pass(),
418 make_scoped_ptr(new P2PDatagramSocketAdapter(socket.Pass()))));
419 quic_session_->SetDelegate(this);
420 quic_session_->Initialize();
421
422 if (!is_server_) {
423 // On the client create streams for all pending channels and send a name for
424 // each channel.
425 while (!pending_channels_.empty()) {
426 scoped_ptr<PendingChannel> pending_channel(pending_channels_.front());
427 pending_channels_.erase(pending_channels_.begin());
428
429 net::QuicP2PStream* stream = quic_session_->CreateOutgoingDynamicStream();
430 scoped_ptr<QuicChannel> channel(new QuicClientChannel(
431 stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this),
432 stream->id()),
433 pending_channel->name));
434 pending_channel->callback.Run(channel.Pass());
435 }
436 }
437 }
438
439 void QuicChannelFactory::Core::OnIncomingStream(net::QuicP2PStream* stream) {
440 QuicServerChannel* channel = new QuicServerChannel(
441 stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this),
442 stream->id()));
443 unnamed_incoming_channels_.push_back(channel);
444 channel->ReceiveName(
445 base::Bind(&Core::OnNameReceived, base::Unretained(this), channel));
446 }
447
448 void QuicChannelFactory::Core::OnConnectionClosed(net::QuicErrorCode error) {
449 if (error != net::QUIC_NO_ERROR)
450 LOG(ERROR) << "QUIC connection was closed, error_code=" << error;
451
452 while (!pending_channels_.empty()) {
453 scoped_ptr<PendingChannel> pending_channel(pending_channels_.front());
454 pending_channels_.erase(pending_channels_.begin());
455 pending_channel->callback.Run(nullptr);
456 }
457 }
458
459 void QuicChannelFactory::Core::OnNameReceived(QuicChannel* channel) {
460 DCHECK(is_server_);
461
462 scoped_ptr<QuicChannel> owned_channel(channel);
463
464 auto it = std::find(unnamed_incoming_channels_.begin(),
465 unnamed_incoming_channels_.end(), channel);
466 DCHECK(it != unnamed_incoming_channels_.end());
467 unnamed_incoming_channels_.erase(it);
468
469 if (channel->name().empty()) {
470 // Failed to read a name for incoming channel.
471 return;
472 }
473
474 for (auto it = pending_channels_.begin();
475 it != pending_channels_.end(); ++it) {
476 if ((*it)->name == channel->name()) {
477 ChannelCreatedCallback callback = (*it)->callback;
478 pending_channels_.erase(it);
479 callback.Run(owned_channel.Pass());
480 return;
481 }
482 }
483
484 LOG(ERROR) << "Unexpected incoming channel: " << channel->name();
485 }
486
487 void QuicChannelFactory::Core::OnChannelDestroyed(int stream_id) {
488 if (quic_session_)
489 quic_session_->CloseStream(stream_id);
490 }
491
492 QuicChannelFactory::QuicChannelFactory(const std::string& session_id,
493 bool is_server)
494 : core_(new Core(session_id, is_server)) {}
495
496 QuicChannelFactory::~QuicChannelFactory() {
497 core_->Close();
498 base::ThreadTaskRunnerHandle::Get()->DeleteSoon(FROM_HERE, core_.release());
499 }
500
501 const std::string& QuicChannelFactory::CreateSessionInitiateConfigMessage() {
502 return core_->CreateSessionInitiateConfigMessage();
503 }
504
505 bool QuicChannelFactory::ProcessSessionAcceptConfigMessage(
506 const std::string& message) {
507 return core_->ProcessSessionAcceptConfigMessage(message);
508 }
509
510 bool QuicChannelFactory::ProcessSessionInitiateConfigMessage(
511 const std::string& message) {
512 return core_->ProcessSessionInitiateConfigMessage(message);
513 }
514
515 const std::string& QuicChannelFactory::CreateSessionAcceptConfigMessage() {
516 return core_->CreateSessionAcceptConfigMessage();
517 }
518
519 void QuicChannelFactory::Start(DatagramChannelFactory* factory,
520 const std::string& shared_secret) {
521 core_->Start(factory, shared_secret);
522 }
523
524 void QuicChannelFactory::CreateChannel(const std::string& name,
525 const ChannelCreatedCallback& callback) {
526 core_->CreateChannel(name, callback);
527 }
528
529 void QuicChannelFactory::CancelChannelCreation(const std::string& name) {
530 core_->CancelChannelCreation(name);
531 }
532
533 net::QuicP2PSession* QuicChannelFactory::GetP2PSessionForTests() {
534 return core_->quic_session_.get();
535 }
536
537 } // namespace protocol
538 } // namespace remoting
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698