Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(16)

Side by Side Diff: remoting/protocol/webrtc_data_stream_adapter.cc

Issue 2146213002: Add support for dynamic channels in WebrtcTransport. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « remoting/protocol/webrtc_data_stream_adapter.h ('k') | remoting/protocol/webrtc_transport.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/webrtc_data_stream_adapter.h" 5 #include "remoting/protocol/webrtc_data_stream_adapter.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
11 #include "base/callback_helpers.h" 11 #include "base/callback_helpers.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/macros.h" 13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h" 14 #include "base/memory/ptr_util.h"
15 #include "base/threading/thread_task_runner_handle.h" 15 #include "base/threading/thread_task_runner_handle.h"
16 #include "net/base/net_errors.h" 16 #include "net/base/net_errors.h"
17 #include "remoting/base/compound_buffer.h" 17 #include "remoting/base/compound_buffer.h"
18 #include "remoting/protocol/message_pipe.h" 18 #include "remoting/protocol/message_pipe.h"
19 #include "remoting/protocol/message_serialization.h" 19 #include "remoting/protocol/message_serialization.h"
20 20
21 namespace remoting { 21 namespace remoting {
22 namespace protocol { 22 namespace protocol {
23 23
24 class WebrtcDataStreamAdapter::Channel : public MessagePipe, 24 class WebrtcDataStreamAdapter::Channel : public MessagePipe,
25 public webrtc::DataChannelObserver { 25 public webrtc::DataChannelObserver {
26 public: 26 public:
27 explicit Channel(base::WeakPtr<WebrtcDataStreamAdapter> adapter); 27 explicit Channel(WebrtcDataStreamAdapter* adapter);
28 ~Channel() override; 28 ~Channel() override;
29 29
30 void Start(rtc::scoped_refptr<webrtc::DataChannelInterface> channel); 30 void Start(rtc::scoped_refptr<webrtc::DataChannelInterface> channel);
31 31
32 std::string name() { return channel_->label(); } 32 std::string name() { return channel_->label(); }
33 33
34 // MessagePipe interface. 34 // MessagePipe interface.
35 void StartReceiving(const MessageReceivedCallback& callback) override; 35 void Start(EventHandler* event_handler) override;
36 void Send(google::protobuf::MessageLite* message, 36 void Send(google::protobuf::MessageLite* message,
37 const base::Closure& done) override; 37 const base::Closure& done) override;
38 38
39 private: 39 private:
40 enum class State { CONNECTING, OPEN, CLOSED }; 40 enum class State { CONNECTING, OPEN, CLOSED };
41 41
42 // webrtc::DataChannelObserver interface. 42 // webrtc::DataChannelObserver interface.
43 void OnStateChange() override; 43 void OnStateChange() override;
44 void OnMessage(const webrtc::DataBuffer& buffer) override; 44 void OnMessage(const webrtc::DataBuffer& buffer) override;
45 45
46 void OnConnected(); 46 void OnConnected();
47 47
48 void OnError(); 48 void OnClosed();
49 49
50 base::WeakPtr<WebrtcDataStreamAdapter> adapter_; 50 // |adapter_| owns channels while they are being connected.
51 WebrtcDataStreamAdapter* adapter_;
51 52
52 rtc::scoped_refptr<webrtc::DataChannelInterface> channel_; 53 rtc::scoped_refptr<webrtc::DataChannelInterface> channel_;
53 54
54 MessageReceivedCallback message_received_callback_; 55 EventHandler* event_handler_ = nullptr;
55 56
56 State state_ = State::CONNECTING; 57 State state_ = State::CONNECTING;
57 58
58 DISALLOW_COPY_AND_ASSIGN(Channel); 59 DISALLOW_COPY_AND_ASSIGN(Channel);
59 }; 60 };
60 61
61 WebrtcDataStreamAdapter::Channel::Channel( 62 WebrtcDataStreamAdapter::Channel::Channel(WebrtcDataStreamAdapter* adapter)
62 base::WeakPtr<WebrtcDataStreamAdapter> adapter)
63 : adapter_(adapter) {} 63 : adapter_(adapter) {}
64 64
65 WebrtcDataStreamAdapter::Channel::~Channel() { 65 WebrtcDataStreamAdapter::Channel::~Channel() {
66 if (channel_) { 66 if (channel_) {
67 channel_->UnregisterObserver(); 67 channel_->UnregisterObserver();
68 channel_->Close(); 68 channel_->Close();
69 } 69 }
70 } 70 }
71 71
72 void WebrtcDataStreamAdapter::Channel::Start( 72 void WebrtcDataStreamAdapter::Channel::Start(
73 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) { 73 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
74 DCHECK(!channel_); 74 DCHECK(!channel_);
75 75
76 channel_ = channel; 76 channel_ = channel;
77 channel_->RegisterObserver(this); 77 channel_->RegisterObserver(this);
78 78
79 if (channel_->state() == webrtc::DataChannelInterface::kOpen) { 79 if (channel_->state() == webrtc::DataChannelInterface::kOpen) {
80 OnConnected(); 80 OnConnected();
81 } else { 81 } else {
82 DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting); 82 DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting);
83 } 83 }
84 } 84 }
85 85
86 void WebrtcDataStreamAdapter::Channel::StartReceiving( 86 void WebrtcDataStreamAdapter::Channel::Start(EventHandler* event_handler) {
87 const MessageReceivedCallback& callback) { 87 DCHECK(!event_handler_);
88 DCHECK(message_received_callback_.is_null()); 88 DCHECK(event_handler);
89 DCHECK(!callback.is_null());
90 89
91 message_received_callback_ = callback; 90 event_handler_ = event_handler;
92 } 91 }
93 92
94 void WebrtcDataStreamAdapter::Channel::Send( 93 void WebrtcDataStreamAdapter::Channel::Send(
95 google::protobuf::MessageLite* message, 94 google::protobuf::MessageLite* message,
96 const base::Closure& done) { 95 const base::Closure& done) {
96 DCHECK(state_ == State::OPEN);
97
97 rtc::CopyOnWriteBuffer buffer; 98 rtc::CopyOnWriteBuffer buffer;
98 buffer.SetSize(message->ByteSize()); 99 buffer.SetSize(message->ByteSize());
99 message->SerializeWithCachedSizesToArray( 100 message->SerializeWithCachedSizesToArray(
100 reinterpret_cast<uint8_t*>(buffer.data())); 101 reinterpret_cast<uint8_t*>(buffer.data()));
101 webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */); 102 webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */);
102 if (!channel_->Send(data_buffer)) { 103 if (!channel_->Send(data_buffer)) {
103 OnError(); 104 LOG(ERROR) << "Send failed on data channel " << channel_->label();
105 channel_->Close();
104 return; 106 return;
105 } 107 }
106 108
107 if (!done.is_null()) 109 if (!done.is_null())
108 done.Run(); 110 done.Run();
109 } 111 }
110 112
111 void WebrtcDataStreamAdapter::Channel::OnStateChange() { 113 void WebrtcDataStreamAdapter::Channel::OnStateChange() {
112 switch (channel_->state()) { 114 switch (channel_->state()) {
113 case webrtc::DataChannelInterface::kOpen: 115 case webrtc::DataChannelInterface::kOpen:
114 OnConnected(); 116 OnConnected();
115 break; 117 break;
116 118
117 case webrtc::DataChannelInterface::kClosing: 119 case webrtc::DataChannelInterface::kClosing:
118 // Currently channels are not expected to be closed. 120 OnClosed();
119 OnError();
120 break; 121 break;
121 122
122 case webrtc::DataChannelInterface::kConnecting: 123 case webrtc::DataChannelInterface::kConnecting:
123 case webrtc::DataChannelInterface::kClosed: 124 case webrtc::DataChannelInterface::kClosed:
124 break; 125 break;
125 } 126 }
126 } 127 }
127 128
128 void WebrtcDataStreamAdapter::Channel::OnConnected() {
129 CHECK(state_ == State::CONNECTING);
130 state_ = State::OPEN;
131 adapter_->OnChannelConnected(this);
132 }
133
134 void WebrtcDataStreamAdapter::Channel::OnError() {
135 if (state_ == State::CLOSED)
136 return;
137
138 state_ = State::CLOSED;
139
140 // Notify the adapter about the error asychronously.
141 base::ThreadTaskRunnerHandle::Get()->PostTask(
142 FROM_HERE,
143 base::Bind(&WebrtcDataStreamAdapter::OnChannelError, adapter_));
144 }
145
146 void WebrtcDataStreamAdapter::Channel::OnMessage( 129 void WebrtcDataStreamAdapter::Channel::OnMessage(
147 const webrtc::DataBuffer& rtc_buffer) { 130 const webrtc::DataBuffer& rtc_buffer) {
131 if (state_ != State::OPEN) {
132 LOG(ERROR) << "Dropping a message received when the channel is not open.";
133 return;
134 }
135
148 std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer()); 136 std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer());
149 buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()), 137 buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()),
150 rtc_buffer.data.size()); 138 rtc_buffer.data.size());
151 buffer->Lock(); 139 buffer->Lock();
152 message_received_callback_.Run(std::move(buffer)); 140 event_handler_->OnMessageReceived(std::move(buffer));
141 }
142
143 void WebrtcDataStreamAdapter::Channel::OnConnected() {
144 DCHECK(state_ == State::CONNECTING);
145 state_ = State::OPEN;
146 WebrtcDataStreamAdapter* adapter = adapter_;
147 adapter_ = nullptr;
148 adapter->OnChannelConnected(this);
149 }
150
151 void WebrtcDataStreamAdapter::Channel::OnClosed() {
152 switch (state_) {
153 case State::CONNECTING:
154 state_ = State::CLOSED;
155 LOG(WARNING) << "Channel " << channel_->label()
156 << " was closed before it's connected.";
157 adapter_->OnChannelError();
158 return;
159
160 case State::OPEN:
161 state_ = State::CLOSED;
162 event_handler_->OnMessagePipeClosed();
163 return;
164
165 case State::CLOSED:
166 break;
167 }
153 } 168 }
154 169
155 struct WebrtcDataStreamAdapter::PendingChannel { 170 struct WebrtcDataStreamAdapter::PendingChannel {
156 PendingChannel() {}
157 PendingChannel(std::unique_ptr<Channel> channel, 171 PendingChannel(std::unique_ptr<Channel> channel,
158 const ChannelCreatedCallback& connected_callback) 172 const ChannelCreatedCallback& connected_callback)
159 : channel(std::move(channel)), connected_callback(connected_callback) {} 173 : channel(std::move(channel)), connected_callback(connected_callback) {}
160 PendingChannel(PendingChannel&& other) 174 PendingChannel(PendingChannel&& other)
161 : channel(std::move(other.channel)), 175 : channel(std::move(other.channel)),
162 connected_callback(std::move(other.connected_callback)) {} 176 connected_callback(std::move(other.connected_callback)) {}
163 PendingChannel& operator=(PendingChannel&& other) { 177 PendingChannel& operator=(PendingChannel&& other) {
164 channel = std::move(other.channel); 178 channel = std::move(other.channel);
165 connected_callback = std::move(other.connected_callback); 179 connected_callback = std::move(other.connected_callback);
166 return *this; 180 return *this;
167 } 181 }
168 182
169 std::unique_ptr<Channel> channel; 183 std::unique_ptr<Channel> channel;
170 ChannelCreatedCallback connected_callback; 184 ChannelCreatedCallback connected_callback;
171 }; 185 };
172 186
173 WebrtcDataStreamAdapter::WebrtcDataStreamAdapter( 187 WebrtcDataStreamAdapter::WebrtcDataStreamAdapter(
174 bool outgoing,
175 const ErrorCallback& error_callback) 188 const ErrorCallback& error_callback)
176 : outgoing_(outgoing), 189 : error_callback_(error_callback) {}
177 error_callback_(error_callback),
178 weak_factory_(this) {}
179 190
180 WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() { 191 WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() {
181 DCHECK(pending_channels_.empty()); 192 DCHECK(pending_channels_.empty());
182 } 193 }
183 194
184 void WebrtcDataStreamAdapter::Initialize( 195 void WebrtcDataStreamAdapter::Initialize(
185 rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection) { 196 rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection) {
186 peer_connection_ = peer_connection; 197 peer_connection_ = peer_connection;
187 } 198 }
188 199
189 void WebrtcDataStreamAdapter::OnIncomingDataChannel( 200 void WebrtcDataStreamAdapter::WrapIncomingDataChannel(
190 webrtc::DataChannelInterface* data_channel) { 201 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel,
191 DCHECK(!outgoing_); 202 const ChannelCreatedCallback& callback) {
192 203 AddPendingChannel(data_channel, callback);
193 auto it = pending_channels_.find(data_channel->label());
194 if (it == pending_channels_.end()) {
195 LOG(ERROR) << "Received unexpected data channel " << data_channel->label();
196 return;
197 }
198 it->second.channel->Start(data_channel);
199 } 204 }
200 205
201 void WebrtcDataStreamAdapter::CreateChannel( 206 void WebrtcDataStreamAdapter::CreateChannel(
202 const std::string& name, 207 const std::string& name,
203 const ChannelCreatedCallback& callback) { 208 const ChannelCreatedCallback& callback) {
204 DCHECK(peer_connection_); 209 webrtc::DataChannelInit config;
205 DCHECK(pending_channels_.find(name) == pending_channels_.end()); 210 config.reliable = true;
206 211 AddPendingChannel(peer_connection_->CreateDataChannel(name, &config),
207 Channel* channel = new Channel(weak_factory_.GetWeakPtr()); 212 callback);
208 pending_channels_[name] = PendingChannel(base::WrapUnique(channel), callback);
209
210 if (outgoing_) {
211 webrtc::DataChannelInit config;
212 config.reliable = true;
213 channel->Start(peer_connection_->CreateDataChannel(name, &config));
214 }
215 } 213 }
216 214
217 void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) { 215 void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) {
218 auto it = pending_channels_.find(name); 216 auto it = pending_channels_.find(name);
219 DCHECK(it != pending_channels_.end()); 217 DCHECK(it != pending_channels_.end());
220 pending_channels_.erase(it); 218 pending_channels_.erase(it);
221 } 219 }
222 220
221 void WebrtcDataStreamAdapter::AddPendingChannel(
222 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel,
223 const ChannelCreatedCallback& callback) {
224 DCHECK(peer_connection_);
225 DCHECK(pending_channels_.find(data_channel->label()) ==
226 pending_channels_.end());
227
228 Channel* channel = new Channel(this);
229 pending_channels_.insert(
230 std::make_pair(data_channel->label(),
231 PendingChannel(base::WrapUnique(channel), callback)));
232
233 channel->Start(data_channel);
234 }
235
223 void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) { 236 void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) {
224 auto it = pending_channels_.find(channel->name()); 237 auto it = pending_channels_.find(channel->name());
225 DCHECK(it != pending_channels_.end()); 238 DCHECK(it != pending_channels_.end());
239
226 PendingChannel pending_channel = std::move(it->second); 240 PendingChannel pending_channel = std::move(it->second);
227 pending_channels_.erase(it); 241 pending_channels_.erase(it);
228 242
229 // Once the channel is connected its ownership is passed to the 243 // Once the channel is connected its ownership is passed to the
230 // |connected_callback|. 244 // |connected_callback|.
231 pending_channel.connected_callback.Run(std::move(pending_channel.channel)); 245 pending_channel.connected_callback.Run(std::move(pending_channel.channel));
232 } 246 }
233 247
234 void WebrtcDataStreamAdapter::OnChannelError() { 248 void WebrtcDataStreamAdapter::OnChannelError() {
249 pending_channels_.clear();
235 error_callback_.Run(CHANNEL_CONNECTION_ERROR); 250 error_callback_.Run(CHANNEL_CONNECTION_ERROR);
236 } 251 }
237 252
238 } // namespace protocol 253 } // namespace protocol
239 } // namespace remoting 254 } // namespace remoting
OLDNEW
« no previous file with comments | « remoting/protocol/webrtc_data_stream_adapter.h ('k') | remoting/protocol/webrtc_transport.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698