Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(315)

Unified Diff: remoting/protocol/quic_channel_factory.cc

Issue 1273233002: Implement QuicChannel and QuicChannelFactory (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « remoting/protocol/quic_channel_factory.h ('k') | remoting/protocol/quic_channel_factory_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: remoting/protocol/quic_channel_factory.cc
diff --git a/remoting/protocol/quic_channel_factory.cc b/remoting/protocol/quic_channel_factory.cc
new file mode 100644
index 0000000000000000000000000000000000000000..1046e877e7a3416a5f1428ee54a1fbbd17cabe0e
--- /dev/null
+++ b/remoting/protocol/quic_channel_factory.cc
@@ -0,0 +1,536 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/quic_channel_factory.h"
+
+#include <vector>
+
+#include "base/bind.h"
+#include "base/location.h"
+#include "base/single_thread_task_runner.h"
+#include "base/stl_util.h"
+#include "base/thread_task_runner_handle.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/quic/crypto/crypto_framer.h"
+#include "net/quic/crypto/crypto_handshake_message.h"
+#include "net/quic/crypto/crypto_protocol.h"
+#include "net/quic/crypto/quic_random.h"
+#include "net/quic/p2p/quic_p2p_crypto_config.h"
+#include "net/quic/p2p/quic_p2p_session.h"
+#include "net/quic/p2p/quic_p2p_stream.h"
+#include "net/quic/quic_clock.h"
+#include "net/quic/quic_connection_helper.h"
+#include "net/quic/quic_default_packet_writer.h"
+#include "net/socket/stream_socket.h"
+#include "remoting/base/constants.h"
+#include "remoting/protocol/datagram_channel_factory.h"
+#include "remoting/protocol/p2p_datagram_socket.h"
+#include "remoting/protocol/quic_channel.h"
+
+namespace remoting {
+namespace protocol {
+
+namespace {
+
+// The maximum receive window sizes for QUIC sessions and streams. These are
+// the same values that are used in chrome.
+const int kQuicSessionMaxRecvWindowSize = 15 * 1024 * 1024; // 15 MB
+const int kQuicStreamMaxRecvWindowSize = 6 * 1024 * 1024; // 6 MB
+
+class P2PQuicPacketWriter : public net::QuicPacketWriter {
+ public:
+ P2PQuicPacketWriter(net::QuicConnection* connection,
+ P2PDatagramSocket* socket)
+ : connection_(connection), socket_(socket), weak_factory_(this) {}
+ ~P2PQuicPacketWriter() override {}
+
+ // QuicPacketWriter interface.
+ net::WriteResult WritePacket(const char* buffer,
+ size_t buf_len,
+ const net::IPAddressNumber& self_address,
+ const net::IPEndPoint& peer_address) override {
+ DCHECK(!write_blocked_);
+
+ scoped_refptr<net::StringIOBuffer> buf(
+ new net::StringIOBuffer(std::string(buffer, buf_len)));
+ int result = socket_->Send(buf, buf_len,
+ base::Bind(&P2PQuicPacketWriter::OnSendComplete,
+ weak_factory_.GetWeakPtr()));
+ net::WriteStatus status = net::WRITE_STATUS_OK;
+ if (result < 0) {
+ if (result == net::ERR_IO_PENDING) {
+ status = net::WRITE_STATUS_BLOCKED;
+ write_blocked_ = true;
+ } else {
+ status = net::WRITE_STATUS_ERROR;
+ }
+ }
+
+ return net::WriteResult(status, result);
+ }
+ bool IsWriteBlockedDataBuffered() const override {
+ // P2PDatagramSocket::Send() method buffer the data until the Send is
+ // unblocked.
+ return true;
+ }
+ bool IsWriteBlocked() const override { return write_blocked_; }
+ void SetWritable() override { write_blocked_ = false; }
+
+ private:
+ void OnSendComplete(int result){
+ DCHECK_NE(result, net::ERR_IO_PENDING);
+ write_blocked_ = false;
+ if (result < 0) {
+ connection_->OnWriteError(result);
+ }
+ connection_->OnCanWrite();
+ }
+
+ net::QuicConnection* connection_;
+ P2PDatagramSocket* socket_;
+
+ // Whether a write is currently in flight.
+ bool write_blocked_ = false;
+
+ base::WeakPtrFactory<P2PQuicPacketWriter> weak_factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(P2PQuicPacketWriter);
+};
+
+class QuicPacketWriterFactory
+ : public net::QuicConnection::PacketWriterFactory {
+ public:
+ explicit QuicPacketWriterFactory(P2PDatagramSocket* socket)
+ : socket_(socket) {}
+ ~QuicPacketWriterFactory() override {}
+
+ net::QuicPacketWriter* Create(
+ net::QuicConnection* connection) const override {
+ return new P2PQuicPacketWriter(connection, socket_);
+ }
+
+ private:
+ P2PDatagramSocket* socket_;
+};
+
+class P2PDatagramSocketAdapter : public net::Socket {
+ public:
+ explicit P2PDatagramSocketAdapter(scoped_ptr<P2PDatagramSocket> socket)
+ : socket_(socket.Pass()) {}
+ ~P2PDatagramSocketAdapter() override {}
+
+ int Read(net::IOBuffer* buf, int buf_len,
+ const net::CompletionCallback& callback) override {
+ return socket_->Recv(buf, buf_len, callback);
+ }
+ int Write(net::IOBuffer* buf, int buf_len,
+ const net::CompletionCallback& callback) override {
+ return socket_->Send(buf, buf_len, callback);
+ }
+
+ int SetReceiveBufferSize(int32_t size) override {
+ NOTREACHED();
+ return net::ERR_FAILED;
+ }
+
+ int SetSendBufferSize(int32_t size) override {
+ NOTREACHED();
+ return net::ERR_FAILED;
+ }
+
+ private:
+ scoped_ptr<P2PDatagramSocket> socket_;
+};
+
+} // namespace
+
+class QuicChannelFactory::Core : public net::QuicP2PSession::Delegate {
+ public:
+ Core(const std::string& session_id, bool is_server);
+ virtual ~Core();
+
+ // Called from ~QuicChannelFactory() to synchronously release underlying
+ // socket. Core is destroyed later asynchronously.
+ void Close();
+
+ // Implementation of all all methods for QuicChannelFactory.
+ const std::string& CreateSessionInitiateConfigMessage();
+ bool ProcessSessionAcceptConfigMessage(const std::string& message);
+
+ bool ProcessSessionInitiateConfigMessage(const std::string& message);
+ const std::string& CreateSessionAcceptConfigMessage();
+
+ void Start(DatagramChannelFactory* factory, const std::string& shared_secret);
+
+ void CreateChannel(const std::string& name,
+ const ChannelCreatedCallback& callback);
+ void CancelChannelCreation(const std::string& name);
+
+ private:
+ friend class QuicChannelFactory;
+
+ struct PendingChannel {
+ PendingChannel(const std::string& name,
+ const ChannelCreatedCallback& callback)
+ : name(name), callback(callback) {}
+
+ std::string name;
+ ChannelCreatedCallback callback;
+ };
+
+ // QuicP2PSession::Delegate interface.
+ void OnIncomingStream(net::QuicP2PStream* stream) override;
+ void OnConnectionClosed(net::QuicErrorCode error) override;
+
+ void OnBaseChannelReady(scoped_ptr<P2PDatagramSocket> socket);
+
+ void OnNameReceived(QuicChannel* channel);
+
+ void OnChannelDestroyed(int stream_id);
+
+ std::string session_id_;
+ bool is_server_;
+ DatagramChannelFactory* base_channel_factory_ = nullptr;
+
+ net::QuicConfig quic_config_;
+ std::string shared_secret_;
+ std::string session_initiate_quic_config_message_;
+ std::string session_accept_quic_config_message_;
+
+ net::QuicClock quic_clock_;
+ net::QuicConnectionHelper quic_helper_;
+ scoped_ptr<net::QuicP2PSession> quic_session_;
+ bool connected_ = false;
+
+ std::vector<PendingChannel*> pending_channels_;
+ std::vector<QuicChannel*> unnamed_incoming_channels_;
+
+ base::WeakPtrFactory<Core> weak_factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(Core);
+};
+
+QuicChannelFactory::Core::Core(const std::string& session_id, bool is_server)
+ : session_id_(session_id),
+ is_server_(is_server),
+ quic_helper_(base::ThreadTaskRunnerHandle::Get().get(),
+ &quic_clock_,
+ net::QuicRandom::GetInstance()),
+ weak_factory_(this) {
+ quic_config_.SetInitialSessionFlowControlWindowToSend(
+ kQuicSessionMaxRecvWindowSize);
+ quic_config_.SetInitialStreamFlowControlWindowToSend(
+ kQuicStreamMaxRecvWindowSize);
+}
+
+QuicChannelFactory::Core::~Core() {}
+
+void QuicChannelFactory::Core::Close() {
+ DCHECK(pending_channels_.empty());
+
+ // Cancel creation of the base channel if it hasn't finished.
+ if (base_channel_factory_)
+ base_channel_factory_->CancelChannelCreation(kQuicChannelName);
+
+ if (quic_session_ && quic_session_->connection()->connected())
+ quic_session_->connection()->CloseConnection(net::QUIC_NO_ERROR, false);
+
+ DCHECK(unnamed_incoming_channels_.empty());
+}
+
+void QuicChannelFactory::Core::Start(DatagramChannelFactory* factory,
+ const std::string& shared_secret) {
+ base_channel_factory_ = factory;
+ shared_secret_ = shared_secret;
+
+ base_channel_factory_->CreateChannel(
+ kQuicChannelName,
+ base::Bind(&Core::OnBaseChannelReady, weak_factory_.GetWeakPtr()));
+}
+
+const std::string&
+QuicChannelFactory::Core::CreateSessionInitiateConfigMessage() {
+ DCHECK(!is_server_);
+
+ net::CryptoHandshakeMessage handshake_message;
+ handshake_message.set_tag(net::kCHLO);
+ quic_config_.ToHandshakeMessage(&handshake_message);
+
+ session_initiate_quic_config_message_ =
+ handshake_message.GetSerialized().AsStringPiece().as_string();
+ return session_initiate_quic_config_message_;
+}
+
+bool QuicChannelFactory::Core::ProcessSessionAcceptConfigMessage(
+ const std::string& message) {
+ DCHECK(!is_server_);
+
+ session_accept_quic_config_message_ = message;
+
+ scoped_ptr<net::CryptoHandshakeMessage> parsed_message(
+ net::CryptoFramer::ParseMessage(message));
+ if (!parsed_message) {
+ LOG(ERROR) << "Received invalid QUIC config.";
+ return false;
+ }
+
+ if (parsed_message->tag() != net::kSHLO) {
+ LOG(ERROR) << "Received QUIC handshake message with unexpected tag "
+ << parsed_message->tag();
+ return false;
+ }
+
+ std::string error_message;
+ net::QuicErrorCode error = quic_config_.ProcessPeerHello(
+ *parsed_message, net::SERVER, &error_message);
+ if (error != net::QUIC_NO_ERROR) {
+ LOG(ERROR) << "Failed to process QUIC handshake message: "
+ << error_message;
+ return false;
+ }
+
+ return true;
+}
+
+bool QuicChannelFactory::Core::ProcessSessionInitiateConfigMessage(
+ const std::string& message) {
+ DCHECK(is_server_);
+
+ session_initiate_quic_config_message_ = message;
+
+ scoped_ptr<net::CryptoHandshakeMessage> parsed_message(
+ net::CryptoFramer::ParseMessage(message));
+ if (!parsed_message) {
+ LOG(ERROR) << "Received invalid QUIC config.";
+ return false;
+ }
+
+ if (parsed_message->tag() != net::kCHLO) {
+ LOG(ERROR) << "Received QUIC handshake message with unexpected tag "
+ << parsed_message->tag();
+ return false;
+ }
+
+ std::string error_message;
+ net::QuicErrorCode error = quic_config_.ProcessPeerHello(
+ *parsed_message, net::CLIENT, &error_message);
+ if (error != net::QUIC_NO_ERROR) {
+ LOG(ERROR) << "Failed to process QUIC handshake message: "
+ << error_message;
+ return false;
+ }
+
+ return true;
+}
+
+const std::string&
+QuicChannelFactory::Core::CreateSessionAcceptConfigMessage() {
+ DCHECK(is_server_);
+
+ if (session_initiate_quic_config_message_.empty()) {
+ // Don't send quic-config to the client if the client didn't include the
+ // config in the session-initiate message.
+ DCHECK(session_accept_quic_config_message_.empty());
+ return session_accept_quic_config_message_;
+ }
+
+ net::CryptoHandshakeMessage handshake_message;
+ handshake_message.set_tag(net::kSHLO);
+ quic_config_.ToHandshakeMessage(&handshake_message);
+
+ session_accept_quic_config_message_ =
+ handshake_message.GetSerialized().AsStringPiece().as_string();
+ return session_accept_quic_config_message_;
+}
+
+// StreamChannelFactory interface.
+void QuicChannelFactory::Core::CreateChannel(
+ const std::string& name,
+ const ChannelCreatedCallback& callback) {
+ if (quic_session_ && quic_session_->connection()->connected()) {
+ if (!is_server_) {
+ net::QuicP2PStream* stream = quic_session_->CreateOutgoingDynamicStream();
+ scoped_ptr<QuicChannel> channel(new QuicClientChannel(
+ stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this),
+ stream->id()),
+ name));
+ callback.Run(channel.Pass());
+ } else {
+ // On the server side wait for the client to create a QUIC stream and
+ // send the name. The channel will be connected in OnNameReceived().
+ pending_channels_.push_back(new PendingChannel(name, callback));
+ }
+ } else if (!base_channel_factory_) {
+ // Fail synchronously if we failed to connect transport.
+ callback.Run(nullptr);
+ } else {
+ // Still waiting for the transport.
+ pending_channels_.push_back(new PendingChannel(name, callback));
+ }
+}
+
+void QuicChannelFactory::Core::CancelChannelCreation(const std::string& name) {
+ for (auto it = pending_channels_.begin(); it != pending_channels_.end();
+ ++it) {
+ if ((*it)->name == name) {
+ delete *it;
+ pending_channels_.erase(it);
+ return;
+ }
+ }
+}
+
+void QuicChannelFactory::Core::OnBaseChannelReady(
+ scoped_ptr<P2PDatagramSocket> socket) {
+ base_channel_factory_ = nullptr;
+
+ // Failed to connect underlying transport connection. Fail all pending
+ // channel.
+ if (!socket) {
+ while (!pending_channels_.empty()) {
+ scoped_ptr<PendingChannel> pending_channel(pending_channels_.front());
+ pending_channels_.erase(pending_channels_.begin());
+ pending_channel->callback.Run(nullptr);
+ }
+ return;
+ }
+
+ QuicPacketWriterFactory writer_factory(socket.get());
+ net::IPAddressNumber ip(net::kIPv4AddressSize, 0);
+ scoped_ptr<net::QuicConnection> quic_connection(new net::QuicConnection(
+ 0, net::IPEndPoint(ip, 0), &quic_helper_, writer_factory,
+ true /* owns_writer */,
+ is_server_ ? net::Perspective::IS_SERVER : net::Perspective::IS_CLIENT,
+ true /* is_secure */, net::QuicSupportedVersions()));
+
+ net::QuicP2PCryptoConfig quic_crypto_config(shared_secret_);
+ quic_crypto_config.set_hkdf_input_suffix(
+ session_id_ + "\0" + kQuicChannelName +
+ session_initiate_quic_config_message_ +
+ session_accept_quic_config_message_);
+
+ quic_session_.reset(new net::QuicP2PSession(
+ quic_config_, quic_crypto_config, quic_connection.Pass(),
+ make_scoped_ptr(new P2PDatagramSocketAdapter(socket.Pass()))));
+ quic_session_->SetDelegate(this);
+ quic_session_->Initialize();
+
+ if (!is_server_) {
+ // On the client create streams for all pending channels and send a name for
+ // each channel.
+ while (!pending_channels_.empty()) {
+ scoped_ptr<PendingChannel> pending_channel(pending_channels_.front());
+ pending_channels_.erase(pending_channels_.begin());
+
+ net::QuicP2PStream* stream = quic_session_->CreateOutgoingDynamicStream();
+ scoped_ptr<QuicChannel> channel(new QuicClientChannel(
+ stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this),
+ stream->id()),
+ pending_channel->name));
+ pending_channel->callback.Run(channel.Pass());
+ }
+ }
+}
+
+void QuicChannelFactory::Core::OnIncomingStream(net::QuicP2PStream* stream) {
+ QuicServerChannel* channel = new QuicServerChannel(
+ stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this),
+ stream->id()));
+ unnamed_incoming_channels_.push_back(channel);
+ channel->ReceiveName(
+ base::Bind(&Core::OnNameReceived, base::Unretained(this), channel));
+}
+
+void QuicChannelFactory::Core::OnConnectionClosed(net::QuicErrorCode error) {
+ if (error != net::QUIC_NO_ERROR)
+ LOG(ERROR) << "QUIC connection was closed, error_code=" << error;
+
+ while (!pending_channels_.empty()) {
+ scoped_ptr<PendingChannel> pending_channel(pending_channels_.front());
+ pending_channels_.erase(pending_channels_.begin());
+ pending_channel->callback.Run(nullptr);
+ }
+}
+
+void QuicChannelFactory::Core::OnNameReceived(QuicChannel* channel) {
+ DCHECK(is_server_);
+
+ scoped_ptr<QuicChannel> owned_channel(channel);
+
+ auto it = std::find(unnamed_incoming_channels_.begin(),
+ unnamed_incoming_channels_.end(), channel);
+ DCHECK(it != unnamed_incoming_channels_.end());
+ unnamed_incoming_channels_.erase(it);
+
+ if (channel->name().empty()) {
+ // Failed to read a name for incoming channel.
+ return;
+ }
+
+ for (auto it = pending_channels_.begin();
+ it != pending_channels_.end(); ++it) {
+ if ((*it)->name == channel->name()) {
+ scoped_ptr<PendingChannel> pending_channel(*it);
+ pending_channels_.erase(it);
+ pending_channel->callback.Run(owned_channel.Pass());
+ return;
+ }
+ }
+
+ LOG(ERROR) << "Unexpected incoming channel: " << channel->name();
+}
+
+void QuicChannelFactory::Core::OnChannelDestroyed(int stream_id) {
+ if (quic_session_)
+ quic_session_->CloseStream(stream_id);
+}
+
+QuicChannelFactory::QuicChannelFactory(const std::string& session_id,
+ bool is_server)
+ : core_(new Core(session_id, is_server)) {}
+
+QuicChannelFactory::~QuicChannelFactory() {
+ core_->Close();
+ base::ThreadTaskRunnerHandle::Get()->DeleteSoon(FROM_HERE, core_.release());
+}
+
+const std::string& QuicChannelFactory::CreateSessionInitiateConfigMessage() {
+ return core_->CreateSessionInitiateConfigMessage();
+}
+
+bool QuicChannelFactory::ProcessSessionAcceptConfigMessage(
+ const std::string& message) {
+ return core_->ProcessSessionAcceptConfigMessage(message);
+}
+
+bool QuicChannelFactory::ProcessSessionInitiateConfigMessage(
+ const std::string& message) {
+ return core_->ProcessSessionInitiateConfigMessage(message);
+}
+
+const std::string& QuicChannelFactory::CreateSessionAcceptConfigMessage() {
+ return core_->CreateSessionAcceptConfigMessage();
+}
+
+void QuicChannelFactory::Start(DatagramChannelFactory* factory,
+ const std::string& shared_secret) {
+ core_->Start(factory, shared_secret);
+}
+
+void QuicChannelFactory::CreateChannel(const std::string& name,
+ const ChannelCreatedCallback& callback) {
+ core_->CreateChannel(name, callback);
+}
+
+void QuicChannelFactory::CancelChannelCreation(const std::string& name) {
+ core_->CancelChannelCreation(name);
+}
+
+net::QuicP2PSession* QuicChannelFactory::GetP2PSessionForTests() {
+ return core_->quic_session_.get();
+}
+
+} // namespace protocol
+} // namespace remoting
« no previous file with comments | « remoting/protocol/quic_channel_factory.h ('k') | remoting/protocol/quic_channel_factory_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698