Index: remoting/protocol/libjingle_transport_factory.cc |
diff --git a/remoting/protocol/libjingle_transport_factory.cc b/remoting/protocol/libjingle_transport_factory.cc |
index 61ed3eafeba591bd021adca6412aff14a7b80db1..cc31440968ce51137640336f7ca4788493474ece 100644 |
--- a/remoting/protocol/libjingle_transport_factory.cc |
+++ b/remoting/protocol/libjingle_transport_factory.cc |
@@ -9,8 +9,11 @@ |
#include "base/thread_task_runner_handle.h" |
#include "base/timer/timer.h" |
#include "jingle/glue/channel_socket_adapter.h" |
+#include "jingle/glue/pseudotcp_adapter.h" |
#include "jingle/glue/utils.h" |
#include "net/base/net_errors.h" |
+#include "remoting/base/constants.h" |
+#include "remoting/protocol/channel_authenticator.h" |
#include "remoting/protocol/network_settings.h" |
#include "remoting/signaling/jingle_info_request.h" |
#include "third_party/libjingle/source/talk/p2p/base/constants.h" |
@@ -25,6 +28,15 @@ |
namespace { |
+// Value is chosen to balance the extra latency against the reduced |
+// load due to ACK traffic. |
+const int kTcpAckDelayMilliseconds = 10; |
+ |
+// Values for the TCP send and receive buffer size. This should be tuned to |
+// accommodate high latency network but not backlog the decoding pipeline. |
+const int kTcpReceiveBufferSize = 256 * 1024; |
+const int kTcpSendBufferSize = kTcpReceiveBufferSize + 30 * 1024; |
+ |
// Try connecting ICE twice with timeout of 15 seconds for each attempt. |
const int kMaxReconnectAttempts = 2; |
const int kReconnectDelaySeconds = 15; |
@@ -32,23 +44,25 @@ |
// Get fresh STUN/Relay configuration every hour. |
const int kJingleInfoUpdatePeriodSeconds = 3600; |
-class LibjingleTransport |
- : public Transport, |
- public base::SupportsWeakPtr<LibjingleTransport>, |
+class LibjingleStreamTransport |
+ : public StreamTransport, |
+ public base::SupportsWeakPtr<LibjingleStreamTransport>, |
public sigslot::has_slots<> { |
public: |
- LibjingleTransport(cricket::PortAllocator* port_allocator, |
+ LibjingleStreamTransport(cricket::PortAllocator* port_allocator, |
const NetworkSettings& network_settings); |
- virtual ~LibjingleTransport(); |
+ virtual ~LibjingleStreamTransport(); |
// Called by JingleTransportFactory when it has fresh Jingle info. |
void OnCanStart(); |
- // Transport interface. |
- virtual void Connect( |
+ // StreamTransport interface. |
+ virtual void Initialize( |
const std::string& name, |
Transport::EventHandler* event_handler, |
- const Transport::ConnectedCallback& callback) OVERRIDE; |
+ scoped_ptr<ChannelAuthenticator> authenticator) OVERRIDE; |
+ virtual void Connect( |
+ const StreamTransport::ConnectedCallback& callback) OVERRIDE; |
virtual void AddRemoteCandidate(const cricket::Candidate& candidate) OVERRIDE; |
virtual const std::string& name() const OVERRIDE; |
virtual bool is_connected() const OVERRIDE; |
@@ -64,6 +78,13 @@ |
const cricket::Candidate& candidate); |
void OnWritableState(cricket::TransportChannel* channel); |
+ // Callback for PseudoTcpAdapter::Connect(). |
+ void OnTcpConnected(int result); |
+ |
+ // Callback for Authenticator::SecureAndAuthenticate(); |
+ void OnAuthenticationDone(net::Error error, |
+ scoped_ptr<net::StreamSocket> socket); |
+ |
// Callback for jingle_glue::TransportChannelSocketAdapter to notify when the |
// socket is destroyed. |
void OnChannelDestroyed(); |
@@ -71,12 +92,17 @@ |
// Tries to connect by restarting ICE. Called by |reconnect_timer_|. |
void TryReconnect(); |
+ // Helper methods to call |callback_|. |
+ void NotifyConnected(scoped_ptr<net::StreamSocket> socket); |
+ void NotifyConnectFailed(); |
+ |
cricket::PortAllocator* port_allocator_; |
NetworkSettings network_settings_; |
std::string name_; |
EventHandler* event_handler_; |
- Transport::ConnectedCallback callback_; |
+ StreamTransport::ConnectedCallback callback_; |
+ scoped_ptr<ChannelAuthenticator> authenticator_; |
std::string ice_username_fragment_; |
std::string ice_password_; |
@@ -86,12 +112,15 @@ |
scoped_ptr<cricket::P2PTransportChannel> channel_; |
bool channel_was_writable_; |
int connect_attempts_left_; |
- base::RepeatingTimer<LibjingleTransport> reconnect_timer_; |
- |
- DISALLOW_COPY_AND_ASSIGN(LibjingleTransport); |
+ base::RepeatingTimer<LibjingleStreamTransport> reconnect_timer_; |
+ |
+ // We own |socket_| until it is connected. |
+ scoped_ptr<jingle_glue::PseudoTcpAdapter> socket_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(LibjingleStreamTransport); |
}; |
-LibjingleTransport::LibjingleTransport( |
+LibjingleStreamTransport::LibjingleStreamTransport( |
cricket::PortAllocator* port_allocator, |
const NetworkSettings& network_settings) |
: port_allocator_(port_allocator), |
@@ -107,10 +136,11 @@ |
DCHECK(!ice_password_.empty()); |
} |
-LibjingleTransport::~LibjingleTransport() { |
+LibjingleStreamTransport::~LibjingleStreamTransport() { |
DCHECK(event_handler_); |
- |
event_handler_->OnTransportDeleted(this); |
+ // Channel should be already destroyed if we were connected. |
+ DCHECK(!is_connected() || socket_.get() == NULL); |
if (channel_.get()) { |
base::ThreadTaskRunnerHandle::Get()->DeleteSoon( |
@@ -118,7 +148,7 @@ |
} |
} |
-void LibjingleTransport::OnCanStart() { |
+void LibjingleStreamTransport::OnCanStart() { |
DCHECK(CalledOnValidThread()); |
DCHECK(!can_start_); |
@@ -134,25 +164,33 @@ |
} |
} |
-void LibjingleTransport::Connect( |
+void LibjingleStreamTransport::Initialize( |
const std::string& name, |
Transport::EventHandler* event_handler, |
- const Transport::ConnectedCallback& callback) { |
- DCHECK(CalledOnValidThread()); |
+ scoped_ptr<ChannelAuthenticator> authenticator) { |
+ DCHECK(CalledOnValidThread()); |
+ |
DCHECK(!name.empty()); |
DCHECK(event_handler); |
- DCHECK(!callback.is_null()); |
- |
+ |
+ // Can be initialized only once. |
DCHECK(name_.empty()); |
+ |
name_ = name; |
event_handler_ = event_handler; |
+ authenticator_ = authenticator.Pass(); |
+} |
+ |
+void LibjingleStreamTransport::Connect( |
+ const StreamTransport::ConnectedCallback& callback) { |
+ DCHECK(CalledOnValidThread()); |
callback_ = callback; |
if (can_start_) |
DoStart(); |
} |
-void LibjingleTransport::DoStart() { |
+void LibjingleStreamTransport::DoStart() { |
DCHECK(!channel_.get()); |
// Create P2PTransportChannel, attach signal handlers and connect it. |
@@ -162,13 +200,13 @@ |
channel_->SetIceProtocolType(cricket::ICEPROTO_GOOGLE); |
channel_->SetIceCredentials(ice_username_fragment_, ice_password_); |
channel_->SignalRequestSignaling.connect( |
- this, &LibjingleTransport::OnRequestSignaling); |
+ this, &LibjingleStreamTransport::OnRequestSignaling); |
channel_->SignalCandidateReady.connect( |
- this, &LibjingleTransport::OnCandidateReady); |
+ this, &LibjingleStreamTransport::OnCandidateReady); |
channel_->SignalRouteChange.connect( |
- this, &LibjingleTransport::OnRouteChange); |
+ this, &LibjingleStreamTransport::OnRouteChange); |
channel_->SignalWritableState.connect( |
- this, &LibjingleTransport::OnWritableState); |
+ this, &LibjingleStreamTransport::OnWritableState); |
channel_->set_incoming_only( |
!(network_settings_.flags & NetworkSettings::NAT_TRAVERSAL_OUTGOING)); |
@@ -179,20 +217,37 @@ |
// Start reconnection timer. |
reconnect_timer_.Start( |
FROM_HERE, base::TimeDelta::FromSeconds(kReconnectDelaySeconds), |
- this, &LibjingleTransport::TryReconnect); |
+ this, &LibjingleStreamTransport::TryReconnect); |
// Create net::Socket adapter for the P2PTransportChannel. |
- scoped_ptr<jingle_glue::TransportChannelSocketAdapter> socket( |
+ scoped_ptr<jingle_glue::TransportChannelSocketAdapter> channel_adapter( |
new jingle_glue::TransportChannelSocketAdapter(channel_.get())); |
- socket->SetOnDestroyedCallback(base::Bind( |
- &LibjingleTransport::OnChannelDestroyed, base::Unretained(this))); |
- |
- Transport::ConnectedCallback callback = callback_; |
- callback_.Reset(); |
- callback.Run(socket.PassAs<net::Socket>()); |
-} |
- |
-void LibjingleTransport::AddRemoteCandidate( |
+ |
+ channel_adapter->SetOnDestroyedCallback(base::Bind( |
+ &LibjingleStreamTransport::OnChannelDestroyed, base::Unretained(this))); |
+ |
+ // Configure and connect PseudoTCP adapter. |
+ socket_.reset( |
+ new jingle_glue::PseudoTcpAdapter(channel_adapter.release())); |
+ socket_->SetSendBufferSize(kTcpSendBufferSize); |
+ socket_->SetReceiveBufferSize(kTcpReceiveBufferSize); |
+ socket_->SetNoDelay(true); |
+ socket_->SetAckDelay(kTcpAckDelayMilliseconds); |
+ |
+ // TODO(sergeyu): This is a hack to improve latency of the video |
+ // channel. Consider removing it once we have better flow control |
+ // implemented. |
+ if (name_ == kVideoChannelName) |
+ socket_->SetWriteWaitsForSend(true); |
+ |
+ int result = socket_->Connect( |
+ base::Bind(&LibjingleStreamTransport::OnTcpConnected, |
+ base::Unretained(this))); |
+ if (result != net::ERR_IO_PENDING) |
+ OnTcpConnected(result); |
+} |
+ |
+void LibjingleStreamTransport::AddRemoteCandidate( |
const cricket::Candidate& candidate) { |
DCHECK(CalledOnValidThread()); |
@@ -210,30 +265,30 @@ |
} |
} |
-const std::string& LibjingleTransport::name() const { |
+const std::string& LibjingleStreamTransport::name() const { |
DCHECK(CalledOnValidThread()); |
return name_; |
} |
-bool LibjingleTransport::is_connected() const { |
+bool LibjingleStreamTransport::is_connected() const { |
DCHECK(CalledOnValidThread()); |
return callback_.is_null(); |
} |
-void LibjingleTransport::OnRequestSignaling( |
+void LibjingleStreamTransport::OnRequestSignaling( |
cricket::TransportChannelImpl* channel) { |
DCHECK(CalledOnValidThread()); |
channel_->OnSignalingReady(); |
} |
-void LibjingleTransport::OnCandidateReady( |
+void LibjingleStreamTransport::OnCandidateReady( |
cricket::TransportChannelImpl* channel, |
const cricket::Candidate& candidate) { |
DCHECK(CalledOnValidThread()); |
event_handler_->OnTransportCandidate(this, candidate); |
} |
-void LibjingleTransport::OnRouteChange( |
+void LibjingleStreamTransport::OnRouteChange( |
cricket::TransportChannel* channel, |
const cricket::Candidate& candidate) { |
TransportRoute route; |
@@ -264,7 +319,7 @@ |
event_handler_->OnTransportRouteChange(this, route); |
} |
-void LibjingleTransport::OnWritableState( |
+void LibjingleStreamTransport::OnWritableState( |
cricket::TransportChannel* channel) { |
DCHECK_EQ(channel, channel_.get()); |
@@ -278,14 +333,39 @@ |
} |
} |
-void LibjingleTransport::OnChannelDestroyed() { |
+void LibjingleStreamTransport::OnTcpConnected(int result) { |
+ DCHECK(CalledOnValidThread()); |
+ |
+ if (result != net::OK) { |
+ NotifyConnectFailed(); |
+ return; |
+ } |
+ |
+ authenticator_->SecureAndAuthenticate( |
+ socket_.PassAs<net::StreamSocket>(), |
+ base::Bind(&LibjingleStreamTransport::OnAuthenticationDone, |
+ base::Unretained(this))); |
+} |
+ |
+void LibjingleStreamTransport::OnAuthenticationDone( |
+ net::Error error, |
+ scoped_ptr<net::StreamSocket> socket) { |
+ if (error != net::OK) { |
+ NotifyConnectFailed(); |
+ return; |
+ } |
+ |
+ NotifyConnected(socket.Pass()); |
+} |
+ |
+void LibjingleStreamTransport::OnChannelDestroyed() { |
if (is_connected()) { |
// The connection socket is being deleted, so delete the transport too. |
delete this; |
} |
} |
-void LibjingleTransport::TryReconnect() { |
+void LibjingleStreamTransport::TryReconnect() { |
DCHECK(!channel_->writable()); |
if (connect_attempts_left_ <= 0) { |
@@ -301,6 +381,31 @@ |
// Restart ICE by resetting ICE password. |
ice_password_ = rtc::CreateRandomString(cricket::ICE_PWD_LENGTH); |
channel_->SetIceCredentials(ice_username_fragment_, ice_password_); |
+} |
+ |
+void LibjingleStreamTransport::NotifyConnected( |
+ scoped_ptr<net::StreamSocket> socket) { |
+ DCHECK(!is_connected()); |
+ StreamTransport::ConnectedCallback callback = callback_; |
+ callback_.Reset(); |
+ callback.Run(socket.Pass()); |
+} |
+ |
+void LibjingleStreamTransport::NotifyConnectFailed() { |
+ DCHECK(!is_connected()); |
+ |
+ socket_.reset(); |
+ |
+ // This method may be called in response to a libjingle signal, so |
+ // libjingle objects must be deleted asynchronously. |
+ if (channel_.get()) { |
+ base::ThreadTaskRunnerHandle::Get()->DeleteSoon( |
+ FROM_HERE, channel_.release()); |
+ } |
+ |
+ authenticator_.reset(); |
+ |
+ NotifyConnected(scoped_ptr<net::StreamSocket>()); |
} |
} // namespace |
@@ -326,9 +431,9 @@ |
EnsureFreshJingleInfo(); |
} |
-scoped_ptr<Transport> LibjingleTransportFactory::CreateTransport() { |
- scoped_ptr<LibjingleTransport> result( |
- new LibjingleTransport(port_allocator_.get(), network_settings_)); |
+scoped_ptr<StreamTransport> LibjingleTransportFactory::CreateStreamTransport() { |
+ scoped_ptr<LibjingleStreamTransport> result( |
+ new LibjingleStreamTransport(port_allocator_.get(), network_settings_)); |
EnsureFreshJingleInfo(); |
@@ -336,13 +441,19 @@ |
// transport until the request is finished. |
if (jingle_info_request_) { |
on_jingle_info_callbacks_.push_back( |
- base::Bind(&LibjingleTransport::OnCanStart, |
+ base::Bind(&LibjingleStreamTransport::OnCanStart, |
result->AsWeakPtr())); |
} else { |
result->OnCanStart(); |
} |
- return result.PassAs<Transport>(); |
+ return result.PassAs<StreamTransport>(); |
+} |
+ |
+scoped_ptr<DatagramTransport> |
+LibjingleTransportFactory::CreateDatagramTransport() { |
+ NOTIMPLEMENTED(); |
+ return scoped_ptr<DatagramTransport>(); |
} |
void LibjingleTransportFactory::EnsureFreshJingleInfo() { |