Index: remoting/protocol/webrtc_data_stream_adapter.cc |
diff --git a/remoting/protocol/webrtc_data_stream_adapter.cc b/remoting/protocol/webrtc_data_stream_adapter.cc |
index 91a2f7318ef27ad43c9dba3851106d0cfe73363b..15c97aef14db23ce02bd26d497fa619fb2f3b0a9 100644 |
--- a/remoting/protocol/webrtc_data_stream_adapter.cc |
+++ b/remoting/protocol/webrtc_data_stream_adapter.cc |
@@ -24,7 +24,7 @@ namespace protocol { |
class WebrtcDataStreamAdapter::Channel : public MessagePipe, |
public webrtc::DataChannelObserver { |
public: |
- explicit Channel(base::WeakPtr<WebrtcDataStreamAdapter> adapter); |
+ explicit Channel(WebrtcDataStreamAdapter* adapter); |
~Channel() override; |
void Start(rtc::scoped_refptr<webrtc::DataChannelInterface> channel); |
@@ -32,7 +32,7 @@ class WebrtcDataStreamAdapter::Channel : public MessagePipe, |
std::string name() { return channel_->label(); } |
// MessagePipe interface. |
- void StartReceiving(const MessageReceivedCallback& callback) override; |
+ void Start(EventHandler* event_handler) override; |
void Send(google::protobuf::MessageLite* message, |
const base::Closure& done) override; |
@@ -45,21 +45,21 @@ class WebrtcDataStreamAdapter::Channel : public MessagePipe, |
void OnConnected(); |
- void OnError(); |
+ void OnClosed(); |
- base::WeakPtr<WebrtcDataStreamAdapter> adapter_; |
+ // |adapter_| owns channels while they are being connected. |
+ WebrtcDataStreamAdapter* adapter_; |
rtc::scoped_refptr<webrtc::DataChannelInterface> channel_; |
- MessageReceivedCallback message_received_callback_; |
+ EventHandler* event_handler_ = nullptr; |
State state_ = State::CONNECTING; |
DISALLOW_COPY_AND_ASSIGN(Channel); |
}; |
-WebrtcDataStreamAdapter::Channel::Channel( |
- base::WeakPtr<WebrtcDataStreamAdapter> adapter) |
+WebrtcDataStreamAdapter::Channel::Channel(WebrtcDataStreamAdapter* adapter) |
: adapter_(adapter) {} |
WebrtcDataStreamAdapter::Channel::~Channel() { |
@@ -83,24 +83,26 @@ void WebrtcDataStreamAdapter::Channel::Start( |
} |
} |
-void WebrtcDataStreamAdapter::Channel::StartReceiving( |
- const MessageReceivedCallback& callback) { |
- DCHECK(message_received_callback_.is_null()); |
- DCHECK(!callback.is_null()); |
+void WebrtcDataStreamAdapter::Channel::Start(EventHandler* event_handler) { |
+ DCHECK(!event_handler_); |
+ DCHECK(event_handler); |
- message_received_callback_ = callback; |
+ event_handler_ = event_handler; |
} |
void WebrtcDataStreamAdapter::Channel::Send( |
google::protobuf::MessageLite* message, |
const base::Closure& done) { |
+ DCHECK(state_ == State::OPEN); |
+ |
rtc::CopyOnWriteBuffer buffer; |
buffer.SetSize(message->ByteSize()); |
message->SerializeWithCachedSizesToArray( |
reinterpret_cast<uint8_t*>(buffer.data())); |
webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */); |
if (!channel_->Send(data_buffer)) { |
- OnError(); |
+ LOG(ERROR) << "Send failed on data channel " << channel_->label(); |
+ channel_->Close(); |
return; |
} |
@@ -115,8 +117,7 @@ void WebrtcDataStreamAdapter::Channel::OnStateChange() { |
break; |
case webrtc::DataChannelInterface::kClosing: |
- // Currently channels are not expected to be closed. |
- OnError(); |
+ OnClosed(); |
break; |
case webrtc::DataChannelInterface::kConnecting: |
@@ -125,35 +126,48 @@ void WebrtcDataStreamAdapter::Channel::OnStateChange() { |
} |
} |
-void WebrtcDataStreamAdapter::Channel::OnConnected() { |
- CHECK(state_ == State::CONNECTING); |
- state_ = State::OPEN; |
- adapter_->OnChannelConnected(this); |
-} |
- |
-void WebrtcDataStreamAdapter::Channel::OnError() { |
- if (state_ == State::CLOSED) |
- return; |
- |
- state_ = State::CLOSED; |
- |
- // Notify the adapter about the error asychronously. |
- base::ThreadTaskRunnerHandle::Get()->PostTask( |
- FROM_HERE, |
- base::Bind(&WebrtcDataStreamAdapter::OnChannelError, adapter_)); |
-} |
- |
void WebrtcDataStreamAdapter::Channel::OnMessage( |
const webrtc::DataBuffer& rtc_buffer) { |
+ if (state_ != State::OPEN) { |
+ LOG(ERROR) << "Dropping a message received when the channel is not open."; |
+ return; |
+ } |
+ |
std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer()); |
buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()), |
rtc_buffer.data.size()); |
buffer->Lock(); |
- message_received_callback_.Run(std::move(buffer)); |
+ event_handler_->OnMessageReceived(std::move(buffer)); |
+} |
+ |
+void WebrtcDataStreamAdapter::Channel::OnConnected() { |
+ DCHECK(state_ == State::CONNECTING); |
+ state_ = State::OPEN; |
+ WebrtcDataStreamAdapter* adapter = adapter_; |
+ adapter_ = nullptr; |
+ adapter->OnChannelConnected(this); |
+} |
+ |
+void WebrtcDataStreamAdapter::Channel::OnClosed() { |
+ switch (state_) { |
+ case State::CONNECTING: |
+ state_ = State::CLOSED; |
+ LOG(WARNING) << "Channel " << channel_->label() |
+ << " was closed before it's connected."; |
+ adapter_->OnChannelError(); |
+ return; |
+ |
+ case State::OPEN: |
+ state_ = State::CLOSED; |
+ event_handler_->OnMessagePipeClosed(); |
+ return; |
+ |
+ case State::CLOSED: |
+ break; |
+ } |
} |
struct WebrtcDataStreamAdapter::PendingChannel { |
- PendingChannel() {} |
PendingChannel(std::unique_ptr<Channel> channel, |
const ChannelCreatedCallback& connected_callback) |
: channel(std::move(channel)), connected_callback(connected_callback) {} |
@@ -171,11 +185,8 @@ struct WebrtcDataStreamAdapter::PendingChannel { |
}; |
WebrtcDataStreamAdapter::WebrtcDataStreamAdapter( |
- bool outgoing, |
const ErrorCallback& error_callback) |
- : outgoing_(outgoing), |
- error_callback_(error_callback), |
- weak_factory_(this) {} |
+ : error_callback_(error_callback) {} |
WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() { |
DCHECK(pending_channels_.empty()); |
@@ -186,32 +197,19 @@ void WebrtcDataStreamAdapter::Initialize( |
peer_connection_ = peer_connection; |
} |
-void WebrtcDataStreamAdapter::OnIncomingDataChannel( |
- webrtc::DataChannelInterface* data_channel) { |
- DCHECK(!outgoing_); |
- |
- auto it = pending_channels_.find(data_channel->label()); |
- if (it == pending_channels_.end()) { |
- LOG(ERROR) << "Received unexpected data channel " << data_channel->label(); |
- return; |
- } |
- it->second.channel->Start(data_channel); |
+void WebrtcDataStreamAdapter::WrapIncomingDataChannel( |
+ rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel, |
+ const ChannelCreatedCallback& callback) { |
+ AddPendingChannel(data_channel, callback); |
} |
void WebrtcDataStreamAdapter::CreateChannel( |
const std::string& name, |
const ChannelCreatedCallback& callback) { |
- DCHECK(peer_connection_); |
- DCHECK(pending_channels_.find(name) == pending_channels_.end()); |
- |
- Channel* channel = new Channel(weak_factory_.GetWeakPtr()); |
- pending_channels_[name] = PendingChannel(base::WrapUnique(channel), callback); |
- |
- if (outgoing_) { |
- webrtc::DataChannelInit config; |
- config.reliable = true; |
- channel->Start(peer_connection_->CreateDataChannel(name, &config)); |
- } |
+ webrtc::DataChannelInit config; |
+ config.reliable = true; |
+ AddPendingChannel(peer_connection_->CreateDataChannel(name, &config), |
+ callback); |
} |
void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) { |
@@ -220,9 +218,25 @@ void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) { |
pending_channels_.erase(it); |
} |
+void WebrtcDataStreamAdapter::AddPendingChannel( |
+ rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel, |
+ const ChannelCreatedCallback& callback) { |
+ DCHECK(peer_connection_); |
+ DCHECK(pending_channels_.find(data_channel->label()) == |
+ pending_channels_.end()); |
+ |
+ Channel* channel = new Channel(this); |
+ pending_channels_.insert( |
+ std::make_pair(data_channel->label(), |
+ PendingChannel(base::WrapUnique(channel), callback))); |
+ |
+ channel->Start(data_channel); |
+} |
+ |
void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) { |
auto it = pending_channels_.find(channel->name()); |
DCHECK(it != pending_channels_.end()); |
+ |
PendingChannel pending_channel = std::move(it->second); |
pending_channels_.erase(it); |
@@ -232,6 +246,7 @@ void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) { |
} |
void WebrtcDataStreamAdapter::OnChannelError() { |
+ pending_channels_.clear(); |
error_callback_.Run(CHANNEL_CONNECTION_ERROR); |
} |