| Index: remoting/protocol/quic_channel_factory.cc
|
| diff --git a/remoting/protocol/quic_channel_factory.cc b/remoting/protocol/quic_channel_factory.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..0c082c85e875627bcf449daa846462fe07af69a3
|
| --- /dev/null
|
| +++ b/remoting/protocol/quic_channel_factory.cc
|
| @@ -0,0 +1,538 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "remoting/protocol/quic_channel_factory.h"
|
| +
|
| +#include <vector>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/location.h"
|
| +#include "base/single_thread_task_runner.h"
|
| +#include "base/stl_util.h"
|
| +#include "base/thread_task_runner_handle.h"
|
| +#include "net/base/io_buffer.h"
|
| +#include "net/base/net_errors.h"
|
| +#include "net/quic/crypto/crypto_framer.h"
|
| +#include "net/quic/crypto/crypto_handshake_message.h"
|
| +#include "net/quic/crypto/crypto_protocol.h"
|
| +#include "net/quic/crypto/quic_random.h"
|
| +#include "net/quic/p2p/quic_p2p_crypto_config.h"
|
| +#include "net/quic/p2p/quic_p2p_session.h"
|
| +#include "net/quic/p2p/quic_p2p_stream.h"
|
| +#include "net/quic/quic_clock.h"
|
| +#include "net/quic/quic_connection_helper.h"
|
| +#include "net/quic/quic_default_packet_writer.h"
|
| +#include "net/socket/stream_socket.h"
|
| +#include "remoting/base/constants.h"
|
| +#include "remoting/protocol/datagram_channel_factory.h"
|
| +#include "remoting/protocol/p2p_datagram_socket.h"
|
| +#include "remoting/protocol/quic_channel.h"
|
| +
|
| +namespace remoting {
|
| +namespace protocol {
|
| +
|
| +namespace {
|
| +
|
| +// The maximum receive window sizes for QUIC sessions and streams. These are
|
| +// the same values that are used in chrome.
|
| +const int kQuicSessionMaxRecvWindowSize = 15 * 1024 * 1024; // 15 MB
|
| +const int kQuicStreamMaxRecvWindowSize = 6 * 1024 * 1024; // 6 MB
|
| +
|
| +class P2PQuicPacketWriter : public net::QuicPacketWriter {
|
| + public:
|
| + P2PQuicPacketWriter(net::QuicConnection* connection,
|
| + P2PDatagramSocket* socket)
|
| + : connection_(connection), socket_(socket), weak_factory_(this) {}
|
| + ~P2PQuicPacketWriter() override {}
|
| +
|
| + // QuicPacketWriter interface.
|
| + net::WriteResult WritePacket(const char* buffer,
|
| + size_t buf_len,
|
| + const net::IPAddressNumber& self_address,
|
| + const net::IPEndPoint& peer_address) override {
|
| + DCHECK(!write_blocked_);
|
| +
|
| + scoped_refptr<net::StringIOBuffer> buf(
|
| + new net::StringIOBuffer(std::string(buffer, buf_len)));
|
| + int result = socket_->Send(buf, buf_len,
|
| + base::Bind(&P2PQuicPacketWriter::OnSendComplete,
|
| + weak_factory_.GetWeakPtr()));
|
| + net::WriteStatus status = net::WRITE_STATUS_OK;
|
| + if (result < 0) {
|
| + if (result == net::ERR_IO_PENDING) {
|
| + status = net::WRITE_STATUS_BLOCKED;
|
| + write_blocked_ = true;
|
| + } else {
|
| + status = net::WRITE_STATUS_ERROR;
|
| + }
|
| + }
|
| +
|
| + return net::WriteResult(status, result);
|
| + }
|
| + bool IsWriteBlockedDataBuffered() const override {
|
| + // P2PDatagramSocket::Send() method buffer the data until the Send is
|
| + // unblocked.
|
| + return true;
|
| + }
|
| + bool IsWriteBlocked() const override { return write_blocked_; }
|
| + void SetWritable() override { write_blocked_ = false; }
|
| +
|
| + private:
|
| + void OnSendComplete(int result){
|
| + DCHECK_NE(result, net::ERR_IO_PENDING);
|
| + write_blocked_ = false;
|
| + if (result < 0) {
|
| + connection_->OnWriteError(result);
|
| + }
|
| + connection_->OnCanWrite();
|
| + }
|
| +
|
| + net::QuicConnection* connection_;
|
| + P2PDatagramSocket* socket_;
|
| +
|
| + // Whether a write is currently in flight.
|
| + bool write_blocked_ = false;
|
| +
|
| + base::WeakPtrFactory<P2PQuicPacketWriter> weak_factory_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(P2PQuicPacketWriter);
|
| +};
|
| +
|
| +class QuicPacketWriterFactory
|
| + : public net::QuicConnection::PacketWriterFactory {
|
| + public:
|
| + explicit QuicPacketWriterFactory(P2PDatagramSocket* socket)
|
| + : socket_(socket) {}
|
| + ~QuicPacketWriterFactory() override {}
|
| +
|
| + net::QuicPacketWriter* Create(
|
| + net::QuicConnection* connection) const override {
|
| + return new P2PQuicPacketWriter(connection, socket_);
|
| + }
|
| +
|
| + private:
|
| + P2PDatagramSocket* socket_;
|
| +};
|
| +
|
| +class P2PDatagramSocketAdapter : public net::Socket {
|
| + public:
|
| + explicit P2PDatagramSocketAdapter(scoped_ptr<P2PDatagramSocket> socket)
|
| + : socket_(socket.Pass()) {}
|
| + ~P2PDatagramSocketAdapter() override {}
|
| +
|
| + int Read(net::IOBuffer* buf, int buf_len,
|
| + const net::CompletionCallback& callback) override {
|
| + return socket_->Recv(buf, buf_len, callback);
|
| + }
|
| + int Write(net::IOBuffer* buf, int buf_len,
|
| + const net::CompletionCallback& callback) override {
|
| + return socket_->Send(buf, buf_len, callback);
|
| + }
|
| +
|
| + int SetReceiveBufferSize(int32_t size) override {
|
| + NOTREACHED();
|
| + return net::ERR_FAILED;
|
| + }
|
| +
|
| + int SetSendBufferSize(int32_t size) override {
|
| + NOTREACHED();
|
| + return net::ERR_FAILED;
|
| + }
|
| +
|
| + private:
|
| + scoped_ptr<P2PDatagramSocket> socket_;
|
| +};
|
| +
|
| +} // namespace
|
| +
|
| +class QuicChannelFactory::Core : public net::QuicP2PSession::Delegate {
|
| + public:
|
| + Core(const std::string& session_id, bool is_server);
|
| + virtual ~Core();
|
| +
|
| + // Called from ~QuicChannelFactory() to synchronously release underlying
|
| + // socket. Core is destroyed later asynchronously.
|
| + void Close();
|
| +
|
| + // Implementation of all all methods for QuicChannelFactory.
|
| + const std::string& CreateSessionInitiateConfigMessage();
|
| + bool ProcessSessionAcceptConfigMessage(const std::string& message);
|
| +
|
| + bool ProcessSessionInitiateConfigMessage(const std::string& message);
|
| + const std::string& CreateSessionAcceptConfigMessage();
|
| +
|
| + void Start(DatagramChannelFactory* factory, const std::string& shared_secret);
|
| +
|
| + void CreateChannel(const std::string& name,
|
| + const ChannelCreatedCallback& callback);
|
| + void CancelChannelCreation(const std::string& name);
|
| +
|
| + private:
|
| + friend class QuicChannelFactory;
|
| +
|
| + struct PendingChannel {
|
| + PendingChannel(const std::string& name,
|
| + const ChannelCreatedCallback& callback)
|
| + : name(name), callback(callback) {}
|
| +
|
| + std::string name;
|
| + ChannelCreatedCallback callback;
|
| + };
|
| +
|
| + // QuicP2PSession::Delegate interface.
|
| + void OnIncomingStream(net::QuicP2PStream* stream) override;
|
| + void OnConnectionClosed(net::QuicErrorCode error) override;
|
| +
|
| + void OnBaseChannelReady(scoped_ptr<P2PDatagramSocket> socket);
|
| +
|
| + void OnNameReceived(QuicChannel* channel);
|
| +
|
| + void OnChannelDestroyed(int stream_id);
|
| +
|
| + std::string session_id_;
|
| + bool is_server_;
|
| + DatagramChannelFactory* base_channel_factory_ = nullptr;
|
| +
|
| + net::QuicConfig quic_config_;
|
| + std::string shared_secret_;
|
| + std::string session_initiate_quic_config_message_;
|
| + std::string session_accept_quic_config_message_;
|
| +
|
| + net::QuicClock quic_clock_;
|
| + net::QuicConnectionHelper quic_helper_;
|
| + scoped_ptr<net::QuicP2PSession> quic_session_;
|
| + bool connected_ = false;
|
| +
|
| + std::vector<PendingChannel*> pending_channels_;
|
| + std::vector<QuicChannel*> unnamed_incoming_channels_;
|
| +
|
| + base::WeakPtrFactory<Core> weak_factory_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(Core);
|
| +};
|
| +
|
| +QuicChannelFactory::Core::Core(const std::string& session_id, bool is_server)
|
| + : session_id_(session_id),
|
| + is_server_(is_server),
|
| + quic_helper_(base::ThreadTaskRunnerHandle::Get().get(),
|
| + &quic_clock_,
|
| + net::QuicRandom::GetInstance()),
|
| + weak_factory_(this) {
|
| + quic_config_.SetInitialSessionFlowControlWindowToSend(
|
| + kQuicSessionMaxRecvWindowSize);
|
| + quic_config_.SetInitialStreamFlowControlWindowToSend(
|
| + kQuicStreamMaxRecvWindowSize);
|
| +}
|
| +
|
| +QuicChannelFactory::Core::~Core() {
|
| +}
|
| +
|
| +void QuicChannelFactory::Core::Close() {
|
| + DCHECK(pending_channels_.empty());
|
| +
|
| + // Cancel creation of the base channel if it hasn't finished.
|
| + if (base_channel_factory_)
|
| + base_channel_factory_->CancelChannelCreation(kQuicChannelName);
|
| +
|
| + STLDeleteElements(&unnamed_incoming_channels_);
|
| +
|
| + if (quic_session_ && quic_session_->connection()->connected())
|
| + quic_session_->connection()->CloseConnection(net::QUIC_NO_ERROR, false);
|
| +
|
| + DCHECK(unnamed_incoming_channels_.empty());
|
| +}
|
| +
|
| +void QuicChannelFactory::Core::Start(DatagramChannelFactory* factory,
|
| + const std::string& shared_secret) {
|
| + base_channel_factory_ = factory;
|
| + shared_secret_ = shared_secret;
|
| +
|
| + base_channel_factory_->CreateChannel(
|
| + kQuicChannelName,
|
| + base::Bind(&Core::OnBaseChannelReady, weak_factory_.GetWeakPtr()));
|
| +}
|
| +
|
| +const std::string&
|
| +QuicChannelFactory::Core::CreateSessionInitiateConfigMessage() {
|
| + DCHECK(!is_server_);
|
| +
|
| + net::CryptoHandshakeMessage handshake_message;
|
| + handshake_message.set_tag(net::kCHLO);
|
| + quic_config_.ToHandshakeMessage(&handshake_message);
|
| +
|
| + session_initiate_quic_config_message_ =
|
| + handshake_message.GetSerialized().AsStringPiece().as_string();
|
| + return session_initiate_quic_config_message_;
|
| +}
|
| +
|
| +bool QuicChannelFactory::Core::ProcessSessionAcceptConfigMessage(
|
| + const std::string& message) {
|
| + DCHECK(!is_server_);
|
| +
|
| + session_accept_quic_config_message_ = message;
|
| +
|
| + scoped_ptr<net::CryptoHandshakeMessage> parsed_message(
|
| + net::CryptoFramer::ParseMessage(message));
|
| + if (!parsed_message) {
|
| + LOG(ERROR) << "Received invalid QUIC config.";
|
| + return false;
|
| + }
|
| +
|
| + if (parsed_message->tag() != net::kSHLO) {
|
| + LOG(ERROR) << "Received QUIC handshake message with unexpected tag "
|
| + << parsed_message->tag();
|
| + return false;
|
| + }
|
| +
|
| + std::string error_message;
|
| + net::QuicErrorCode error = quic_config_.ProcessPeerHello(
|
| + *parsed_message, net::SERVER, &error_message);
|
| + if (error != net::QUIC_NO_ERROR) {
|
| + LOG(ERROR) << "Failed to process QUIC handshake message: "
|
| + << error_message;
|
| + return false;
|
| + }
|
| +
|
| + return true;
|
| +}
|
| +
|
| +bool QuicChannelFactory::Core::ProcessSessionInitiateConfigMessage(
|
| + const std::string& message) {
|
| + DCHECK(is_server_);
|
| +
|
| + session_initiate_quic_config_message_ = message;
|
| +
|
| + scoped_ptr<net::CryptoHandshakeMessage> parsed_message(
|
| + net::CryptoFramer::ParseMessage(message));
|
| + if (!parsed_message) {
|
| + LOG(ERROR) << "Received invalid QUIC config.";
|
| + return false;
|
| + }
|
| +
|
| + if (parsed_message->tag() != net::kCHLO) {
|
| + LOG(ERROR) << "Received QUIC handshake message with unexpected tag "
|
| + << parsed_message->tag();
|
| + return false;
|
| + }
|
| +
|
| + std::string error_message;
|
| + net::QuicErrorCode error = quic_config_.ProcessPeerHello(
|
| + *parsed_message, net::CLIENT, &error_message);
|
| + if (error != net::QUIC_NO_ERROR) {
|
| + LOG(ERROR) << "Failed to process QUIC handshake message: "
|
| + << error_message;
|
| + return false;
|
| + }
|
| +
|
| + return true;
|
| +}
|
| +
|
| +const std::string&
|
| +QuicChannelFactory::Core::CreateSessionAcceptConfigMessage() {
|
| + DCHECK(is_server_);
|
| +
|
| + if (session_initiate_quic_config_message_.empty()) {
|
| + // Don't send quic-config to the client if the client didn't include the
|
| + // config in the session-initiate message.
|
| + DCHECK(session_accept_quic_config_message_.empty());
|
| + return session_accept_quic_config_message_;
|
| + }
|
| +
|
| + net::CryptoHandshakeMessage handshake_message;
|
| + handshake_message.set_tag(net::kSHLO);
|
| + quic_config_.ToHandshakeMessage(&handshake_message);
|
| +
|
| + session_accept_quic_config_message_ =
|
| + handshake_message.GetSerialized().AsStringPiece().as_string();
|
| + return session_accept_quic_config_message_;
|
| +}
|
| +
|
| +// StreamChannelFactory interface.
|
| +void QuicChannelFactory::Core::CreateChannel(
|
| + const std::string& name,
|
| + const ChannelCreatedCallback& callback) {
|
| + if (quic_session_ && quic_session_->connection()->connected()) {
|
| + if (!is_server_) {
|
| + net::QuicP2PStream* stream = quic_session_->CreateOutgoingDynamicStream();
|
| + scoped_ptr<QuicChannel> channel(new QuicClientChannel(
|
| + stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this),
|
| + stream->id()),
|
| + name));
|
| + callback.Run(channel.Pass());
|
| + } else {
|
| + // On the server side wait for the client to create a QUIC stream and
|
| + // send the name. The channel will be connected in OnNameReceived().
|
| + pending_channels_.push_back(new PendingChannel(name, callback));
|
| + }
|
| + } else if (!base_channel_factory_) {
|
| + // Fail synchronously if we failed to connect transport.
|
| + callback.Run(nullptr);
|
| + } else {
|
| + // Still waiting for the transport.
|
| + pending_channels_.push_back(new PendingChannel(name, callback));
|
| + }
|
| +}
|
| +
|
| +void QuicChannelFactory::Core::CancelChannelCreation(const std::string& name) {
|
| + for (auto it = pending_channels_.begin(); it != pending_channels_.end();
|
| + ++it) {
|
| + if ((*it)->name == name) {
|
| + pending_channels_.erase(it);
|
| + return;
|
| + }
|
| + }
|
| +}
|
| +
|
| +void QuicChannelFactory::Core::OnBaseChannelReady(
|
| + scoped_ptr<P2PDatagramSocket> socket) {
|
| + base_channel_factory_ = nullptr;
|
| +
|
| + // Failed to connect underlying transport connection. Fail all pending
|
| + // channel.
|
| + if (!socket) {
|
| + while (!pending_channels_.empty()) {
|
| + scoped_ptr<PendingChannel> pending_channel(pending_channels_.front());
|
| + pending_channels_.erase(pending_channels_.begin());
|
| + pending_channel->callback.Run(nullptr);
|
| + }
|
| + return;
|
| + }
|
| +
|
| + QuicPacketWriterFactory writer_factory(socket.get());
|
| + net::IPAddressNumber ip(net::kIPv4AddressSize, 0);
|
| + scoped_ptr<net::QuicConnection> quic_connection(new net::QuicConnection(
|
| + 0, net::IPEndPoint(ip, 0), &quic_helper_, writer_factory,
|
| + true /* owns_writer */,
|
| + is_server_ ? net::Perspective::IS_SERVER : net::Perspective::IS_CLIENT,
|
| + true /* is_secure */, net::QuicSupportedVersions()));
|
| +
|
| + net::QuicP2PCryptoConfig quic_crypto_config(shared_secret_);
|
| + quic_crypto_config.set_hkdf_input_suffix(
|
| + session_id_ + "\0" + kQuicChannelName +
|
| + session_initiate_quic_config_message_ +
|
| + session_accept_quic_config_message_);
|
| +
|
| + quic_session_.reset(new net::QuicP2PSession(
|
| + quic_config_, quic_crypto_config, quic_connection.Pass(),
|
| + make_scoped_ptr(new P2PDatagramSocketAdapter(socket.Pass()))));
|
| + quic_session_->SetDelegate(this);
|
| + quic_session_->Initialize();
|
| +
|
| + if (!is_server_) {
|
| + // On the client create streams for all pending channels and send a name for
|
| + // each channel.
|
| + while (!pending_channels_.empty()) {
|
| + scoped_ptr<PendingChannel> pending_channel(pending_channels_.front());
|
| + pending_channels_.erase(pending_channels_.begin());
|
| +
|
| + net::QuicP2PStream* stream = quic_session_->CreateOutgoingDynamicStream();
|
| + scoped_ptr<QuicChannel> channel(new QuicClientChannel(
|
| + stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this),
|
| + stream->id()),
|
| + pending_channel->name));
|
| + pending_channel->callback.Run(channel.Pass());
|
| + }
|
| + }
|
| +}
|
| +
|
| +void QuicChannelFactory::Core::OnIncomingStream(net::QuicP2PStream* stream) {
|
| + QuicServerChannel* channel = new QuicServerChannel(
|
| + stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this),
|
| + stream->id()));
|
| + unnamed_incoming_channels_.push_back(channel);
|
| + channel->ReceiveName(
|
| + base::Bind(&Core::OnNameReceived, base::Unretained(this), channel));
|
| +}
|
| +
|
| +void QuicChannelFactory::Core::OnConnectionClosed(net::QuicErrorCode error) {
|
| + if (error != net::QUIC_NO_ERROR)
|
| + LOG(ERROR) << "QUIC connection was closed, error_code=" << error;
|
| +
|
| + while (!pending_channels_.empty()) {
|
| + scoped_ptr<PendingChannel> pending_channel(pending_channels_.front());
|
| + pending_channels_.erase(pending_channels_.begin());
|
| + pending_channel->callback.Run(nullptr);
|
| + }
|
| +}
|
| +
|
| +void QuicChannelFactory::Core::OnNameReceived(QuicChannel* channel) {
|
| + DCHECK(is_server_);
|
| +
|
| + scoped_ptr<QuicChannel> owned_channel(channel);
|
| +
|
| + auto it = std::find(unnamed_incoming_channels_.begin(),
|
| + unnamed_incoming_channels_.end(), channel);
|
| + DCHECK(it != unnamed_incoming_channels_.end());
|
| + unnamed_incoming_channels_.erase(it);
|
| +
|
| + if (channel->name().empty()) {
|
| + // Failed to read a name for incoming channel.
|
| + return;
|
| + }
|
| +
|
| + for (auto it = pending_channels_.begin();
|
| + it != pending_channels_.end(); ++it) {
|
| + if ((*it)->name == channel->name()) {
|
| + ChannelCreatedCallback callback = (*it)->callback;
|
| + pending_channels_.erase(it);
|
| + callback.Run(owned_channel.Pass());
|
| + return;
|
| + }
|
| + }
|
| +
|
| + LOG(ERROR) << "Unexpected incoming channel: " << channel->name();
|
| +}
|
| +
|
| +void QuicChannelFactory::Core::OnChannelDestroyed(int stream_id) {
|
| + if (quic_session_)
|
| + quic_session_->CloseStream(stream_id);
|
| +}
|
| +
|
| +QuicChannelFactory::QuicChannelFactory(const std::string& session_id,
|
| + bool is_server)
|
| + : core_(new Core(session_id, is_server)) {}
|
| +
|
| +QuicChannelFactory::~QuicChannelFactory() {
|
| + core_->Close();
|
| + base::ThreadTaskRunnerHandle::Get()->DeleteSoon(FROM_HERE, core_.release());
|
| +}
|
| +
|
| +const std::string& QuicChannelFactory::CreateSessionInitiateConfigMessage() {
|
| + return core_->CreateSessionInitiateConfigMessage();
|
| +}
|
| +
|
| +bool QuicChannelFactory::ProcessSessionAcceptConfigMessage(
|
| + const std::string& message) {
|
| + return core_->ProcessSessionAcceptConfigMessage(message);
|
| +}
|
| +
|
| +bool QuicChannelFactory::ProcessSessionInitiateConfigMessage(
|
| + const std::string& message) {
|
| + return core_->ProcessSessionInitiateConfigMessage(message);
|
| +}
|
| +
|
| +const std::string& QuicChannelFactory::CreateSessionAcceptConfigMessage() {
|
| + return core_->CreateSessionAcceptConfigMessage();
|
| +}
|
| +
|
| +void QuicChannelFactory::Start(DatagramChannelFactory* factory,
|
| + const std::string& shared_secret) {
|
| + core_->Start(factory, shared_secret);
|
| +}
|
| +
|
| +void QuicChannelFactory::CreateChannel(const std::string& name,
|
| + const ChannelCreatedCallback& callback) {
|
| + core_->CreateChannel(name, callback);
|
| +}
|
| +
|
| +void QuicChannelFactory::CancelChannelCreation(const std::string& name) {
|
| + core_->CancelChannelCreation(name);
|
| +}
|
| +
|
| +net::QuicP2PSession* QuicChannelFactory::GetP2PSessionForTests() {
|
| + return core_->quic_session_.get();
|
| +}
|
| +
|
| +} // namespace protocol
|
| +} // namespace remoting
|
|
|