Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(330)

Side by Side Diff: remoting/protocol/pseudotcp_adapter.cc

Issue 1197853003: Add P2PDatagramSocket and P2PStreamSocket interfaces. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/pseudotcp_adapter.h" 5 #include "remoting/protocol/pseudotcp_adapter.h"
6 6
7 #include "base/compiler_specific.h" 7 #include "base/compiler_specific.h"
8 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/time/time.h" 9 #include "base/time/time.h"
10 #include "base/timer/timer.h" 10 #include "base/timer/timer.h"
11 #include "net/base/address_list.h" 11 #include "net/base/address_list.h"
12 #include "net/base/completion_callback.h" 12 #include "net/base/completion_callback.h"
13 #include "net/base/io_buffer.h" 13 #include "net/base/io_buffer.h"
14 #include "net/base/net_errors.h" 14 #include "net/base/net_errors.h"
15 #include "net/base/net_util.h" 15 #include "net/base/net_util.h"
16 #include "remoting/protocol/p2p_datagram_socket.h"
16 17
17 using cricket::PseudoTcp; 18 using cricket::PseudoTcp;
18 19
19 namespace { 20 namespace {
20 const int kReadBufferSize = 65536; // Maximum size of a packet. 21 const int kReadBufferSize = 65536; // Maximum size of a packet.
21 const uint16 kDefaultMtu = 1280; 22 const uint16 kDefaultMtu = 1280;
22 } // namespace 23 } // namespace
23 24
24 namespace remoting { 25 namespace remoting {
25 namespace protocol { 26 namespace protocol {
26 27
27 class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify, 28 class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify,
28 public base::RefCounted<Core> { 29 public base::RefCounted<Core> {
29 public: 30 public:
30 explicit Core(scoped_ptr<net::Socket> socket); 31 explicit Core(scoped_ptr<P2PDatagramSocket> socket);
31 32
32 // Functions used to implement net::StreamSocket. 33 // Functions used to implement net::StreamSocket.
33 int Read(net::IOBuffer* buffer, int buffer_size, 34 int Read(net::IOBuffer* buffer, int buffer_size,
34 const net::CompletionCallback& callback); 35 const net::CompletionCallback& callback);
35 int Write(net::IOBuffer* buffer, int buffer_size, 36 int Write(net::IOBuffer* buffer, int buffer_size,
36 const net::CompletionCallback& callback); 37 const net::CompletionCallback& callback);
37 int Connect(const net::CompletionCallback& callback); 38 int Connect(const net::CompletionCallback& callback);
38 void Disconnect();
39 bool IsConnected() const;
40 39
41 // cricket::IPseudoTcpNotify interface. 40 // cricket::IPseudoTcpNotify interface.
42 // These notifications are triggered from NotifyPacket. 41 // These notifications are triggered from NotifyPacket.
43 void OnTcpOpen(cricket::PseudoTcp* tcp) override; 42 void OnTcpOpen(cricket::PseudoTcp* tcp) override;
44 void OnTcpReadable(cricket::PseudoTcp* tcp) override; 43 void OnTcpReadable(cricket::PseudoTcp* tcp) override;
45 void OnTcpWriteable(cricket::PseudoTcp* tcp) override; 44 void OnTcpWriteable(cricket::PseudoTcp* tcp) override;
46 // This is triggered by NotifyClock or NotifyPacket. 45 // This is triggered by NotifyClock or NotifyPacket.
47 void OnTcpClosed(cricket::PseudoTcp* tcp, uint32 error) override; 46 void OnTcpClosed(cricket::PseudoTcp* tcp, uint32 error) override;
48 // This is triggered by NotifyClock, NotifyPacket, Recv and Send. 47 // This is triggered by NotifyClock, NotifyPacket, Recv and Send.
49 WriteResult TcpWritePacket(cricket::PseudoTcp* tcp, 48 WriteResult TcpWritePacket(cricket::PseudoTcp* tcp,
(...skipping 28 matching lines...) Expand all
78 void CheckWriteComplete(); 77 void CheckWriteComplete();
79 78
80 // This re-sets |timer| without triggering callbacks. 79 // This re-sets |timer| without triggering callbacks.
81 void AdjustClock(); 80 void AdjustClock();
82 81
83 net::CompletionCallback connect_callback_; 82 net::CompletionCallback connect_callback_;
84 net::CompletionCallback read_callback_; 83 net::CompletionCallback read_callback_;
85 net::CompletionCallback write_callback_; 84 net::CompletionCallback write_callback_;
86 85
87 cricket::PseudoTcp pseudo_tcp_; 86 cricket::PseudoTcp pseudo_tcp_;
88 scoped_ptr<net::Socket> socket_; 87 scoped_ptr<P2PDatagramSocket> socket_;
89 88
90 scoped_refptr<net::IOBuffer> read_buffer_; 89 scoped_refptr<net::IOBuffer> read_buffer_;
91 int read_buffer_size_; 90 int read_buffer_size_;
92 scoped_refptr<net::IOBuffer> write_buffer_; 91 scoped_refptr<net::IOBuffer> write_buffer_;
93 int write_buffer_size_; 92 int write_buffer_size_;
94 93
95 // Whether we need to wait for data to be sent before completing write. 94 // Whether we need to wait for data to be sent before completing write.
96 bool write_waits_for_send_; 95 bool write_waits_for_send_;
97 96
98 // Set to true in the write-waits-for-send mode when we've 97 // Set to true in the write-waits-for-send mode when we've
99 // successfully writtend data to the send buffer and waiting for the 98 // successfully writtend data to the send buffer and waiting for the
100 // data to be sent to the remote end. 99 // data to be sent to the remote end.
101 bool waiting_write_position_; 100 bool waiting_write_position_;
102 101
103 // Number of the bytes written by the last write stored while we wait 102 // Number of the bytes written by the last write stored while we wait
104 // for the data to be sent (i.e. when waiting_write_position_ = true). 103 // for the data to be sent (i.e. when waiting_write_position_ = true).
105 int last_write_result_; 104 int last_write_result_;
106 105
107 bool socket_write_pending_; 106 bool socket_write_pending_;
108 scoped_refptr<net::IOBuffer> socket_read_buffer_; 107 scoped_refptr<net::IOBuffer> socket_read_buffer_;
109 108
110 base::OneShotTimer<Core> timer_; 109 base::OneShotTimer<Core> timer_;
111 110
112 DISALLOW_COPY_AND_ASSIGN(Core); 111 DISALLOW_COPY_AND_ASSIGN(Core);
113 }; 112 };
114 113
115 114
116 PseudoTcpAdapter::Core::Core(scoped_ptr<net::Socket> socket) 115 PseudoTcpAdapter::Core::Core(scoped_ptr<P2PDatagramSocket> socket)
117 : pseudo_tcp_(this, 0), 116 : pseudo_tcp_(this, 0),
118 socket_(socket.Pass()), 117 socket_(socket.Pass()),
119 write_waits_for_send_(false), 118 write_waits_for_send_(false),
120 waiting_write_position_(false), 119 waiting_write_position_(false),
121 socket_write_pending_(false) { 120 socket_write_pending_(false) {
122 // Doesn't trigger callbacks. 121 // Doesn't trigger callbacks.
123 pseudo_tcp_.NotifyMTU(kDefaultMtu); 122 pseudo_tcp_.NotifyMTU(kDefaultMtu);
124 } 123 }
125 124
126 PseudoTcpAdapter::Core::~Core() { 125 PseudoTcpAdapter::Core::~Core() {
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
202 return net::ERR_FAILED; 201 return net::ERR_FAILED;
203 202
204 AdjustClock(); 203 AdjustClock();
205 204
206 connect_callback_ = callback; 205 connect_callback_ = callback;
207 DoReadFromSocket(); 206 DoReadFromSocket();
208 207
209 return net::ERR_IO_PENDING; 208 return net::ERR_IO_PENDING;
210 } 209 }
211 210
212 void PseudoTcpAdapter::Core::Disconnect() {
213 // Don't dispatch outstanding callbacks, as mandated by net::StreamSocket.
214 read_callback_.Reset();
215 read_buffer_ = NULL;
216 write_callback_.Reset();
217 write_buffer_ = NULL;
218 connect_callback_.Reset();
219
220 // TODO(wez): Connect should succeed if called after Disconnect, which
221 // PseudoTcp doesn't support, so we need to teardown the internal PseudoTcp
222 // and create a new one in Connect.
223 // TODO(wez): Close sets a shutdown flag inside PseudoTcp but has no other
224 // effect. This should be addressed in PseudoTcp, really.
225 // In the meantime we can fake OnTcpClosed notification and tear down the
226 // PseudoTcp.
227 pseudo_tcp_.Close(true);
228 }
229
230 bool PseudoTcpAdapter::Core::IsConnected() const {
231 return pseudo_tcp_.State() == PseudoTcp::TCP_ESTABLISHED;
232 }
233
234 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) { 211 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) {
235 DCHECK(tcp == &pseudo_tcp_); 212 DCHECK(tcp == &pseudo_tcp_);
236 213
237 if (!connect_callback_.is_null()) { 214 if (!connect_callback_.is_null()) {
238 net::CompletionCallback callback = connect_callback_; 215 net::CompletionCallback callback = connect_callback_;
239 connect_callback_.Reset(); 216 connect_callback_.Reset();
240 callback.Run(net::OK); 217 callback.Run(net::OK);
241 } 218 }
242 219
243 OnTcpReadable(tcp); 220 OnTcpReadable(tcp);
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
334 311
335 void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size) { 312 void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size) {
336 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_SNDBUF, size); 313 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_SNDBUF, size);
337 } 314 }
338 315
339 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send) { 316 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send) {
340 write_waits_for_send_ = write_waits_for_send; 317 write_waits_for_send_ = write_waits_for_send;
341 } 318 }
342 319
343 void PseudoTcpAdapter::Core::DeleteSocket() { 320 void PseudoTcpAdapter::Core::DeleteSocket() {
321 // Don't dispatch outstanding callbacks when the socket is deleted.
322 read_callback_.Reset();
323 read_buffer_ = NULL;
324 write_callback_.Reset();
325 write_buffer_ = NULL;
326 connect_callback_.Reset();
327
344 socket_.reset(); 328 socket_.reset();
345 } 329 }
346 330
347 cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket( 331 cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket(
348 PseudoTcp* tcp, 332 PseudoTcp* tcp,
349 const char* buffer, 333 const char* buffer,
350 size_t len) { 334 size_t len) {
351 DCHECK_EQ(tcp, &pseudo_tcp_); 335 DCHECK_EQ(tcp, &pseudo_tcp_);
352 336
353 // If we already have a write pending, we behave like a congested network, 337 // If we already have a write pending, we behave like a congested network,
354 // returning success for the write, but dropping the packet. PseudoTcp will 338 // returning success for the write, but dropping the packet. PseudoTcp will
355 // back-off and retransmit, adjusting for the perceived congestion. 339 // back-off and retransmit, adjusting for the perceived congestion.
356 if (socket_write_pending_) 340 if (socket_write_pending_)
357 return IPseudoTcpNotify::WR_SUCCESS; 341 return IPseudoTcpNotify::WR_SUCCESS;
358 342
359 scoped_refptr<net::IOBuffer> write_buffer = new net::IOBuffer(len); 343 scoped_refptr<net::IOBuffer> write_buffer = new net::IOBuffer(len);
360 memcpy(write_buffer->data(), buffer, len); 344 memcpy(write_buffer->data(), buffer, len);
361 345
362 // Our underlying socket is datagram-oriented, which means it should either 346 // Our underlying socket is datagram-oriented, which means it should either
363 // send exactly as many bytes as we requested, or fail. 347 // send exactly as many bytes as we requested, or fail.
364 int result; 348 int result;
365 if (socket_.get()) { 349 if (socket_) {
366 result = socket_->Write( 350 result = socket_->Send(
367 write_buffer.get(), len, 351 write_buffer.get(), len,
368 base::Bind(&PseudoTcpAdapter::Core::OnWritten, base::Unretained(this))); 352 base::Bind(&PseudoTcpAdapter::Core::OnWritten, base::Unretained(this)));
369 } else { 353 } else {
370 result = net::ERR_CONNECTION_CLOSED; 354 result = net::ERR_CONNECTION_CLOSED;
371 } 355 }
372 if (result == net::ERR_IO_PENDING) { 356 if (result == net::ERR_IO_PENDING) {
373 socket_write_pending_ = true; 357 socket_write_pending_ = true;
374 return IPseudoTcpNotify::WR_SUCCESS; 358 return IPseudoTcpNotify::WR_SUCCESS;
375 } else if (result == net::ERR_MSG_TOO_BIG) { 359 } else if (result == net::ERR_MSG_TOO_BIG) {
376 return IPseudoTcpNotify::WR_TOO_LARGE; 360 return IPseudoTcpNotify::WR_TOO_LARGE;
377 } else if (result < 0) { 361 } else if (result < 0) {
378 return IPseudoTcpNotify::WR_FAIL; 362 return IPseudoTcpNotify::WR_FAIL;
379 } else { 363 } else {
380 return IPseudoTcpNotify::WR_SUCCESS; 364 return IPseudoTcpNotify::WR_SUCCESS;
381 } 365 }
382 } 366 }
383 367
384 void PseudoTcpAdapter::Core::DoReadFromSocket() { 368 void PseudoTcpAdapter::Core::DoReadFromSocket() {
385 if (!socket_read_buffer_.get()) 369 if (!socket_read_buffer_.get())
386 socket_read_buffer_ = new net::IOBuffer(kReadBufferSize); 370 socket_read_buffer_ = new net::IOBuffer(kReadBufferSize);
387 371
388 int result = 1; 372 int result = 1;
389 while (socket_.get() && result > 0) { 373 while (socket_ && result > 0) {
390 result = socket_->Read( 374 result = socket_->Recv(
391 socket_read_buffer_.get(), 375 socket_read_buffer_.get(), kReadBufferSize,
392 kReadBufferSize,
393 base::Bind(&PseudoTcpAdapter::Core::OnRead, base::Unretained(this))); 376 base::Bind(&PseudoTcpAdapter::Core::OnRead, base::Unretained(this)));
394 if (result != net::ERR_IO_PENDING) 377 if (result != net::ERR_IO_PENDING)
395 HandleReadResults(result); 378 HandleReadResults(result);
396 } 379 }
397 } 380 }
398 381
399 void PseudoTcpAdapter::Core::HandleReadResults(int result) { 382 void PseudoTcpAdapter::Core::HandleReadResults(int result) {
400 if (result <= 0) { 383 if (result <= 0) {
401 LOG(ERROR) << "Read returned " << result; 384 LOG(ERROR) << "Read returned " << result;
402 return; 385 return;
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
454 waiting_write_position_ = false; 437 waiting_write_position_ = false;
455 438
456 net::CompletionCallback callback = write_callback_; 439 net::CompletionCallback callback = write_callback_;
457 write_callback_.Reset(); 440 write_callback_.Reset();
458 write_buffer_ = NULL; 441 write_buffer_ = NULL;
459 callback.Run(last_write_result_); 442 callback.Run(last_write_result_);
460 } 443 }
461 } 444 }
462 } 445 }
463 446
464 // Public interface implemention. 447 // Public interface implementation.
465 448
466 PseudoTcpAdapter::PseudoTcpAdapter(scoped_ptr<net::Socket> socket) 449 PseudoTcpAdapter::PseudoTcpAdapter(scoped_ptr<P2PDatagramSocket> socket)
467 : core_(new Core(socket.Pass())) { 450 : core_(new Core(socket.Pass())) {
468 } 451 }
469 452
470 PseudoTcpAdapter::~PseudoTcpAdapter() { 453 PseudoTcpAdapter::~PseudoTcpAdapter() {
471 Disconnect();
472
473 // Make sure that the underlying socket is destroyed before PseudoTcp. 454 // Make sure that the underlying socket is destroyed before PseudoTcp.
474 core_->DeleteSocket(); 455 core_->DeleteSocket();
475 } 456 }
476 457
477 int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size, 458 int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size,
478 const net::CompletionCallback& callback) { 459 const net::CompletionCallback& callback) {
479 DCHECK(CalledOnValidThread()); 460 DCHECK(CalledOnValidThread());
480 return core_->Read(buffer, buffer_size, callback); 461 return core_->Read(buffer, buffer_size, callback);
481 } 462 }
482 463
483 int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size, 464 int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size,
484 const net::CompletionCallback& callback) { 465 const net::CompletionCallback& callback) {
485 DCHECK(CalledOnValidThread()); 466 DCHECK(CalledOnValidThread());
486 return core_->Write(buffer, buffer_size, callback); 467 return core_->Write(buffer, buffer_size, callback);
487 } 468 }
488 469
489 int PseudoTcpAdapter::SetReceiveBufferSize(int32 size) { 470 int PseudoTcpAdapter::SetReceiveBufferSize(int32 size) {
490 DCHECK(CalledOnValidThread()); 471 DCHECK(CalledOnValidThread());
491
492 core_->SetReceiveBufferSize(size); 472 core_->SetReceiveBufferSize(size);
493 return net::OK; 473 return net::OK;
494 } 474 }
495 475
496 int PseudoTcpAdapter::SetSendBufferSize(int32 size) { 476 int PseudoTcpAdapter::SetSendBufferSize(int32 size) {
497 DCHECK(CalledOnValidThread()); 477 DCHECK(CalledOnValidThread());
498
499 core_->SetSendBufferSize(size); 478 core_->SetSendBufferSize(size);
500 return net::OK; 479 return net::OK;
501 } 480 }
502 481
503 int PseudoTcpAdapter::Connect(const net::CompletionCallback& callback) { 482 int PseudoTcpAdapter::Connect(const net::CompletionCallback& callback) {
504 DCHECK(CalledOnValidThread()); 483 DCHECK(CalledOnValidThread());
505
506 // net::StreamSocket requires that Connect return OK if already connected.
507 if (IsConnected())
508 return net::OK;
509
510 return core_->Connect(callback); 484 return core_->Connect(callback);
511 } 485 }
512 486
513 void PseudoTcpAdapter::Disconnect() {
514 DCHECK(CalledOnValidThread());
515 core_->Disconnect();
516 }
517
518 bool PseudoTcpAdapter::IsConnected() const {
519 return core_->IsConnected();
520 }
521
522 bool PseudoTcpAdapter::IsConnectedAndIdle() const {
523 DCHECK(CalledOnValidThread());
524 NOTIMPLEMENTED();
525 return false;
526 }
527
528 int PseudoTcpAdapter::GetPeerAddress(net::IPEndPoint* address) const {
529 DCHECK(CalledOnValidThread());
530
531 // We don't have a meaningful peer address, but we can't return an
532 // error, so we return a INADDR_ANY instead.
533 net::IPAddressNumber ip_address(net::kIPv4AddressSize);
534 *address = net::IPEndPoint(ip_address, 0);
535 return net::OK;
536 }
537
538 int PseudoTcpAdapter::GetLocalAddress(net::IPEndPoint* address) const {
539 DCHECK(CalledOnValidThread());
540 NOTIMPLEMENTED();
541 return net::ERR_FAILED;
542 }
543
544 const net::BoundNetLog& PseudoTcpAdapter::NetLog() const {
545 DCHECK(CalledOnValidThread());
546 return net_log_;
547 }
548
549 void PseudoTcpAdapter::SetSubresourceSpeculation() {
550 DCHECK(CalledOnValidThread());
551 NOTIMPLEMENTED();
552 }
553
554 void PseudoTcpAdapter::SetOmniboxSpeculation() {
555 DCHECK(CalledOnValidThread());
556 NOTIMPLEMENTED();
557 }
558
559 bool PseudoTcpAdapter::WasEverUsed() const {
560 DCHECK(CalledOnValidThread());
561 NOTIMPLEMENTED();
562 return true;
563 }
564
565 bool PseudoTcpAdapter::UsingTCPFastOpen() const {
566 DCHECK(CalledOnValidThread());
567 return false;
568 }
569
570 bool PseudoTcpAdapter::WasNpnNegotiated() const {
571 DCHECK(CalledOnValidThread());
572 return false;
573 }
574
575 net::NextProto PseudoTcpAdapter::GetNegotiatedProtocol() const {
576 DCHECK(CalledOnValidThread());
577 return net::kProtoUnknown;
578 }
579
580 bool PseudoTcpAdapter::GetSSLInfo(net::SSLInfo* ssl_info) {
581 DCHECK(CalledOnValidThread());
582 return false;
583 }
584
585 void PseudoTcpAdapter::GetConnectionAttempts(
586 net::ConnectionAttempts* out) const {
587 DCHECK(CalledOnValidThread());
588 out->clear();
589 }
590
591 void PseudoTcpAdapter::SetAckDelay(int delay_ms) { 487 void PseudoTcpAdapter::SetAckDelay(int delay_ms) {
592 DCHECK(CalledOnValidThread()); 488 DCHECK(CalledOnValidThread());
593 core_->SetAckDelay(delay_ms); 489 core_->SetAckDelay(delay_ms);
594 } 490 }
595 491
596 void PseudoTcpAdapter::SetNoDelay(bool no_delay) { 492 void PseudoTcpAdapter::SetNoDelay(bool no_delay) {
597 DCHECK(CalledOnValidThread()); 493 DCHECK(CalledOnValidThread());
598 core_->SetNoDelay(no_delay); 494 core_->SetNoDelay(no_delay);
599 } 495 }
600 496
601 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send) { 497 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send) {
602 DCHECK(CalledOnValidThread()); 498 DCHECK(CalledOnValidThread());
603 core_->SetWriteWaitsForSend(write_waits_for_send); 499 core_->SetWriteWaitsForSend(write_waits_for_send);
604 } 500 }
605 501
606 } // namespace protocol 502 } // namespace protocol
607 } // namespace remoting 503 } // namespace remoting
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698