Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(248)

Side by Side Diff: remoting/protocol/pseudotcp_adapter.cc

Issue 1197853003: Add P2PDatagramSocket and P2PStreamSocket interfaces. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/pseudotcp_adapter.h" 5 #include "remoting/protocol/pseudotcp_adapter.h"
6 6
7 #include "base/compiler_specific.h" 7 #include "base/compiler_specific.h"
8 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/time/time.h" 9 #include "base/time/time.h"
10 #include "base/timer/timer.h" 10 #include "base/timer/timer.h"
11 #include "net/base/address_list.h" 11 #include "net/base/address_list.h"
12 #include "net/base/completion_callback.h" 12 #include "net/base/completion_callback.h"
13 #include "net/base/io_buffer.h" 13 #include "net/base/io_buffer.h"
14 #include "net/base/net_errors.h" 14 #include "net/base/net_errors.h"
15 #include "net/base/net_util.h" 15 #include "net/base/net_util.h"
Wez 2015/06/22 15:49:07 You need the p2p_socket.h include here, I think.
Sergey Ulanov 2015/07/10 00:49:54 it's included in pseudotcp_adapter.h
16 16
17 using cricket::PseudoTcp; 17 using cricket::PseudoTcp;
18 18
19 namespace { 19 namespace {
20 const int kReadBufferSize = 65536; // Maximum size of a packet. 20 const int kReadBufferSize = 65536; // Maximum size of a packet.
21 const uint16 kDefaultMtu = 1280; 21 const uint16 kDefaultMtu = 1280;
22 } // namespace 22 } // namespace
23 23
24 namespace remoting { 24 namespace remoting {
25 namespace protocol { 25 namespace protocol {
26 26
27 class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify, 27 class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify,
28 public base::RefCounted<Core> { 28 public base::RefCounted<Core> {
29 public: 29 public:
30 explicit Core(scoped_ptr<net::Socket> socket); 30 explicit Core(scoped_ptr<P2PDatagramSocket> socket);
31 31
32 // Functions used to implement net::StreamSocket. 32 // Functions used to implement net::StreamSocket.
33 int Read(net::IOBuffer* buffer, int buffer_size, 33 int Read(net::IOBuffer* buffer, int buffer_size,
34 const net::CompletionCallback& callback); 34 const net::CompletionCallback& callback);
35 int Write(net::IOBuffer* buffer, int buffer_size, 35 int Write(net::IOBuffer* buffer, int buffer_size,
36 const net::CompletionCallback& callback); 36 const net::CompletionCallback& callback);
37 int Connect(const net::CompletionCallback& callback); 37 int Connect(const net::CompletionCallback& callback);
38 void Disconnect();
39 bool IsConnected() const;
40 38
41 // cricket::IPseudoTcpNotify interface. 39 // cricket::IPseudoTcpNotify interface.
42 // These notifications are triggered from NotifyPacket. 40 // These notifications are triggered from NotifyPacket.
43 void OnTcpOpen(cricket::PseudoTcp* tcp) override; 41 void OnTcpOpen(cricket::PseudoTcp* tcp) override;
44 void OnTcpReadable(cricket::PseudoTcp* tcp) override; 42 void OnTcpReadable(cricket::PseudoTcp* tcp) override;
45 void OnTcpWriteable(cricket::PseudoTcp* tcp) override; 43 void OnTcpWriteable(cricket::PseudoTcp* tcp) override;
46 // This is triggered by NotifyClock or NotifyPacket. 44 // This is triggered by NotifyClock or NotifyPacket.
47 void OnTcpClosed(cricket::PseudoTcp* tcp, uint32 error) override; 45 void OnTcpClosed(cricket::PseudoTcp* tcp, uint32 error) override;
48 // This is triggered by NotifyClock, NotifyPacket, Recv and Send. 46 // This is triggered by NotifyClock, NotifyPacket, Recv and Send.
49 WriteResult TcpWritePacket(cricket::PseudoTcp* tcp, 47 WriteResult TcpWritePacket(cricket::PseudoTcp* tcp,
(...skipping 28 matching lines...) Expand all
78 void CheckWriteComplete(); 76 void CheckWriteComplete();
79 77
80 // This re-sets |timer| without triggering callbacks. 78 // This re-sets |timer| without triggering callbacks.
81 void AdjustClock(); 79 void AdjustClock();
82 80
83 net::CompletionCallback connect_callback_; 81 net::CompletionCallback connect_callback_;
84 net::CompletionCallback read_callback_; 82 net::CompletionCallback read_callback_;
85 net::CompletionCallback write_callback_; 83 net::CompletionCallback write_callback_;
86 84
87 cricket::PseudoTcp pseudo_tcp_; 85 cricket::PseudoTcp pseudo_tcp_;
88 scoped_ptr<net::Socket> socket_; 86 scoped_ptr<P2PDatagramSocket> socket_;
89 87
90 scoped_refptr<net::IOBuffer> read_buffer_; 88 scoped_refptr<net::IOBuffer> read_buffer_;
91 int read_buffer_size_; 89 int read_buffer_size_;
92 scoped_refptr<net::IOBuffer> write_buffer_; 90 scoped_refptr<net::IOBuffer> write_buffer_;
93 int write_buffer_size_; 91 int write_buffer_size_;
94 92
95 // Whether we need to wait for data to be sent before completing write. 93 // Whether we need to wait for data to be sent before completing write.
96 bool write_waits_for_send_; 94 bool write_waits_for_send_;
97 95
98 // Set to true in the write-waits-for-send mode when we've 96 // Set to true in the write-waits-for-send mode when we've
99 // successfully writtend data to the send buffer and waiting for the 97 // successfully writtend data to the send buffer and waiting for the
100 // data to be sent to the remote end. 98 // data to be sent to the remote end.
101 bool waiting_write_position_; 99 bool waiting_write_position_;
102 100
103 // Number of the bytes written by the last write stored while we wait 101 // Number of the bytes written by the last write stored while we wait
104 // for the data to be sent (i.e. when waiting_write_position_ = true). 102 // for the data to be sent (i.e. when waiting_write_position_ = true).
105 int last_write_result_; 103 int last_write_result_;
106 104
107 bool socket_write_pending_; 105 bool socket_write_pending_;
108 scoped_refptr<net::IOBuffer> socket_read_buffer_; 106 scoped_refptr<net::IOBuffer> socket_read_buffer_;
109 107
110 base::OneShotTimer<Core> timer_; 108 base::OneShotTimer<Core> timer_;
111 109
112 DISALLOW_COPY_AND_ASSIGN(Core); 110 DISALLOW_COPY_AND_ASSIGN(Core);
113 }; 111 };
114 112
115 113
116 PseudoTcpAdapter::Core::Core(scoped_ptr<net::Socket> socket) 114 PseudoTcpAdapter::Core::Core(scoped_ptr<P2PDatagramSocket> socket)
117 : pseudo_tcp_(this, 0), 115 : pseudo_tcp_(this, 0),
118 socket_(socket.Pass()), 116 socket_(socket.Pass()),
119 write_waits_for_send_(false), 117 write_waits_for_send_(false),
120 waiting_write_position_(false), 118 waiting_write_position_(false),
121 socket_write_pending_(false) { 119 socket_write_pending_(false) {
122 // Doesn't trigger callbacks. 120 // Doesn't trigger callbacks.
123 pseudo_tcp_.NotifyMTU(kDefaultMtu); 121 pseudo_tcp_.NotifyMTU(kDefaultMtu);
124 } 122 }
125 123
126 PseudoTcpAdapter::Core::~Core() { 124 PseudoTcpAdapter::Core::~Core() {
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
202 return net::ERR_FAILED; 200 return net::ERR_FAILED;
203 201
204 AdjustClock(); 202 AdjustClock();
205 203
206 connect_callback_ = callback; 204 connect_callback_ = callback;
207 DoReadFromSocket(); 205 DoReadFromSocket();
208 206
209 return net::ERR_IO_PENDING; 207 return net::ERR_IO_PENDING;
210 } 208 }
211 209
212 void PseudoTcpAdapter::Core::Disconnect() {
213 // Don't dispatch outstanding callbacks, as mandated by net::StreamSocket.
214 read_callback_.Reset();
215 read_buffer_ = NULL;
216 write_callback_.Reset();
217 write_buffer_ = NULL;
218 connect_callback_.Reset();
219
220 // TODO(wez): Connect should succeed if called after Disconnect, which
221 // PseudoTcp doesn't support, so we need to teardown the internal PseudoTcp
222 // and create a new one in Connect.
223 // TODO(wez): Close sets a shutdown flag inside PseudoTcp but has no other
224 // effect. This should be addressed in PseudoTcp, really.
225 // In the meantime we can fake OnTcpClosed notification and tear down the
226 // PseudoTcp.
227 pseudo_tcp_.Close(true);
228 }
229
230 bool PseudoTcpAdapter::Core::IsConnected() const {
231 return pseudo_tcp_.State() == PseudoTcp::TCP_ESTABLISHED;
232 }
233
234 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) { 210 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) {
235 DCHECK(tcp == &pseudo_tcp_); 211 DCHECK(tcp == &pseudo_tcp_);
236 212
237 if (!connect_callback_.is_null()) { 213 if (!connect_callback_.is_null()) {
238 net::CompletionCallback callback = connect_callback_; 214 net::CompletionCallback callback = connect_callback_;
239 connect_callback_.Reset(); 215 connect_callback_.Reset();
240 callback.Run(net::OK); 216 callback.Run(net::OK);
241 } 217 }
242 218
243 OnTcpReadable(tcp); 219 OnTcpReadable(tcp);
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
334 310
335 void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size) { 311 void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size) {
336 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_SNDBUF, size); 312 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_SNDBUF, size);
337 } 313 }
338 314
339 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send) { 315 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send) {
340 write_waits_for_send_ = write_waits_for_send; 316 write_waits_for_send_ = write_waits_for_send;
341 } 317 }
342 318
343 void PseudoTcpAdapter::Core::DeleteSocket() { 319 void PseudoTcpAdapter::Core::DeleteSocket() {
320 // Don't dispatch outstanding callbacks when the socket is deleted.
321 read_callback_.Reset();
322 read_buffer_ = NULL;
323 write_callback_.Reset();
324 write_buffer_ = NULL;
325 connect_callback_.Reset();
326
344 socket_.reset(); 327 socket_.reset();
345 } 328 }
346 329
347 cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket( 330 cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket(
348 PseudoTcp* tcp, 331 PseudoTcp* tcp,
349 const char* buffer, 332 const char* buffer,
350 size_t len) { 333 size_t len) {
351 DCHECK_EQ(tcp, &pseudo_tcp_); 334 DCHECK_EQ(tcp, &pseudo_tcp_);
352 335
353 // If we already have a write pending, we behave like a congested network, 336 // If we already have a write pending, we behave like a congested network,
354 // returning success for the write, but dropping the packet. PseudoTcp will 337 // returning success for the write, but dropping the packet. PseudoTcp will
355 // back-off and retransmit, adjusting for the perceived congestion. 338 // back-off and retransmit, adjusting for the perceived congestion.
356 if (socket_write_pending_) 339 if (socket_write_pending_)
357 return IPseudoTcpNotify::WR_SUCCESS; 340 return IPseudoTcpNotify::WR_SUCCESS;
358 341
359 scoped_refptr<net::IOBuffer> write_buffer = new net::IOBuffer(len); 342 scoped_refptr<net::IOBuffer> write_buffer = new net::IOBuffer(len);
360 memcpy(write_buffer->data(), buffer, len); 343 memcpy(write_buffer->data(), buffer, len);
361 344
362 // Our underlying socket is datagram-oriented, which means it should either 345 // Our underlying socket is datagram-oriented, which means it should either
363 // send exactly as many bytes as we requested, or fail. 346 // send exactly as many bytes as we requested, or fail.
364 int result; 347 int result;
365 if (socket_.get()) { 348 if (socket_) {
366 result = socket_->Write( 349 result = socket_->Write(
367 write_buffer.get(), len, 350 write_buffer.get(), len,
368 base::Bind(&PseudoTcpAdapter::Core::OnWritten, base::Unretained(this))); 351 base::Bind(&PseudoTcpAdapter::Core::OnWritten, base::Unretained(this)));
369 } else { 352 } else {
370 result = net::ERR_CONNECTION_CLOSED; 353 result = net::ERR_CONNECTION_CLOSED;
371 } 354 }
372 if (result == net::ERR_IO_PENDING) { 355 if (result == net::ERR_IO_PENDING) {
373 socket_write_pending_ = true; 356 socket_write_pending_ = true;
374 return IPseudoTcpNotify::WR_SUCCESS; 357 return IPseudoTcpNotify::WR_SUCCESS;
375 } else if (result == net::ERR_MSG_TOO_BIG) { 358 } else if (result == net::ERR_MSG_TOO_BIG) {
376 return IPseudoTcpNotify::WR_TOO_LARGE; 359 return IPseudoTcpNotify::WR_TOO_LARGE;
377 } else if (result < 0) { 360 } else if (result < 0) {
378 return IPseudoTcpNotify::WR_FAIL; 361 return IPseudoTcpNotify::WR_FAIL;
379 } else { 362 } else {
380 return IPseudoTcpNotify::WR_SUCCESS; 363 return IPseudoTcpNotify::WR_SUCCESS;
381 } 364 }
382 } 365 }
383 366
384 void PseudoTcpAdapter::Core::DoReadFromSocket() { 367 void PseudoTcpAdapter::Core::DoReadFromSocket() {
385 if (!socket_read_buffer_.get()) 368 if (!socket_read_buffer_.get())
386 socket_read_buffer_ = new net::IOBuffer(kReadBufferSize); 369 socket_read_buffer_ = new net::IOBuffer(kReadBufferSize);
387 370
388 int result = 1; 371 int result = 1;
389 while (socket_.get() && result > 0) { 372 while (socket_ && result > 0) {
390 result = socket_->Read( 373 result = socket_->Read(
391 socket_read_buffer_.get(), 374 socket_read_buffer_.get(), kReadBufferSize,
392 kReadBufferSize,
393 base::Bind(&PseudoTcpAdapter::Core::OnRead, base::Unretained(this))); 375 base::Bind(&PseudoTcpAdapter::Core::OnRead, base::Unretained(this)));
394 if (result != net::ERR_IO_PENDING) 376 if (result != net::ERR_IO_PENDING)
395 HandleReadResults(result); 377 HandleReadResults(result);
396 } 378 }
397 } 379 }
398 380
399 void PseudoTcpAdapter::Core::HandleReadResults(int result) { 381 void PseudoTcpAdapter::Core::HandleReadResults(int result) {
400 if (result <= 0) { 382 if (result <= 0) {
401 LOG(ERROR) << "Read returned " << result; 383 LOG(ERROR) << "Read returned " << result;
402 return; 384 return;
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
454 waiting_write_position_ = false; 436 waiting_write_position_ = false;
455 437
456 net::CompletionCallback callback = write_callback_; 438 net::CompletionCallback callback = write_callback_;
457 write_callback_.Reset(); 439 write_callback_.Reset();
458 write_buffer_ = NULL; 440 write_buffer_ = NULL;
459 callback.Run(last_write_result_); 441 callback.Run(last_write_result_);
460 } 442 }
461 } 443 }
462 } 444 }
463 445
464 // Public interface implemention. 446 // Public interface implementation.
465 447
466 PseudoTcpAdapter::PseudoTcpAdapter(scoped_ptr<net::Socket> socket) 448 PseudoTcpAdapter::PseudoTcpAdapter(scoped_ptr<P2PDatagramSocket> socket)
467 : core_(new Core(socket.Pass())) { 449 : core_(new Core(socket.Pass())) {
468 } 450 }
469 451
470 PseudoTcpAdapter::~PseudoTcpAdapter() { 452 PseudoTcpAdapter::~PseudoTcpAdapter() {
471 Disconnect();
472
473 // Make sure that the underlying socket is destroyed before PseudoTcp. 453 // Make sure that the underlying socket is destroyed before PseudoTcp.
474 core_->DeleteSocket(); 454 core_->DeleteSocket();
475 } 455 }
476 456
477 int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size, 457 int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size,
478 const net::CompletionCallback& callback) { 458 const net::CompletionCallback& callback) {
479 DCHECK(CalledOnValidThread()); 459 DCHECK(CalledOnValidThread());
480 return core_->Read(buffer, buffer_size, callback); 460 return core_->Read(buffer, buffer_size, callback);
481 } 461 }
482 462
483 int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size, 463 int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size,
484 const net::CompletionCallback& callback) { 464 const net::CompletionCallback& callback) {
485 DCHECK(CalledOnValidThread()); 465 DCHECK(CalledOnValidThread());
486 return core_->Write(buffer, buffer_size, callback); 466 return core_->Write(buffer, buffer_size, callback);
487 } 467 }
488 468
489 int PseudoTcpAdapter::SetReceiveBufferSize(int32 size) { 469 int PseudoTcpAdapter::SetReceiveBufferSize(int32 size) {
490 DCHECK(CalledOnValidThread()); 470 DCHECK(CalledOnValidThread());
491
492 core_->SetReceiveBufferSize(size); 471 core_->SetReceiveBufferSize(size);
493 return net::OK; 472 return net::OK;
494 } 473 }
495 474
496 int PseudoTcpAdapter::SetSendBufferSize(int32 size) { 475 int PseudoTcpAdapter::SetSendBufferSize(int32 size) {
497 DCHECK(CalledOnValidThread()); 476 DCHECK(CalledOnValidThread());
498
499 core_->SetSendBufferSize(size); 477 core_->SetSendBufferSize(size);
500 return net::OK; 478 return net::OK;
501 } 479 }
502 480
503 int PseudoTcpAdapter::Connect(const net::CompletionCallback& callback) { 481 int PseudoTcpAdapter::Connect(const net::CompletionCallback& callback) {
504 DCHECK(CalledOnValidThread()); 482 DCHECK(CalledOnValidThread());
505
506 // net::StreamSocket requires that Connect return OK if already connected.
507 if (IsConnected())
508 return net::OK;
509
510 return core_->Connect(callback); 483 return core_->Connect(callback);
511 } 484 }
512 485
513 void PseudoTcpAdapter::Disconnect() {
514 DCHECK(CalledOnValidThread());
515 core_->Disconnect();
516 }
517
518 bool PseudoTcpAdapter::IsConnected() const {
519 return core_->IsConnected();
520 }
521
522 bool PseudoTcpAdapter::IsConnectedAndIdle() const {
523 DCHECK(CalledOnValidThread());
524 NOTIMPLEMENTED();
525 return false;
526 }
527
528 int PseudoTcpAdapter::GetPeerAddress(net::IPEndPoint* address) const {
529 DCHECK(CalledOnValidThread());
530
531 // We don't have a meaningful peer address, but we can't return an
532 // error, so we return a INADDR_ANY instead.
533 net::IPAddressNumber ip_address(net::kIPv4AddressSize);
534 *address = net::IPEndPoint(ip_address, 0);
535 return net::OK;
536 }
537
538 int PseudoTcpAdapter::GetLocalAddress(net::IPEndPoint* address) const {
539 DCHECK(CalledOnValidThread());
540 NOTIMPLEMENTED();
541 return net::ERR_FAILED;
542 }
543
544 const net::BoundNetLog& PseudoTcpAdapter::NetLog() const {
545 DCHECK(CalledOnValidThread());
546 return net_log_;
547 }
548
549 void PseudoTcpAdapter::SetSubresourceSpeculation() {
550 DCHECK(CalledOnValidThread());
551 NOTIMPLEMENTED();
552 }
553
554 void PseudoTcpAdapter::SetOmniboxSpeculation() {
555 DCHECK(CalledOnValidThread());
556 NOTIMPLEMENTED();
557 }
558
559 bool PseudoTcpAdapter::WasEverUsed() const {
560 DCHECK(CalledOnValidThread());
561 NOTIMPLEMENTED();
562 return true;
563 }
564
565 bool PseudoTcpAdapter::UsingTCPFastOpen() const {
566 DCHECK(CalledOnValidThread());
567 return false;
568 }
569
570 bool PseudoTcpAdapter::WasNpnNegotiated() const {
571 DCHECK(CalledOnValidThread());
572 return false;
573 }
574
575 net::NextProto PseudoTcpAdapter::GetNegotiatedProtocol() const {
576 DCHECK(CalledOnValidThread());
577 return net::kProtoUnknown;
578 }
579
580 bool PseudoTcpAdapter::GetSSLInfo(net::SSLInfo* ssl_info) {
581 DCHECK(CalledOnValidThread());
582 return false;
583 }
584
585 void PseudoTcpAdapter::GetConnectionAttempts(
586 net::ConnectionAttempts* out) const {
587 DCHECK(CalledOnValidThread());
588 out->clear();
589 }
590
591 void PseudoTcpAdapter::SetAckDelay(int delay_ms) { 486 void PseudoTcpAdapter::SetAckDelay(int delay_ms) {
592 DCHECK(CalledOnValidThread()); 487 DCHECK(CalledOnValidThread());
593 core_->SetAckDelay(delay_ms); 488 core_->SetAckDelay(delay_ms);
594 } 489 }
595 490
596 void PseudoTcpAdapter::SetNoDelay(bool no_delay) { 491 void PseudoTcpAdapter::SetNoDelay(bool no_delay) {
597 DCHECK(CalledOnValidThread()); 492 DCHECK(CalledOnValidThread());
598 core_->SetNoDelay(no_delay); 493 core_->SetNoDelay(no_delay);
599 } 494 }
600 495
601 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send) { 496 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send) {
602 DCHECK(CalledOnValidThread()); 497 DCHECK(CalledOnValidThread());
603 core_->SetWriteWaitsForSend(write_waits_for_send); 498 core_->SetWriteWaitsForSend(write_waits_for_send);
604 } 499 }
605 500
606 } // namespace protocol 501 } // namespace protocol
607 } // namespace remoting 502 } // namespace remoting
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698