Index: remoting/protocol/quic_channel_factory.cc |
diff --git a/remoting/protocol/quic_channel_factory.cc b/remoting/protocol/quic_channel_factory.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..1046e877e7a3416a5f1428ee54a1fbbd17cabe0e |
--- /dev/null |
+++ b/remoting/protocol/quic_channel_factory.cc |
@@ -0,0 +1,536 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "remoting/protocol/quic_channel_factory.h" |
+ |
+#include <vector> |
+ |
+#include "base/bind.h" |
+#include "base/location.h" |
+#include "base/single_thread_task_runner.h" |
+#include "base/stl_util.h" |
+#include "base/thread_task_runner_handle.h" |
+#include "net/base/io_buffer.h" |
+#include "net/base/net_errors.h" |
+#include "net/quic/crypto/crypto_framer.h" |
+#include "net/quic/crypto/crypto_handshake_message.h" |
+#include "net/quic/crypto/crypto_protocol.h" |
+#include "net/quic/crypto/quic_random.h" |
+#include "net/quic/p2p/quic_p2p_crypto_config.h" |
+#include "net/quic/p2p/quic_p2p_session.h" |
+#include "net/quic/p2p/quic_p2p_stream.h" |
+#include "net/quic/quic_clock.h" |
+#include "net/quic/quic_connection_helper.h" |
+#include "net/quic/quic_default_packet_writer.h" |
+#include "net/socket/stream_socket.h" |
+#include "remoting/base/constants.h" |
+#include "remoting/protocol/datagram_channel_factory.h" |
+#include "remoting/protocol/p2p_datagram_socket.h" |
+#include "remoting/protocol/quic_channel.h" |
+ |
+namespace remoting { |
+namespace protocol { |
+ |
+namespace { |
+ |
+// The maximum receive window sizes for QUIC sessions and streams. These are |
+// the same values that are used in chrome. |
+const int kQuicSessionMaxRecvWindowSize = 15 * 1024 * 1024; // 15 MB |
+const int kQuicStreamMaxRecvWindowSize = 6 * 1024 * 1024; // 6 MB |
+ |
+class P2PQuicPacketWriter : public net::QuicPacketWriter { |
+ public: |
+ P2PQuicPacketWriter(net::QuicConnection* connection, |
+ P2PDatagramSocket* socket) |
+ : connection_(connection), socket_(socket), weak_factory_(this) {} |
+ ~P2PQuicPacketWriter() override {} |
+ |
+ // QuicPacketWriter interface. |
+ net::WriteResult WritePacket(const char* buffer, |
+ size_t buf_len, |
+ const net::IPAddressNumber& self_address, |
+ const net::IPEndPoint& peer_address) override { |
+ DCHECK(!write_blocked_); |
+ |
+ scoped_refptr<net::StringIOBuffer> buf( |
+ new net::StringIOBuffer(std::string(buffer, buf_len))); |
+ int result = socket_->Send(buf, buf_len, |
+ base::Bind(&P2PQuicPacketWriter::OnSendComplete, |
+ weak_factory_.GetWeakPtr())); |
+ net::WriteStatus status = net::WRITE_STATUS_OK; |
+ if (result < 0) { |
+ if (result == net::ERR_IO_PENDING) { |
+ status = net::WRITE_STATUS_BLOCKED; |
+ write_blocked_ = true; |
+ } else { |
+ status = net::WRITE_STATUS_ERROR; |
+ } |
+ } |
+ |
+ return net::WriteResult(status, result); |
+ } |
+ bool IsWriteBlockedDataBuffered() const override { |
+ // P2PDatagramSocket::Send() method buffer the data until the Send is |
+ // unblocked. |
+ return true; |
+ } |
+ bool IsWriteBlocked() const override { return write_blocked_; } |
+ void SetWritable() override { write_blocked_ = false; } |
+ |
+ private: |
+ void OnSendComplete(int result){ |
+ DCHECK_NE(result, net::ERR_IO_PENDING); |
+ write_blocked_ = false; |
+ if (result < 0) { |
+ connection_->OnWriteError(result); |
+ } |
+ connection_->OnCanWrite(); |
+ } |
+ |
+ net::QuicConnection* connection_; |
+ P2PDatagramSocket* socket_; |
+ |
+ // Whether a write is currently in flight. |
+ bool write_blocked_ = false; |
+ |
+ base::WeakPtrFactory<P2PQuicPacketWriter> weak_factory_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(P2PQuicPacketWriter); |
+}; |
+ |
+class QuicPacketWriterFactory |
+ : public net::QuicConnection::PacketWriterFactory { |
+ public: |
+ explicit QuicPacketWriterFactory(P2PDatagramSocket* socket) |
+ : socket_(socket) {} |
+ ~QuicPacketWriterFactory() override {} |
+ |
+ net::QuicPacketWriter* Create( |
+ net::QuicConnection* connection) const override { |
+ return new P2PQuicPacketWriter(connection, socket_); |
+ } |
+ |
+ private: |
+ P2PDatagramSocket* socket_; |
+}; |
+ |
+class P2PDatagramSocketAdapter : public net::Socket { |
+ public: |
+ explicit P2PDatagramSocketAdapter(scoped_ptr<P2PDatagramSocket> socket) |
+ : socket_(socket.Pass()) {} |
+ ~P2PDatagramSocketAdapter() override {} |
+ |
+ int Read(net::IOBuffer* buf, int buf_len, |
+ const net::CompletionCallback& callback) override { |
+ return socket_->Recv(buf, buf_len, callback); |
+ } |
+ int Write(net::IOBuffer* buf, int buf_len, |
+ const net::CompletionCallback& callback) override { |
+ return socket_->Send(buf, buf_len, callback); |
+ } |
+ |
+ int SetReceiveBufferSize(int32_t size) override { |
+ NOTREACHED(); |
+ return net::ERR_FAILED; |
+ } |
+ |
+ int SetSendBufferSize(int32_t size) override { |
+ NOTREACHED(); |
+ return net::ERR_FAILED; |
+ } |
+ |
+ private: |
+ scoped_ptr<P2PDatagramSocket> socket_; |
+}; |
+ |
+} // namespace |
+ |
+class QuicChannelFactory::Core : public net::QuicP2PSession::Delegate { |
+ public: |
+ Core(const std::string& session_id, bool is_server); |
+ virtual ~Core(); |
+ |
+ // Called from ~QuicChannelFactory() to synchronously release underlying |
+ // socket. Core is destroyed later asynchronously. |
+ void Close(); |
+ |
+ // Implementation of all all methods for QuicChannelFactory. |
+ const std::string& CreateSessionInitiateConfigMessage(); |
+ bool ProcessSessionAcceptConfigMessage(const std::string& message); |
+ |
+ bool ProcessSessionInitiateConfigMessage(const std::string& message); |
+ const std::string& CreateSessionAcceptConfigMessage(); |
+ |
+ void Start(DatagramChannelFactory* factory, const std::string& shared_secret); |
+ |
+ void CreateChannel(const std::string& name, |
+ const ChannelCreatedCallback& callback); |
+ void CancelChannelCreation(const std::string& name); |
+ |
+ private: |
+ friend class QuicChannelFactory; |
+ |
+ struct PendingChannel { |
+ PendingChannel(const std::string& name, |
+ const ChannelCreatedCallback& callback) |
+ : name(name), callback(callback) {} |
+ |
+ std::string name; |
+ ChannelCreatedCallback callback; |
+ }; |
+ |
+ // QuicP2PSession::Delegate interface. |
+ void OnIncomingStream(net::QuicP2PStream* stream) override; |
+ void OnConnectionClosed(net::QuicErrorCode error) override; |
+ |
+ void OnBaseChannelReady(scoped_ptr<P2PDatagramSocket> socket); |
+ |
+ void OnNameReceived(QuicChannel* channel); |
+ |
+ void OnChannelDestroyed(int stream_id); |
+ |
+ std::string session_id_; |
+ bool is_server_; |
+ DatagramChannelFactory* base_channel_factory_ = nullptr; |
+ |
+ net::QuicConfig quic_config_; |
+ std::string shared_secret_; |
+ std::string session_initiate_quic_config_message_; |
+ std::string session_accept_quic_config_message_; |
+ |
+ net::QuicClock quic_clock_; |
+ net::QuicConnectionHelper quic_helper_; |
+ scoped_ptr<net::QuicP2PSession> quic_session_; |
+ bool connected_ = false; |
+ |
+ std::vector<PendingChannel*> pending_channels_; |
+ std::vector<QuicChannel*> unnamed_incoming_channels_; |
+ |
+ base::WeakPtrFactory<Core> weak_factory_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(Core); |
+}; |
+ |
+QuicChannelFactory::Core::Core(const std::string& session_id, bool is_server) |
+ : session_id_(session_id), |
+ is_server_(is_server), |
+ quic_helper_(base::ThreadTaskRunnerHandle::Get().get(), |
+ &quic_clock_, |
+ net::QuicRandom::GetInstance()), |
+ weak_factory_(this) { |
+ quic_config_.SetInitialSessionFlowControlWindowToSend( |
+ kQuicSessionMaxRecvWindowSize); |
+ quic_config_.SetInitialStreamFlowControlWindowToSend( |
+ kQuicStreamMaxRecvWindowSize); |
+} |
+ |
+QuicChannelFactory::Core::~Core() {} |
+ |
+void QuicChannelFactory::Core::Close() { |
+ DCHECK(pending_channels_.empty()); |
+ |
+ // Cancel creation of the base channel if it hasn't finished. |
+ if (base_channel_factory_) |
+ base_channel_factory_->CancelChannelCreation(kQuicChannelName); |
+ |
+ if (quic_session_ && quic_session_->connection()->connected()) |
+ quic_session_->connection()->CloseConnection(net::QUIC_NO_ERROR, false); |
+ |
+ DCHECK(unnamed_incoming_channels_.empty()); |
+} |
+ |
+void QuicChannelFactory::Core::Start(DatagramChannelFactory* factory, |
+ const std::string& shared_secret) { |
+ base_channel_factory_ = factory; |
+ shared_secret_ = shared_secret; |
+ |
+ base_channel_factory_->CreateChannel( |
+ kQuicChannelName, |
+ base::Bind(&Core::OnBaseChannelReady, weak_factory_.GetWeakPtr())); |
+} |
+ |
+const std::string& |
+QuicChannelFactory::Core::CreateSessionInitiateConfigMessage() { |
+ DCHECK(!is_server_); |
+ |
+ net::CryptoHandshakeMessage handshake_message; |
+ handshake_message.set_tag(net::kCHLO); |
+ quic_config_.ToHandshakeMessage(&handshake_message); |
+ |
+ session_initiate_quic_config_message_ = |
+ handshake_message.GetSerialized().AsStringPiece().as_string(); |
+ return session_initiate_quic_config_message_; |
+} |
+ |
+bool QuicChannelFactory::Core::ProcessSessionAcceptConfigMessage( |
+ const std::string& message) { |
+ DCHECK(!is_server_); |
+ |
+ session_accept_quic_config_message_ = message; |
+ |
+ scoped_ptr<net::CryptoHandshakeMessage> parsed_message( |
+ net::CryptoFramer::ParseMessage(message)); |
+ if (!parsed_message) { |
+ LOG(ERROR) << "Received invalid QUIC config."; |
+ return false; |
+ } |
+ |
+ if (parsed_message->tag() != net::kSHLO) { |
+ LOG(ERROR) << "Received QUIC handshake message with unexpected tag " |
+ << parsed_message->tag(); |
+ return false; |
+ } |
+ |
+ std::string error_message; |
+ net::QuicErrorCode error = quic_config_.ProcessPeerHello( |
+ *parsed_message, net::SERVER, &error_message); |
+ if (error != net::QUIC_NO_ERROR) { |
+ LOG(ERROR) << "Failed to process QUIC handshake message: " |
+ << error_message; |
+ return false; |
+ } |
+ |
+ return true; |
+} |
+ |
+bool QuicChannelFactory::Core::ProcessSessionInitiateConfigMessage( |
+ const std::string& message) { |
+ DCHECK(is_server_); |
+ |
+ session_initiate_quic_config_message_ = message; |
+ |
+ scoped_ptr<net::CryptoHandshakeMessage> parsed_message( |
+ net::CryptoFramer::ParseMessage(message)); |
+ if (!parsed_message) { |
+ LOG(ERROR) << "Received invalid QUIC config."; |
+ return false; |
+ } |
+ |
+ if (parsed_message->tag() != net::kCHLO) { |
+ LOG(ERROR) << "Received QUIC handshake message with unexpected tag " |
+ << parsed_message->tag(); |
+ return false; |
+ } |
+ |
+ std::string error_message; |
+ net::QuicErrorCode error = quic_config_.ProcessPeerHello( |
+ *parsed_message, net::CLIENT, &error_message); |
+ if (error != net::QUIC_NO_ERROR) { |
+ LOG(ERROR) << "Failed to process QUIC handshake message: " |
+ << error_message; |
+ return false; |
+ } |
+ |
+ return true; |
+} |
+ |
+const std::string& |
+QuicChannelFactory::Core::CreateSessionAcceptConfigMessage() { |
+ DCHECK(is_server_); |
+ |
+ if (session_initiate_quic_config_message_.empty()) { |
+ // Don't send quic-config to the client if the client didn't include the |
+ // config in the session-initiate message. |
+ DCHECK(session_accept_quic_config_message_.empty()); |
+ return session_accept_quic_config_message_; |
+ } |
+ |
+ net::CryptoHandshakeMessage handshake_message; |
+ handshake_message.set_tag(net::kSHLO); |
+ quic_config_.ToHandshakeMessage(&handshake_message); |
+ |
+ session_accept_quic_config_message_ = |
+ handshake_message.GetSerialized().AsStringPiece().as_string(); |
+ return session_accept_quic_config_message_; |
+} |
+ |
+// StreamChannelFactory interface. |
+void QuicChannelFactory::Core::CreateChannel( |
+ const std::string& name, |
+ const ChannelCreatedCallback& callback) { |
+ if (quic_session_ && quic_session_->connection()->connected()) { |
+ if (!is_server_) { |
+ net::QuicP2PStream* stream = quic_session_->CreateOutgoingDynamicStream(); |
+ scoped_ptr<QuicChannel> channel(new QuicClientChannel( |
+ stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this), |
+ stream->id()), |
+ name)); |
+ callback.Run(channel.Pass()); |
+ } else { |
+ // On the server side wait for the client to create a QUIC stream and |
+ // send the name. The channel will be connected in OnNameReceived(). |
+ pending_channels_.push_back(new PendingChannel(name, callback)); |
+ } |
+ } else if (!base_channel_factory_) { |
+ // Fail synchronously if we failed to connect transport. |
+ callback.Run(nullptr); |
+ } else { |
+ // Still waiting for the transport. |
+ pending_channels_.push_back(new PendingChannel(name, callback)); |
+ } |
+} |
+ |
+void QuicChannelFactory::Core::CancelChannelCreation(const std::string& name) { |
+ for (auto it = pending_channels_.begin(); it != pending_channels_.end(); |
+ ++it) { |
+ if ((*it)->name == name) { |
+ delete *it; |
+ pending_channels_.erase(it); |
+ return; |
+ } |
+ } |
+} |
+ |
+void QuicChannelFactory::Core::OnBaseChannelReady( |
+ scoped_ptr<P2PDatagramSocket> socket) { |
+ base_channel_factory_ = nullptr; |
+ |
+ // Failed to connect underlying transport connection. Fail all pending |
+ // channel. |
+ if (!socket) { |
+ while (!pending_channels_.empty()) { |
+ scoped_ptr<PendingChannel> pending_channel(pending_channels_.front()); |
+ pending_channels_.erase(pending_channels_.begin()); |
+ pending_channel->callback.Run(nullptr); |
+ } |
+ return; |
+ } |
+ |
+ QuicPacketWriterFactory writer_factory(socket.get()); |
+ net::IPAddressNumber ip(net::kIPv4AddressSize, 0); |
+ scoped_ptr<net::QuicConnection> quic_connection(new net::QuicConnection( |
+ 0, net::IPEndPoint(ip, 0), &quic_helper_, writer_factory, |
+ true /* owns_writer */, |
+ is_server_ ? net::Perspective::IS_SERVER : net::Perspective::IS_CLIENT, |
+ true /* is_secure */, net::QuicSupportedVersions())); |
+ |
+ net::QuicP2PCryptoConfig quic_crypto_config(shared_secret_); |
+ quic_crypto_config.set_hkdf_input_suffix( |
+ session_id_ + "\0" + kQuicChannelName + |
+ session_initiate_quic_config_message_ + |
+ session_accept_quic_config_message_); |
+ |
+ quic_session_.reset(new net::QuicP2PSession( |
+ quic_config_, quic_crypto_config, quic_connection.Pass(), |
+ make_scoped_ptr(new P2PDatagramSocketAdapter(socket.Pass())))); |
+ quic_session_->SetDelegate(this); |
+ quic_session_->Initialize(); |
+ |
+ if (!is_server_) { |
+ // On the client create streams for all pending channels and send a name for |
+ // each channel. |
+ while (!pending_channels_.empty()) { |
+ scoped_ptr<PendingChannel> pending_channel(pending_channels_.front()); |
+ pending_channels_.erase(pending_channels_.begin()); |
+ |
+ net::QuicP2PStream* stream = quic_session_->CreateOutgoingDynamicStream(); |
+ scoped_ptr<QuicChannel> channel(new QuicClientChannel( |
+ stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this), |
+ stream->id()), |
+ pending_channel->name)); |
+ pending_channel->callback.Run(channel.Pass()); |
+ } |
+ } |
+} |
+ |
+void QuicChannelFactory::Core::OnIncomingStream(net::QuicP2PStream* stream) { |
+ QuicServerChannel* channel = new QuicServerChannel( |
+ stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this), |
+ stream->id())); |
+ unnamed_incoming_channels_.push_back(channel); |
+ channel->ReceiveName( |
+ base::Bind(&Core::OnNameReceived, base::Unretained(this), channel)); |
+} |
+ |
+void QuicChannelFactory::Core::OnConnectionClosed(net::QuicErrorCode error) { |
+ if (error != net::QUIC_NO_ERROR) |
+ LOG(ERROR) << "QUIC connection was closed, error_code=" << error; |
+ |
+ while (!pending_channels_.empty()) { |
+ scoped_ptr<PendingChannel> pending_channel(pending_channels_.front()); |
+ pending_channels_.erase(pending_channels_.begin()); |
+ pending_channel->callback.Run(nullptr); |
+ } |
+} |
+ |
+void QuicChannelFactory::Core::OnNameReceived(QuicChannel* channel) { |
+ DCHECK(is_server_); |
+ |
+ scoped_ptr<QuicChannel> owned_channel(channel); |
+ |
+ auto it = std::find(unnamed_incoming_channels_.begin(), |
+ unnamed_incoming_channels_.end(), channel); |
+ DCHECK(it != unnamed_incoming_channels_.end()); |
+ unnamed_incoming_channels_.erase(it); |
+ |
+ if (channel->name().empty()) { |
+ // Failed to read a name for incoming channel. |
+ return; |
+ } |
+ |
+ for (auto it = pending_channels_.begin(); |
+ it != pending_channels_.end(); ++it) { |
+ if ((*it)->name == channel->name()) { |
+ scoped_ptr<PendingChannel> pending_channel(*it); |
+ pending_channels_.erase(it); |
+ pending_channel->callback.Run(owned_channel.Pass()); |
+ return; |
+ } |
+ } |
+ |
+ LOG(ERROR) << "Unexpected incoming channel: " << channel->name(); |
+} |
+ |
+void QuicChannelFactory::Core::OnChannelDestroyed(int stream_id) { |
+ if (quic_session_) |
+ quic_session_->CloseStream(stream_id); |
+} |
+ |
+QuicChannelFactory::QuicChannelFactory(const std::string& session_id, |
+ bool is_server) |
+ : core_(new Core(session_id, is_server)) {} |
+ |
+QuicChannelFactory::~QuicChannelFactory() { |
+ core_->Close(); |
+ base::ThreadTaskRunnerHandle::Get()->DeleteSoon(FROM_HERE, core_.release()); |
+} |
+ |
+const std::string& QuicChannelFactory::CreateSessionInitiateConfigMessage() { |
+ return core_->CreateSessionInitiateConfigMessage(); |
+} |
+ |
+bool QuicChannelFactory::ProcessSessionAcceptConfigMessage( |
+ const std::string& message) { |
+ return core_->ProcessSessionAcceptConfigMessage(message); |
+} |
+ |
+bool QuicChannelFactory::ProcessSessionInitiateConfigMessage( |
+ const std::string& message) { |
+ return core_->ProcessSessionInitiateConfigMessage(message); |
+} |
+ |
+const std::string& QuicChannelFactory::CreateSessionAcceptConfigMessage() { |
+ return core_->CreateSessionAcceptConfigMessage(); |
+} |
+ |
+void QuicChannelFactory::Start(DatagramChannelFactory* factory, |
+ const std::string& shared_secret) { |
+ core_->Start(factory, shared_secret); |
+} |
+ |
+void QuicChannelFactory::CreateChannel(const std::string& name, |
+ const ChannelCreatedCallback& callback) { |
+ core_->CreateChannel(name, callback); |
+} |
+ |
+void QuicChannelFactory::CancelChannelCreation(const std::string& name) { |
+ core_->CancelChannelCreation(name); |
+} |
+ |
+net::QuicP2PSession* QuicChannelFactory::GetP2PSessionForTests() { |
+ return core_->quic_session_.get(); |
+} |
+ |
+} // namespace protocol |
+} // namespace remoting |