Chromium Code Reviews| Index: remoting/protocol/webrtc_data_stream_adapter.cc |
| diff --git a/remoting/protocol/webrtc_data_stream_adapter.cc b/remoting/protocol/webrtc_data_stream_adapter.cc |
| index 91a2f7318ef27ad43c9dba3851106d0cfe73363b..7f5c73862585b234fc4b917eed006b12fb92c305 100644 |
| --- a/remoting/protocol/webrtc_data_stream_adapter.cc |
| +++ b/remoting/protocol/webrtc_data_stream_adapter.cc |
| @@ -32,7 +32,7 @@ class WebrtcDataStreamAdapter::Channel : public MessagePipe, |
| std::string name() { return channel_->label(); } |
| // MessagePipe interface. |
| - void StartReceiving(const MessageReceivedCallback& callback) override; |
| + void Start(EventHandler* event_handler) override; |
| void Send(google::protobuf::MessageLite* message, |
| const base::Closure& done) override; |
| @@ -45,22 +45,25 @@ class WebrtcDataStreamAdapter::Channel : public MessagePipe, |
| void OnConnected(); |
| - void OnError(); |
| + void OnClosed(); |
| + void NotifyClosed(); |
| base::WeakPtr<WebrtcDataStreamAdapter> adapter_; |
| rtc::scoped_refptr<webrtc::DataChannelInterface> channel_; |
| - MessageReceivedCallback message_received_callback_; |
| + EventHandler* event_handler_ = nullptr; |
| State state_ = State::CONNECTING; |
| + base::WeakPtrFactory<Channel> weak_factory_; |
| + |
| DISALLOW_COPY_AND_ASSIGN(Channel); |
| }; |
| WebrtcDataStreamAdapter::Channel::Channel( |
| base::WeakPtr<WebrtcDataStreamAdapter> adapter) |
| - : adapter_(adapter) {} |
| + : adapter_(adapter), weak_factory_(this) {} |
| WebrtcDataStreamAdapter::Channel::~Channel() { |
| if (channel_) { |
| @@ -83,24 +86,27 @@ void WebrtcDataStreamAdapter::Channel::Start( |
| } |
| } |
| -void WebrtcDataStreamAdapter::Channel::StartReceiving( |
| - const MessageReceivedCallback& callback) { |
| - DCHECK(message_received_callback_.is_null()); |
| - DCHECK(!callback.is_null()); |
| +void WebrtcDataStreamAdapter::Channel::Start(EventHandler* event_handler) { |
| + DCHECK(!event_handler_); |
| + DCHECK(event_handler); |
| - message_received_callback_ = callback; |
| + event_handler_ = event_handler; |
| } |
| void WebrtcDataStreamAdapter::Channel::Send( |
| google::protobuf::MessageLite* message, |
| const base::Closure& done) { |
| + if(state_ != State::OPEN) |
| + return; |
|
Jamie
2016/07/19 18:24:47
Should we log an error here? Silently dropping the
Sergey Ulanov
2016/07/19 23:38:44
Replaced this with a DCHECK().
Also changed OnMess
|
| + |
| rtc::CopyOnWriteBuffer buffer; |
| buffer.SetSize(message->ByteSize()); |
| message->SerializeWithCachedSizesToArray( |
| reinterpret_cast<uint8_t*>(buffer.data())); |
| webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */); |
| if (!channel_->Send(data_buffer)) { |
| - OnError(); |
| + LOG(ERROR) << "Send failed on data channel " << channel_->label(); |
| + channel_->Close(); |
| return; |
| } |
| @@ -116,7 +122,7 @@ void WebrtcDataStreamAdapter::Channel::OnStateChange() { |
| case webrtc::DataChannelInterface::kClosing: |
| // Currently channels are not expected to be closed. |
|
Jamie
2016/07/19 18:24:47
Log an error in this case?
Sergey Ulanov
2016/07/19 23:38:43
Removed the comment. This class now does support c
|
| - OnError(); |
| + OnClosed(); |
| break; |
| case webrtc::DataChannelInterface::kConnecting: |
| @@ -125,35 +131,50 @@ void WebrtcDataStreamAdapter::Channel::OnStateChange() { |
| } |
| } |
| +void WebrtcDataStreamAdapter::Channel::OnMessage( |
| + const webrtc::DataBuffer& rtc_buffer) { |
| + std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer()); |
| + buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()), |
| + rtc_buffer.data.size()); |
| + buffer->Lock(); |
| + event_handler_->OnMessageReceived(std::move(buffer)); |
| +} |
| + |
| void WebrtcDataStreamAdapter::Channel::OnConnected() { |
| - CHECK(state_ == State::CONNECTING); |
| + DCHECK(state_ == State::CONNECTING); |
| state_ = State::OPEN; |
| adapter_->OnChannelConnected(this); |
| } |
| -void WebrtcDataStreamAdapter::Channel::OnError() { |
| - if (state_ == State::CLOSED) |
| - return; |
| - |
| - state_ = State::CLOSED; |
| - |
| - // Notify the adapter about the error asychronously. |
| - base::ThreadTaskRunnerHandle::Get()->PostTask( |
| - FROM_HERE, |
| - base::Bind(&WebrtcDataStreamAdapter::OnChannelError, adapter_)); |
| +void WebrtcDataStreamAdapter::Channel::OnClosed() { |
| + switch (state_) { |
| + case State::CONNECTING: |
|
Jamie
2016/07/19 18:24:47
You're not setting the state to CLOSED in this bra
Sergey Ulanov
2016/07/19 23:38:44
No. Fixed now.
|
| + LOG(WARNING) << "Channel " << channel_->label() |
| + << " was closed before it's connected."; |
| + // Notify the adapter about the error asynchronously. |
| + base::ThreadTaskRunnerHandle::Get()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&WebrtcDataStreamAdapter::OnChannelError, adapter_)); |
| + return; |
| + |
| + case State::OPEN: |
| + state_ = State::CLOSED; |
| + base::ThreadTaskRunnerHandle::Get()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&Channel::NotifyClosed, weak_factory_.GetWeakPtr())); |
| + return; |
| + |
| + case State::CLOSED: |
| + break; |
| + } |
| } |
| -void WebrtcDataStreamAdapter::Channel::OnMessage( |
| - const webrtc::DataBuffer& rtc_buffer) { |
| - std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer()); |
| - buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()), |
| - rtc_buffer.data.size()); |
| - buffer->Lock(); |
| - message_received_callback_.Run(std::move(buffer)); |
| +void WebrtcDataStreamAdapter::Channel::NotifyClosed() { |
| + DCHECK(state_ == State::CLOSED); |
| + event_handler_->OnMessagePipeClosed(); |
| } |
| struct WebrtcDataStreamAdapter::PendingChannel { |
| - PendingChannel() {} |
| PendingChannel(std::unique_ptr<Channel> channel, |
| const ChannelCreatedCallback& connected_callback) |
| : channel(std::move(channel)), connected_callback(connected_callback) {} |
| @@ -171,11 +192,8 @@ struct WebrtcDataStreamAdapter::PendingChannel { |
| }; |
| WebrtcDataStreamAdapter::WebrtcDataStreamAdapter( |
| - bool outgoing, |
| const ErrorCallback& error_callback) |
| - : outgoing_(outgoing), |
| - error_callback_(error_callback), |
| - weak_factory_(this) {} |
| + : error_callback_(error_callback), weak_factory_(this) {} |
| WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() { |
| DCHECK(pending_channels_.empty()); |
| @@ -186,32 +204,19 @@ void WebrtcDataStreamAdapter::Initialize( |
| peer_connection_ = peer_connection; |
| } |
| -void WebrtcDataStreamAdapter::OnIncomingDataChannel( |
| - webrtc::DataChannelInterface* data_channel) { |
| - DCHECK(!outgoing_); |
| - |
| - auto it = pending_channels_.find(data_channel->label()); |
| - if (it == pending_channels_.end()) { |
| - LOG(ERROR) << "Received unexpected data channel " << data_channel->label(); |
| - return; |
| - } |
| - it->second.channel->Start(data_channel); |
| +void WebrtcDataStreamAdapter::WrapIncomingDataChannel( |
| + rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel, |
| + const ChannelCreatedCallback& callback) { |
| + AddPendingChannel(data_channel, callback); |
| } |
| void WebrtcDataStreamAdapter::CreateChannel( |
| const std::string& name, |
| const ChannelCreatedCallback& callback) { |
| - DCHECK(peer_connection_); |
| - DCHECK(pending_channels_.find(name) == pending_channels_.end()); |
| - |
| - Channel* channel = new Channel(weak_factory_.GetWeakPtr()); |
| - pending_channels_[name] = PendingChannel(base::WrapUnique(channel), callback); |
| - |
| - if (outgoing_) { |
| - webrtc::DataChannelInit config; |
| - config.reliable = true; |
| - channel->Start(peer_connection_->CreateDataChannel(name, &config)); |
| - } |
| + webrtc::DataChannelInit config; |
| + config.reliable = true; |
| + AddPendingChannel(peer_connection_->CreateDataChannel(name, &config), |
| + callback); |
| } |
| void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) { |
| @@ -220,9 +225,25 @@ void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) { |
| pending_channels_.erase(it); |
| } |
| +void WebrtcDataStreamAdapter::AddPendingChannel( |
| + rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel, |
| + const ChannelCreatedCallback& callback) { |
| + DCHECK(peer_connection_); |
| + DCHECK(pending_channels_.find(data_channel->label()) == |
| + pending_channels_.end()); |
| + |
| + Channel* channel = new Channel(weak_factory_.GetWeakPtr()); |
| + pending_channels_.insert( |
| + std::make_pair(data_channel->label(), |
| + PendingChannel(base::WrapUnique(channel), callback))); |
| + |
| + channel->Start(data_channel); |
| +} |
| + |
| void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) { |
| auto it = pending_channels_.find(channel->name()); |
| DCHECK(it != pending_channels_.end()); |
| + |
| PendingChannel pending_channel = std::move(it->second); |
| pending_channels_.erase(it); |