Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(65)

Side by Side Diff: remoting/protocol/quic_channel_factory.cc

Issue 1273233002: Implement QuicChannel and QuicChannelFactory (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "remoting/protocol/quic_channel_factory.h"
6
7 #include <vector>
8
9 #include "base/bind.h"
10 #include "base/location.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/stl_util.h"
13 #include "base/thread_task_runner_handle.h"
14 #include "net/base/io_buffer.h"
15 #include "net/base/net_errors.h"
16 #include "net/quic/crypto/crypto_framer.h"
17 #include "net/quic/crypto/crypto_handshake_message.h"
18 #include "net/quic/crypto/crypto_protocol.h"
19 #include "net/quic/crypto/quic_random.h"
20 #include "net/quic/p2p/quic_p2p_crypto_config.h"
21 #include "net/quic/p2p/quic_p2p_session.h"
22 #include "net/quic/p2p/quic_p2p_stream.h"
23 #include "net/quic/quic_clock.h"
24 #include "net/quic/quic_connection_helper.h"
25 #include "net/quic/quic_default_packet_writer.h"
26 #include "net/socket/stream_socket.h"
27 #include "remoting/base/constants.h"
28 #include "remoting/protocol/datagram_channel_factory.h"
29 #include "remoting/protocol/p2p_datagram_socket.h"
30 #include "remoting/protocol/quic_channel.h"
31
32 namespace remoting {
33 namespace protocol {
34
35 namespace {
36
37 class NET_EXPORT_PRIVATE P2PQuicPacketWriter : public net::QuicPacketWriter {
dcaiafa 2015/08/10 23:31:36 Why is NET_EXPORT_PRIVATE needed?
Sergey Ulanov 2015/08/11 21:34:00 not needed. removed.
38 public:
39 P2PQuicPacketWriter(net::QuicConnection* connection,
40 P2PDatagramSocket* socket)
41 : connection_(connection), socket_(socket), weak_factory_(this) {}
42 ~P2PQuicPacketWriter() override {}
43
44 // QuicPacketWriter interface.
45 net::WriteResult WritePacket(const char* buffer,
46 size_t buf_len,
47 const net::IPAddressNumber& self_address,
48 const net::IPEndPoint& peer_address) override {
49 DCHECK(!write_blocked_);
50
51 scoped_refptr<net::StringIOBuffer> buf(
52 new net::StringIOBuffer(std::string(buffer, buf_len)));
53 int result = socket_->Send(buf, buf_len,
54 base::Bind(&P2PQuicPacketWriter::OnSendComplete,
55 weak_factory_.GetWeakPtr()));
56 net::WriteStatus status = net::WRITE_STATUS_OK;
57 if (result < 0) {
58 if (result == net::ERR_IO_PENDING) {
59 status = net::WRITE_STATUS_BLOCKED;
60 write_blocked_ = true;
61 } else {
62 status = net::WRITE_STATUS_ERROR;
63 }
64 }
65
66 return net::WriteResult(status, result);
67 }
68 bool IsWriteBlockedDataBuffered() const override {
69 // P2PDatagramSocket::Send() method buffer the data until the Send is
70 // unblocked.
71 return true;
72 }
73 bool IsWriteBlocked() const override { return write_blocked_; }
74 void SetWritable() override { write_blocked_ = false; }
75
76 private:
77 void OnSendComplete(int result){
78 DCHECK_NE(result, net::ERR_IO_PENDING);
79 write_blocked_ = false;
80 if (result < 0) {
81 connection_->OnWriteError(result);
82 }
83 connection_->OnCanWrite();
84 }
85
86 net::QuicConnection* connection_;
87 P2PDatagramSocket* socket_;
88
89 // Whether a write is currently in flight.
90 bool write_blocked_ = false;
91
92 base::WeakPtrFactory<P2PQuicPacketWriter> weak_factory_;
93
94 DISALLOW_COPY_AND_ASSIGN(P2PQuicPacketWriter);
95 };
96
97 class QuicPacketWriterFactory
98 : public net::QuicConnection::PacketWriterFactory {
99 public:
100 explicit QuicPacketWriterFactory(P2PDatagramSocket* socket)
101 : socket_(socket) {}
102 ~QuicPacketWriterFactory() override {}
103
104 net::QuicPacketWriter* Create(
105 net::QuicConnection* connection) const override {
106 return new P2PQuicPacketWriter(connection, socket_);
107 }
108
109 private:
110 P2PDatagramSocket* socket_;
111 };
112
113 class P2PDatagramSocketAdapter : public net::Socket {
114 public:
115 explicit P2PDatagramSocketAdapter(scoped_ptr<P2PDatagramSocket> socket)
116 : socket_(socket.Pass()) {}
117 ~P2PDatagramSocketAdapter() override {}
118
119 int Read(net::IOBuffer* buf, int buf_len,
120 const net::CompletionCallback& callback) override {
121 return socket_->Recv(buf, buf_len, callback);
122 }
123 int Write(net::IOBuffer* buf, int buf_len,
124 const net::CompletionCallback& callback) override {
125 return socket_->Send(buf, buf_len, callback);
126 }
127
128 int SetReceiveBufferSize(int32_t size) override {
129 NOTREACHED();
130 return net::ERR_FAILED;
131 }
132
133 int SetSendBufferSize(int32_t size) override {
134 NOTREACHED();
135 return net::ERR_FAILED;
136 }
137
138 private:
139 scoped_ptr<P2PDatagramSocket> socket_;
140 };
141
142 } // namespace
143
144 class QuicChannelFactory::Core : public net::QuicP2PSession::Delegate {
145 public:
146 Core(const std::string& session_id, bool is_server);
147 virtual ~Core();
148
149 // Called from ~QuicChannelFactory() to synchronously release underlying
150 // socket. Core is destroyed later asynchronously.
151 void Close();
152
153 // Implementation of all all methods for QuicChannelFactory.
154 const std::string& CreateSessionInitiateConfigMessage();
155 bool ProcessSessionAcceptConfigMessage(const std::string& message);
156
157 bool ProcessSessionInitiateConfigMessage(const std::string& message);
158 const std::string& CreateSessionAcceptConfigMessage();
159
160 void Start(DatagramChannelFactory* factory, const std::string& shared_secret);
161
162 void CreateChannel(const std::string& name,
163 const ChannelCreatedCallback& callback);
164 void CancelChannelCreation(const std::string& name);
165
166 private:
167 friend class QuicChannelFactory;
168
169 struct PendingChannel {
170 PendingChannel(const std::string& name,
171 const ChannelCreatedCallback& callback)
172 : name(name), callback(callback) {}
173
174 std::string name;
175 ChannelCreatedCallback callback;
176 };
177
178 // QuicP2PSession::Delegate interface.
179 void OnIncomingStream(net::QuicP2PStream* stream) override;
180 void OnConnectionClosed(net::QuicErrorCode error) override;
181
182 void OnBaseChannelReady(scoped_ptr<P2PDatagramSocket> socket);
183
184 void OnNameReceived(QuicChannel* channel);
185
186 void OnChannelDestroyed(int stream_id);
187
188 std::string session_id_;
189 bool is_server_;
190 DatagramChannelFactory* base_channel_factory_;
dcaiafa 2015/08/10 23:31:36 Not initialized.
Sergey Ulanov 2015/08/11 21:34:00 Done. Good catch.
191
192 net::QuicConfig quic_config_;
193 std::string shared_secret_;
194 std::string session_initiate_quic_config_message_;
195 std::string session_accept_quic_config_message_;
196
197 net::QuicClock quic_clock_;
198 net::QuicConnectionHelper quic_helper_;
199 scoped_ptr<net::QuicP2PSession> quic_session_;
200 bool connected_ = false;
201
202 std::vector<PendingChannel*> pending_channels_;
203 std::vector<QuicChannel*> unnamed_incoming_channels_;
204
205 base::WeakPtrFactory<Core> weak_factory_;
206
207 DISALLOW_COPY_AND_ASSIGN(Core);
208 };
209
210 QuicChannelFactory::Core::Core(const std::string& session_id, bool is_server)
211 : session_id_(session_id),
212 is_server_(is_server),
213 quic_helper_(base::ThreadTaskRunnerHandle::Get().get(),
214 &quic_clock_,
215 net::QuicRandom::GetInstance()),
216 weak_factory_(this) {
217 quic_config_.SetInitialSessionFlowControlWindowToSend(1024*1024);
dcaiafa 2015/08/10 23:31:36 Make constants for these, plz - or one shared cons
Sergey Ulanov 2015/08/11 21:34:00 Done.
218 quic_config_.SetInitialStreamFlowControlWindowToSend(1024*1024);
219 quic_config_.SetSocketReceiveBufferToSend(1024*1024);
220 }
221
222 QuicChannelFactory::Core::~Core() {
223 }
224
225 void QuicChannelFactory::Core::Close() {
226 DCHECK(pending_channels_.empty());
227
228 // Cancel creation of the base channel if it hasn't finished.
229 if (base_channel_factory_)
230 base_channel_factory_->CancelChannelCreation(kQuicChannelName);
231
232 STLDeleteElements(&unnamed_incoming_channels_);
233
234 if (quic_session_ && quic_session_->connection()->connected())
235 quic_session_->connection()->CloseConnection(net::QUIC_NO_ERROR, false);
236
237 DCHECK(unnamed_incoming_channels_.empty());
238 }
239
240 void QuicChannelFactory::Core::Start(DatagramChannelFactory* factory,
241 const std::string& shared_secret) {
242 base_channel_factory_ = factory;
243 shared_secret_ = shared_secret;
244
245 base_channel_factory_->CreateChannel(
246 kQuicChannelName,
247 base::Bind(&Core::OnBaseChannelReady, weak_factory_.GetWeakPtr()));
248 }
249
250 const std::string&
251 QuicChannelFactory::Core::CreateSessionInitiateConfigMessage() {
252 DCHECK(!is_server_);
253
254 net::CryptoHandshakeMessage handshake_message;
255 handshake_message.set_tag(net::kCHLO);
256 quic_config_.ToHandshakeMessage(&handshake_message);
257
258 session_initiate_quic_config_message_ =
259 handshake_message.GetSerialized().AsStringPiece().as_string();
260 return session_initiate_quic_config_message_;
261 }
262
263 bool QuicChannelFactory::Core::ProcessSessionAcceptConfigMessage(
264 const std::string& message) {
265 DCHECK(!is_server_);
266
267 session_accept_quic_config_message_ = message;
268
269 scoped_ptr<net::CryptoHandshakeMessage> parsed_message(
270 net::CryptoFramer::ParseMessage(message));
271 if (!parsed_message) {
272 LOG(ERROR) << "Received invalid QUIC config.";
273 return false;
274 }
275
276 if (parsed_message->tag() != net::kSHLO) {
277 LOG(ERROR) << "Received QUIC handshake message with unexpected tag "
278 << parsed_message->tag();
279 return false;
280 }
281
282 std::string error_message;
283 net::QuicErrorCode error = quic_config_.ProcessPeerHello(
284 *parsed_message, net::SERVER, &error_message);
285 if (error != net::QUIC_NO_ERROR) {
286 LOG(ERROR) << "Failed to process QUIC handshake message: "
287 << error_message;
288 return false;
289 }
290
291 return true;
292 }
293
294 bool QuicChannelFactory::Core::ProcessSessionInitiateConfigMessage(
295 const std::string& message) {
296 DCHECK(is_server_);
297
298 session_initiate_quic_config_message_ = message;
299
300 scoped_ptr<net::CryptoHandshakeMessage> parsed_message(
301 net::CryptoFramer::ParseMessage(message));
302 if (!parsed_message) {
303 LOG(ERROR) << "Received invalid QUIC config.";
304 return false;
305 }
306
307 if (parsed_message->tag() != net::kCHLO) {
308 LOG(ERROR) << "Received QUIC handshake message with unexpected tag "
309 << parsed_message->tag();
310 return false;
311 }
312
313 std::string error_message;
314 net::QuicErrorCode error = quic_config_.ProcessPeerHello(
315 *parsed_message, net::CLIENT, &error_message);
316 if (error != net::QUIC_NO_ERROR) {
317 LOG(ERROR) << "Failed to process QUIC handshake message: "
318 << error_message;
319 return false;
320 }
321
322 return true;
323 }
324
325 const std::string&
326 QuicChannelFactory::Core::CreateSessionAcceptConfigMessage() {
327 DCHECK(is_server_);
328
329 if (session_initiate_quic_config_message_.empty()) {
330 // Don't send quic-config to the client if the client didn't include the
331 // config in the session-initiate message.
332 DCHECK(session_accept_quic_config_message_.empty());
333 return session_accept_quic_config_message_;
334 }
335
336 net::CryptoHandshakeMessage handshake_message;
337 handshake_message.set_tag(net::kSHLO);
338 quic_config_.ToHandshakeMessage(&handshake_message);
339
340 session_accept_quic_config_message_ =
341 handshake_message.GetSerialized().AsStringPiece().as_string();
342 return session_accept_quic_config_message_;
343 }
344
345 // StreamChannelFactory interface.
346 void QuicChannelFactory::Core::CreateChannel(
347 const std::string& name,
348 const ChannelCreatedCallback& callback) {
349 if (quic_session_ && quic_session_->connection()->connected()) {
350 if (!is_server_) {
351 net::QuicP2PStream* stream = quic_session_->CreateOutgoingDynamicStream();
352 scoped_ptr<QuicChannel> channel(new QuicClientChannel(
353 stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this),
354 stream->id()),
355 name));
356 callback.Run(channel.Pass());
357 } else {
358 // On the server side wait for the client to create a QUIC stream and
359 // send the name. The channel will be connected in OnNameReceived().
360 pending_channels_.push_back(new PendingChannel(name, callback));
361 }
362 } else if (!base_channel_factory_) {
363 // Fail synchronously if we failed to connect transport.
364 callback.Run(nullptr);
365 } else {
366 // Still waiting for the transport.
367 pending_channels_.push_back(new PendingChannel(name, callback));
368 }
369 }
370
371 void QuicChannelFactory::Core::CancelChannelCreation(const std::string& name) {
372 for (auto it = pending_channels_.begin(); it != pending_channels_.end();
373 ++it) {
374 if ((*it)->name == name) {
375 pending_channels_.erase(it);
376 return;
377 }
378 }
379 }
380
381 void QuicChannelFactory::Core::OnBaseChannelReady(
382 scoped_ptr<P2PDatagramSocket> socket) {
383 base_channel_factory_ = nullptr;
384
385 // Failed to connect underlying transport connection. Fail all pending
386 // channel.
387 if (!socket) {
388 while (!pending_channels_.empty()) {
389 scoped_ptr<PendingChannel> pending_channel(pending_channels_.front());
390 pending_channels_.erase(pending_channels_.begin());
391 pending_channel->callback.Run(nullptr);
392 }
393 return;
394 }
395
396 QuicPacketWriterFactory writer_factory(socket.get());
397 net::IPAddressNumber ip(net::kIPv4AddressSize, 0);
398 scoped_ptr<net::QuicConnection> quic_connection(new net::QuicConnection(
399 0, net::IPEndPoint(ip, 0), &quic_helper_, writer_factory,
400 true /* owns_writer */,
401 is_server_ ? net::Perspective::IS_SERVER : net::Perspective::IS_CLIENT,
402 true /* is_secuire */, net::QuicSupportedVersions()));
dcaiafa 2015/08/10 23:31:36 typo: is_secure
Sergey Ulanov 2015/08/11 21:34:00 Done.
403
404 net::QuicP2PCryptoConfig quic_crypto_config(shared_secret_);
405 quic_crypto_config.set_hkdf_input_suffix(
406 session_id_ + "\0" + kQuicChannelName +
407 session_initiate_quic_config_message_ +
408 session_accept_quic_config_message_);
409
410 quic_session_.reset(new net::QuicP2PSession(
411 quic_config_, quic_crypto_config, quic_connection.Pass(),
412 make_scoped_ptr(new P2PDatagramSocketAdapter(socket.Pass()))));
413 quic_session_->SetDelegate(this);
414 quic_session_->Initialize();
415
416 if (!is_server_) {
417 // On the client create streams for all pending channels and send a name for
418 // each channel.
419 while (!pending_channels_.empty()) {
420 scoped_ptr<PendingChannel> pending_channel(pending_channels_.front());
421 pending_channels_.erase(pending_channels_.begin());
422
423 net::QuicP2PStream* stream = quic_session_->CreateOutgoingDynamicStream();
424 scoped_ptr<QuicChannel> channel(new QuicClientChannel(
425 stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this),
426 stream->id()),
427 pending_channel->name));
428 pending_channel->callback.Run(channel.Pass());
429 }
430 }
431 }
432
433 void QuicChannelFactory::Core::OnIncomingStream(net::QuicP2PStream* stream) {
434 QuicServerChannel* channel = new QuicServerChannel(
435 stream, base::Bind(&Core::OnChannelDestroyed, base::Unretained(this),
436 stream->id()));
437 unnamed_incoming_channels_.push_back(channel);
438 channel->ReceiveName(
439 base::Bind(&Core::OnNameReceived, base::Unretained(this), channel));
440 }
441
442 void QuicChannelFactory::Core::OnConnectionClosed(net::QuicErrorCode error) {
443 if (error != net::QUIC_NO_ERROR)
444 LOG(ERROR) << "QUIC connection was closed, error_code=" << error;
445
446 while (!pending_channels_.empty()) {
447 scoped_ptr<PendingChannel> pending_channel(pending_channels_.front());
448 pending_channels_.erase(pending_channels_.begin());
449 pending_channel->callback.Run(nullptr);
450 }
451 }
452
453 void QuicChannelFactory::Core::OnNameReceived(QuicChannel* channel) {
454 DCHECK(is_server_);
455
456 scoped_ptr<QuicChannel> owned_channel(channel);
457
458 auto it = std::find(unnamed_incoming_channels_.begin(),
459 unnamed_incoming_channels_.end(), channel);
460 DCHECK(it != unnamed_incoming_channels_.end());
461 unnamed_incoming_channels_.erase(it);
462
463 if (channel->name().empty()) {
464 // Failed to read a name for incoming channel.
465 return;
466 }
467
468 for (auto it = pending_channels_.begin();
469 it != pending_channels_.end(); ++it) {
470 if ((*it)->name == channel->name()) {
471 ChannelCreatedCallback callback = (*it)->callback;
472 pending_channels_.erase(it);
473 callback.Run(owned_channel.Pass());
474 return;
475 }
476 }
477
478 LOG(ERROR) << "Unexpected incoming channel: " << channel->name();
479 }
480
481 void QuicChannelFactory::Core::OnChannelDestroyed(int stream_id) {
482 if (quic_session_)
483 quic_session_->CloseStream(stream_id);
484 }
485
486 QuicChannelFactory::QuicChannelFactory(const std::string& session_id,
487 bool is_server)
488 : core_(new Core(session_id, is_server)) {}
489
490 QuicChannelFactory::~QuicChannelFactory() {
491 core_->Close();
492 base::ThreadTaskRunnerHandle::Get()->DeleteSoon(FROM_HERE, core_.release());
493 }
494
495 const std::string& QuicChannelFactory::CreateSessionInitiateConfigMessage() {
496 return core_->CreateSessionInitiateConfigMessage();
497 }
498
499 bool QuicChannelFactory::ProcessSessionAcceptConfigMessage(
500 const std::string& message) {
501 return core_->ProcessSessionAcceptConfigMessage(message);
502 }
503
504 bool QuicChannelFactory::ProcessSessionInitiateConfigMessage(
505 const std::string& message) {
506 return core_->ProcessSessionInitiateConfigMessage(message);
507 }
508
509 const std::string& QuicChannelFactory::CreateSessionAcceptConfigMessage() {
510 return core_->CreateSessionAcceptConfigMessage();
511 }
512
513 void QuicChannelFactory::Start(DatagramChannelFactory* factory,
514 const std::string& shared_secret) {
515 core_->Start(factory, shared_secret);
516 }
517
518 void QuicChannelFactory::CreateChannel(const std::string& name,
519 const ChannelCreatedCallback& callback) {
520 core_->CreateChannel(name, callback);
521 }
522
523 void QuicChannelFactory::CancelChannelCreation(const std::string& name) {
524 core_->CancelChannelCreation(name);
525 }
526
527 net::QuicP2PSession* QuicChannelFactory::GetP2PSessionForTests() {
528 return core_->quic_session_.get();
529 }
530
531 } // namespace protocol
532 } // namespace remoting
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698