Index: remoting/protocol/webrtc_data_stream_adapter.cc |
diff --git a/remoting/protocol/webrtc_data_stream_adapter.cc b/remoting/protocol/webrtc_data_stream_adapter.cc |
index 91a2f7318ef27ad43c9dba3851106d0cfe73363b..7f5c73862585b234fc4b917eed006b12fb92c305 100644 |
--- a/remoting/protocol/webrtc_data_stream_adapter.cc |
+++ b/remoting/protocol/webrtc_data_stream_adapter.cc |
@@ -32,7 +32,7 @@ class WebrtcDataStreamAdapter::Channel : public MessagePipe, |
std::string name() { return channel_->label(); } |
// MessagePipe interface. |
- void StartReceiving(const MessageReceivedCallback& callback) override; |
+ void Start(EventHandler* event_handler) override; |
void Send(google::protobuf::MessageLite* message, |
const base::Closure& done) override; |
@@ -45,22 +45,25 @@ class WebrtcDataStreamAdapter::Channel : public MessagePipe, |
void OnConnected(); |
- void OnError(); |
+ void OnClosed(); |
+ void NotifyClosed(); |
base::WeakPtr<WebrtcDataStreamAdapter> adapter_; |
rtc::scoped_refptr<webrtc::DataChannelInterface> channel_; |
- MessageReceivedCallback message_received_callback_; |
+ EventHandler* event_handler_ = nullptr; |
State state_ = State::CONNECTING; |
+ base::WeakPtrFactory<Channel> weak_factory_; |
+ |
DISALLOW_COPY_AND_ASSIGN(Channel); |
}; |
WebrtcDataStreamAdapter::Channel::Channel( |
base::WeakPtr<WebrtcDataStreamAdapter> adapter) |
- : adapter_(adapter) {} |
+ : adapter_(adapter), weak_factory_(this) {} |
WebrtcDataStreamAdapter::Channel::~Channel() { |
if (channel_) { |
@@ -83,24 +86,27 @@ void WebrtcDataStreamAdapter::Channel::Start( |
} |
} |
-void WebrtcDataStreamAdapter::Channel::StartReceiving( |
- const MessageReceivedCallback& callback) { |
- DCHECK(message_received_callback_.is_null()); |
- DCHECK(!callback.is_null()); |
+void WebrtcDataStreamAdapter::Channel::Start(EventHandler* event_handler) { |
+ DCHECK(!event_handler_); |
+ DCHECK(event_handler); |
- message_received_callback_ = callback; |
+ event_handler_ = event_handler; |
} |
void WebrtcDataStreamAdapter::Channel::Send( |
google::protobuf::MessageLite* message, |
const base::Closure& done) { |
+ if(state_ != State::OPEN) |
+ return; |
Jamie
2016/07/19 18:24:47
Should we log an error here? Silently dropping the
Sergey Ulanov
2016/07/19 23:38:44
Replaced this with a DCHECK().
Also changed OnMess
|
+ |
rtc::CopyOnWriteBuffer buffer; |
buffer.SetSize(message->ByteSize()); |
message->SerializeWithCachedSizesToArray( |
reinterpret_cast<uint8_t*>(buffer.data())); |
webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */); |
if (!channel_->Send(data_buffer)) { |
- OnError(); |
+ LOG(ERROR) << "Send failed on data channel " << channel_->label(); |
+ channel_->Close(); |
return; |
} |
@@ -116,7 +122,7 @@ void WebrtcDataStreamAdapter::Channel::OnStateChange() { |
case webrtc::DataChannelInterface::kClosing: |
// Currently channels are not expected to be closed. |
Jamie
2016/07/19 18:24:47
Log an error in this case?
Sergey Ulanov
2016/07/19 23:38:43
Removed the comment. This class now does support c
|
- OnError(); |
+ OnClosed(); |
break; |
case webrtc::DataChannelInterface::kConnecting: |
@@ -125,35 +131,50 @@ void WebrtcDataStreamAdapter::Channel::OnStateChange() { |
} |
} |
+void WebrtcDataStreamAdapter::Channel::OnMessage( |
+ const webrtc::DataBuffer& rtc_buffer) { |
+ std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer()); |
+ buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()), |
+ rtc_buffer.data.size()); |
+ buffer->Lock(); |
+ event_handler_->OnMessageReceived(std::move(buffer)); |
+} |
+ |
void WebrtcDataStreamAdapter::Channel::OnConnected() { |
- CHECK(state_ == State::CONNECTING); |
+ DCHECK(state_ == State::CONNECTING); |
state_ = State::OPEN; |
adapter_->OnChannelConnected(this); |
} |
-void WebrtcDataStreamAdapter::Channel::OnError() { |
- if (state_ == State::CLOSED) |
- return; |
- |
- state_ = State::CLOSED; |
- |
- // Notify the adapter about the error asychronously. |
- base::ThreadTaskRunnerHandle::Get()->PostTask( |
- FROM_HERE, |
- base::Bind(&WebrtcDataStreamAdapter::OnChannelError, adapter_)); |
+void WebrtcDataStreamAdapter::Channel::OnClosed() { |
+ switch (state_) { |
+ case State::CONNECTING: |
Jamie
2016/07/19 18:24:47
You're not setting the state to CLOSED in this bra
Sergey Ulanov
2016/07/19 23:38:44
No. Fixed now.
|
+ LOG(WARNING) << "Channel " << channel_->label() |
+ << " was closed before it's connected."; |
+ // Notify the adapter about the error asynchronously. |
+ base::ThreadTaskRunnerHandle::Get()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&WebrtcDataStreamAdapter::OnChannelError, adapter_)); |
+ return; |
+ |
+ case State::OPEN: |
+ state_ = State::CLOSED; |
+ base::ThreadTaskRunnerHandle::Get()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&Channel::NotifyClosed, weak_factory_.GetWeakPtr())); |
+ return; |
+ |
+ case State::CLOSED: |
+ break; |
+ } |
} |
-void WebrtcDataStreamAdapter::Channel::OnMessage( |
- const webrtc::DataBuffer& rtc_buffer) { |
- std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer()); |
- buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()), |
- rtc_buffer.data.size()); |
- buffer->Lock(); |
- message_received_callback_.Run(std::move(buffer)); |
+void WebrtcDataStreamAdapter::Channel::NotifyClosed() { |
+ DCHECK(state_ == State::CLOSED); |
+ event_handler_->OnMessagePipeClosed(); |
} |
struct WebrtcDataStreamAdapter::PendingChannel { |
- PendingChannel() {} |
PendingChannel(std::unique_ptr<Channel> channel, |
const ChannelCreatedCallback& connected_callback) |
: channel(std::move(channel)), connected_callback(connected_callback) {} |
@@ -171,11 +192,8 @@ struct WebrtcDataStreamAdapter::PendingChannel { |
}; |
WebrtcDataStreamAdapter::WebrtcDataStreamAdapter( |
- bool outgoing, |
const ErrorCallback& error_callback) |
- : outgoing_(outgoing), |
- error_callback_(error_callback), |
- weak_factory_(this) {} |
+ : error_callback_(error_callback), weak_factory_(this) {} |
WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() { |
DCHECK(pending_channels_.empty()); |
@@ -186,32 +204,19 @@ void WebrtcDataStreamAdapter::Initialize( |
peer_connection_ = peer_connection; |
} |
-void WebrtcDataStreamAdapter::OnIncomingDataChannel( |
- webrtc::DataChannelInterface* data_channel) { |
- DCHECK(!outgoing_); |
- |
- auto it = pending_channels_.find(data_channel->label()); |
- if (it == pending_channels_.end()) { |
- LOG(ERROR) << "Received unexpected data channel " << data_channel->label(); |
- return; |
- } |
- it->second.channel->Start(data_channel); |
+void WebrtcDataStreamAdapter::WrapIncomingDataChannel( |
+ rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel, |
+ const ChannelCreatedCallback& callback) { |
+ AddPendingChannel(data_channel, callback); |
} |
void WebrtcDataStreamAdapter::CreateChannel( |
const std::string& name, |
const ChannelCreatedCallback& callback) { |
- DCHECK(peer_connection_); |
- DCHECK(pending_channels_.find(name) == pending_channels_.end()); |
- |
- Channel* channel = new Channel(weak_factory_.GetWeakPtr()); |
- pending_channels_[name] = PendingChannel(base::WrapUnique(channel), callback); |
- |
- if (outgoing_) { |
- webrtc::DataChannelInit config; |
- config.reliable = true; |
- channel->Start(peer_connection_->CreateDataChannel(name, &config)); |
- } |
+ webrtc::DataChannelInit config; |
+ config.reliable = true; |
+ AddPendingChannel(peer_connection_->CreateDataChannel(name, &config), |
+ callback); |
} |
void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) { |
@@ -220,9 +225,25 @@ void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) { |
pending_channels_.erase(it); |
} |
+void WebrtcDataStreamAdapter::AddPendingChannel( |
+ rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel, |
+ const ChannelCreatedCallback& callback) { |
+ DCHECK(peer_connection_); |
+ DCHECK(pending_channels_.find(data_channel->label()) == |
+ pending_channels_.end()); |
+ |
+ Channel* channel = new Channel(weak_factory_.GetWeakPtr()); |
+ pending_channels_.insert( |
+ std::make_pair(data_channel->label(), |
+ PendingChannel(base::WrapUnique(channel), callback))); |
+ |
+ channel->Start(data_channel); |
+} |
+ |
void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) { |
auto it = pending_channels_.find(channel->name()); |
DCHECK(it != pending_channels_.end()); |
+ |
PendingChannel pending_channel = std::move(it->second); |
pending_channels_.erase(it); |