Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(445)

Unified Diff: remoting/protocol/webrtc_data_stream_adapter.cc

Issue 2146213002: Add support for dynamic channels in WebrtcTransport. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « remoting/protocol/webrtc_data_stream_adapter.h ('k') | remoting/protocol/webrtc_transport.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: remoting/protocol/webrtc_data_stream_adapter.cc
diff --git a/remoting/protocol/webrtc_data_stream_adapter.cc b/remoting/protocol/webrtc_data_stream_adapter.cc
index 91a2f7318ef27ad43c9dba3851106d0cfe73363b..15c97aef14db23ce02bd26d497fa619fb2f3b0a9 100644
--- a/remoting/protocol/webrtc_data_stream_adapter.cc
+++ b/remoting/protocol/webrtc_data_stream_adapter.cc
@@ -24,7 +24,7 @@ namespace protocol {
class WebrtcDataStreamAdapter::Channel : public MessagePipe,
public webrtc::DataChannelObserver {
public:
- explicit Channel(base::WeakPtr<WebrtcDataStreamAdapter> adapter);
+ explicit Channel(WebrtcDataStreamAdapter* adapter);
~Channel() override;
void Start(rtc::scoped_refptr<webrtc::DataChannelInterface> channel);
@@ -32,7 +32,7 @@ class WebrtcDataStreamAdapter::Channel : public MessagePipe,
std::string name() { return channel_->label(); }
// MessagePipe interface.
- void StartReceiving(const MessageReceivedCallback& callback) override;
+ void Start(EventHandler* event_handler) override;
void Send(google::protobuf::MessageLite* message,
const base::Closure& done) override;
@@ -45,21 +45,21 @@ class WebrtcDataStreamAdapter::Channel : public MessagePipe,
void OnConnected();
- void OnError();
+ void OnClosed();
- base::WeakPtr<WebrtcDataStreamAdapter> adapter_;
+ // |adapter_| owns channels while they are being connected.
+ WebrtcDataStreamAdapter* adapter_;
rtc::scoped_refptr<webrtc::DataChannelInterface> channel_;
- MessageReceivedCallback message_received_callback_;
+ EventHandler* event_handler_ = nullptr;
State state_ = State::CONNECTING;
DISALLOW_COPY_AND_ASSIGN(Channel);
};
-WebrtcDataStreamAdapter::Channel::Channel(
- base::WeakPtr<WebrtcDataStreamAdapter> adapter)
+WebrtcDataStreamAdapter::Channel::Channel(WebrtcDataStreamAdapter* adapter)
: adapter_(adapter) {}
WebrtcDataStreamAdapter::Channel::~Channel() {
@@ -83,24 +83,26 @@ void WebrtcDataStreamAdapter::Channel::Start(
}
}
-void WebrtcDataStreamAdapter::Channel::StartReceiving(
- const MessageReceivedCallback& callback) {
- DCHECK(message_received_callback_.is_null());
- DCHECK(!callback.is_null());
+void WebrtcDataStreamAdapter::Channel::Start(EventHandler* event_handler) {
+ DCHECK(!event_handler_);
+ DCHECK(event_handler);
- message_received_callback_ = callback;
+ event_handler_ = event_handler;
}
void WebrtcDataStreamAdapter::Channel::Send(
google::protobuf::MessageLite* message,
const base::Closure& done) {
+ DCHECK(state_ == State::OPEN);
+
rtc::CopyOnWriteBuffer buffer;
buffer.SetSize(message->ByteSize());
message->SerializeWithCachedSizesToArray(
reinterpret_cast<uint8_t*>(buffer.data()));
webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */);
if (!channel_->Send(data_buffer)) {
- OnError();
+ LOG(ERROR) << "Send failed on data channel " << channel_->label();
+ channel_->Close();
return;
}
@@ -115,8 +117,7 @@ void WebrtcDataStreamAdapter::Channel::OnStateChange() {
break;
case webrtc::DataChannelInterface::kClosing:
- // Currently channels are not expected to be closed.
- OnError();
+ OnClosed();
break;
case webrtc::DataChannelInterface::kConnecting:
@@ -125,35 +126,48 @@ void WebrtcDataStreamAdapter::Channel::OnStateChange() {
}
}
-void WebrtcDataStreamAdapter::Channel::OnConnected() {
- CHECK(state_ == State::CONNECTING);
- state_ = State::OPEN;
- adapter_->OnChannelConnected(this);
-}
-
-void WebrtcDataStreamAdapter::Channel::OnError() {
- if (state_ == State::CLOSED)
- return;
-
- state_ = State::CLOSED;
-
- // Notify the adapter about the error asychronously.
- base::ThreadTaskRunnerHandle::Get()->PostTask(
- FROM_HERE,
- base::Bind(&WebrtcDataStreamAdapter::OnChannelError, adapter_));
-}
-
void WebrtcDataStreamAdapter::Channel::OnMessage(
const webrtc::DataBuffer& rtc_buffer) {
+ if (state_ != State::OPEN) {
+ LOG(ERROR) << "Dropping a message received when the channel is not open.";
+ return;
+ }
+
std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer());
buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()),
rtc_buffer.data.size());
buffer->Lock();
- message_received_callback_.Run(std::move(buffer));
+ event_handler_->OnMessageReceived(std::move(buffer));
+}
+
+void WebrtcDataStreamAdapter::Channel::OnConnected() {
+ DCHECK(state_ == State::CONNECTING);
+ state_ = State::OPEN;
+ WebrtcDataStreamAdapter* adapter = adapter_;
+ adapter_ = nullptr;
+ adapter->OnChannelConnected(this);
+}
+
+void WebrtcDataStreamAdapter::Channel::OnClosed() {
+ switch (state_) {
+ case State::CONNECTING:
+ state_ = State::CLOSED;
+ LOG(WARNING) << "Channel " << channel_->label()
+ << " was closed before it's connected.";
+ adapter_->OnChannelError();
+ return;
+
+ case State::OPEN:
+ state_ = State::CLOSED;
+ event_handler_->OnMessagePipeClosed();
+ return;
+
+ case State::CLOSED:
+ break;
+ }
}
struct WebrtcDataStreamAdapter::PendingChannel {
- PendingChannel() {}
PendingChannel(std::unique_ptr<Channel> channel,
const ChannelCreatedCallback& connected_callback)
: channel(std::move(channel)), connected_callback(connected_callback) {}
@@ -171,11 +185,8 @@ struct WebrtcDataStreamAdapter::PendingChannel {
};
WebrtcDataStreamAdapter::WebrtcDataStreamAdapter(
- bool outgoing,
const ErrorCallback& error_callback)
- : outgoing_(outgoing),
- error_callback_(error_callback),
- weak_factory_(this) {}
+ : error_callback_(error_callback) {}
WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() {
DCHECK(pending_channels_.empty());
@@ -186,32 +197,19 @@ void WebrtcDataStreamAdapter::Initialize(
peer_connection_ = peer_connection;
}
-void WebrtcDataStreamAdapter::OnIncomingDataChannel(
- webrtc::DataChannelInterface* data_channel) {
- DCHECK(!outgoing_);
-
- auto it = pending_channels_.find(data_channel->label());
- if (it == pending_channels_.end()) {
- LOG(ERROR) << "Received unexpected data channel " << data_channel->label();
- return;
- }
- it->second.channel->Start(data_channel);
+void WebrtcDataStreamAdapter::WrapIncomingDataChannel(
+ rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel,
+ const ChannelCreatedCallback& callback) {
+ AddPendingChannel(data_channel, callback);
}
void WebrtcDataStreamAdapter::CreateChannel(
const std::string& name,
const ChannelCreatedCallback& callback) {
- DCHECK(peer_connection_);
- DCHECK(pending_channels_.find(name) == pending_channels_.end());
-
- Channel* channel = new Channel(weak_factory_.GetWeakPtr());
- pending_channels_[name] = PendingChannel(base::WrapUnique(channel), callback);
-
- if (outgoing_) {
- webrtc::DataChannelInit config;
- config.reliable = true;
- channel->Start(peer_connection_->CreateDataChannel(name, &config));
- }
+ webrtc::DataChannelInit config;
+ config.reliable = true;
+ AddPendingChannel(peer_connection_->CreateDataChannel(name, &config),
+ callback);
}
void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) {
@@ -220,9 +218,25 @@ void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) {
pending_channels_.erase(it);
}
+void WebrtcDataStreamAdapter::AddPendingChannel(
+ rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel,
+ const ChannelCreatedCallback& callback) {
+ DCHECK(peer_connection_);
+ DCHECK(pending_channels_.find(data_channel->label()) ==
+ pending_channels_.end());
+
+ Channel* channel = new Channel(this);
+ pending_channels_.insert(
+ std::make_pair(data_channel->label(),
+ PendingChannel(base::WrapUnique(channel), callback)));
+
+ channel->Start(data_channel);
+}
+
void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) {
auto it = pending_channels_.find(channel->name());
DCHECK(it != pending_channels_.end());
+
PendingChannel pending_channel = std::move(it->second);
pending_channels_.erase(it);
@@ -232,6 +246,7 @@ void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) {
}
void WebrtcDataStreamAdapter::OnChannelError() {
+ pending_channels_.clear();
error_callback_.Run(CHANNEL_CONNECTION_ERROR);
}
« no previous file with comments | « remoting/protocol/webrtc_data_stream_adapter.h ('k') | remoting/protocol/webrtc_transport.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698