Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(301)

Side by Side Diff: remoting/protocol/webrtc_data_stream_adapter.cc

Issue 2146213002: Add support for dynamic channels in WebrtcTransport. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/webrtc_data_stream_adapter.h" 5 #include "remoting/protocol/webrtc_data_stream_adapter.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
(...skipping 14 matching lines...) Expand all
25 public webrtc::DataChannelObserver { 25 public webrtc::DataChannelObserver {
26 public: 26 public:
27 explicit Channel(base::WeakPtr<WebrtcDataStreamAdapter> adapter); 27 explicit Channel(base::WeakPtr<WebrtcDataStreamAdapter> adapter);
28 ~Channel() override; 28 ~Channel() override;
29 29
30 void Start(rtc::scoped_refptr<webrtc::DataChannelInterface> channel); 30 void Start(rtc::scoped_refptr<webrtc::DataChannelInterface> channel);
31 31
32 std::string name() { return channel_->label(); } 32 std::string name() { return channel_->label(); }
33 33
34 // MessagePipe interface. 34 // MessagePipe interface.
35 void StartReceiving(const MessageReceivedCallback& callback) override; 35 void Start(EventHandler* event_handler) override;
36 void Send(google::protobuf::MessageLite* message, 36 void Send(google::protobuf::MessageLite* message,
37 const base::Closure& done) override; 37 const base::Closure& done) override;
38 38
39 private: 39 private:
40 enum class State { CONNECTING, OPEN, CLOSED }; 40 enum class State { CONNECTING, OPEN, CLOSED };
41 41
42 // webrtc::DataChannelObserver interface. 42 // webrtc::DataChannelObserver interface.
43 void OnStateChange() override; 43 void OnStateChange() override;
44 void OnMessage(const webrtc::DataBuffer& buffer) override; 44 void OnMessage(const webrtc::DataBuffer& buffer) override;
45 45
46 void OnConnected(); 46 void OnConnected();
47 47
48 void OnError(); 48 void OnClosed();
49 void NotifyClosed();
49 50
50 base::WeakPtr<WebrtcDataStreamAdapter> adapter_; 51 base::WeakPtr<WebrtcDataStreamAdapter> adapter_;
51 52
52 rtc::scoped_refptr<webrtc::DataChannelInterface> channel_; 53 rtc::scoped_refptr<webrtc::DataChannelInterface> channel_;
53 54
54 MessageReceivedCallback message_received_callback_; 55 EventHandler* event_handler_ = nullptr;
55 56
56 State state_ = State::CONNECTING; 57 State state_ = State::CONNECTING;
57 58
59 base::WeakPtrFactory<Channel> weak_factory_;
60
58 DISALLOW_COPY_AND_ASSIGN(Channel); 61 DISALLOW_COPY_AND_ASSIGN(Channel);
59 }; 62 };
60 63
61 WebrtcDataStreamAdapter::Channel::Channel( 64 WebrtcDataStreamAdapter::Channel::Channel(
62 base::WeakPtr<WebrtcDataStreamAdapter> adapter) 65 base::WeakPtr<WebrtcDataStreamAdapter> adapter)
63 : adapter_(adapter) {} 66 : adapter_(adapter), weak_factory_(this) {}
64 67
65 WebrtcDataStreamAdapter::Channel::~Channel() { 68 WebrtcDataStreamAdapter::Channel::~Channel() {
66 if (channel_) { 69 if (channel_) {
67 channel_->UnregisterObserver(); 70 channel_->UnregisterObserver();
68 channel_->Close(); 71 channel_->Close();
69 } 72 }
70 } 73 }
71 74
72 void WebrtcDataStreamAdapter::Channel::Start( 75 void WebrtcDataStreamAdapter::Channel::Start(
73 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) { 76 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
74 DCHECK(!channel_); 77 DCHECK(!channel_);
75 78
76 channel_ = channel; 79 channel_ = channel;
77 channel_->RegisterObserver(this); 80 channel_->RegisterObserver(this);
78 81
79 if (channel_->state() == webrtc::DataChannelInterface::kOpen) { 82 if (channel_->state() == webrtc::DataChannelInterface::kOpen) {
80 OnConnected(); 83 OnConnected();
81 } else { 84 } else {
82 DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting); 85 DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting);
83 } 86 }
84 } 87 }
85 88
86 void WebrtcDataStreamAdapter::Channel::StartReceiving( 89 void WebrtcDataStreamAdapter::Channel::Start(EventHandler* event_handler) {
87 const MessageReceivedCallback& callback) { 90 DCHECK(!event_handler_);
88 DCHECK(message_received_callback_.is_null()); 91 DCHECK(event_handler);
89 DCHECK(!callback.is_null());
90 92
91 message_received_callback_ = callback; 93 event_handler_ = event_handler;
92 } 94 }
93 95
94 void WebrtcDataStreamAdapter::Channel::Send( 96 void WebrtcDataStreamAdapter::Channel::Send(
95 google::protobuf::MessageLite* message, 97 google::protobuf::MessageLite* message,
96 const base::Closure& done) { 98 const base::Closure& done) {
99 if(state_ != State::OPEN)
100 return;
Jamie 2016/07/19 18:24:47 Should we log an error here? Silently dropping the
Sergey Ulanov 2016/07/19 23:38:44 Replaced this with a DCHECK(). Also changed OnMess
101
97 rtc::CopyOnWriteBuffer buffer; 102 rtc::CopyOnWriteBuffer buffer;
98 buffer.SetSize(message->ByteSize()); 103 buffer.SetSize(message->ByteSize());
99 message->SerializeWithCachedSizesToArray( 104 message->SerializeWithCachedSizesToArray(
100 reinterpret_cast<uint8_t*>(buffer.data())); 105 reinterpret_cast<uint8_t*>(buffer.data()));
101 webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */); 106 webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */);
102 if (!channel_->Send(data_buffer)) { 107 if (!channel_->Send(data_buffer)) {
103 OnError(); 108 LOG(ERROR) << "Send failed on data channel " << channel_->label();
109 channel_->Close();
104 return; 110 return;
105 } 111 }
106 112
107 if (!done.is_null()) 113 if (!done.is_null())
108 done.Run(); 114 done.Run();
109 } 115 }
110 116
111 void WebrtcDataStreamAdapter::Channel::OnStateChange() { 117 void WebrtcDataStreamAdapter::Channel::OnStateChange() {
112 switch (channel_->state()) { 118 switch (channel_->state()) {
113 case webrtc::DataChannelInterface::kOpen: 119 case webrtc::DataChannelInterface::kOpen:
114 OnConnected(); 120 OnConnected();
115 break; 121 break;
116 122
117 case webrtc::DataChannelInterface::kClosing: 123 case webrtc::DataChannelInterface::kClosing:
118 // Currently channels are not expected to be closed. 124 // Currently channels are not expected to be closed.
Jamie 2016/07/19 18:24:47 Log an error in this case?
Sergey Ulanov 2016/07/19 23:38:43 Removed the comment. This class now does support c
119 OnError(); 125 OnClosed();
120 break; 126 break;
121 127
122 case webrtc::DataChannelInterface::kConnecting: 128 case webrtc::DataChannelInterface::kConnecting:
123 case webrtc::DataChannelInterface::kClosed: 129 case webrtc::DataChannelInterface::kClosed:
124 break; 130 break;
125 } 131 }
126 } 132 }
127 133
128 void WebrtcDataStreamAdapter::Channel::OnConnected() {
129 CHECK(state_ == State::CONNECTING);
130 state_ = State::OPEN;
131 adapter_->OnChannelConnected(this);
132 }
133
134 void WebrtcDataStreamAdapter::Channel::OnError() {
135 if (state_ == State::CLOSED)
136 return;
137
138 state_ = State::CLOSED;
139
140 // Notify the adapter about the error asychronously.
141 base::ThreadTaskRunnerHandle::Get()->PostTask(
142 FROM_HERE,
143 base::Bind(&WebrtcDataStreamAdapter::OnChannelError, adapter_));
144 }
145
146 void WebrtcDataStreamAdapter::Channel::OnMessage( 134 void WebrtcDataStreamAdapter::Channel::OnMessage(
147 const webrtc::DataBuffer& rtc_buffer) { 135 const webrtc::DataBuffer& rtc_buffer) {
148 std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer()); 136 std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer());
149 buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()), 137 buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()),
150 rtc_buffer.data.size()); 138 rtc_buffer.data.size());
151 buffer->Lock(); 139 buffer->Lock();
152 message_received_callback_.Run(std::move(buffer)); 140 event_handler_->OnMessageReceived(std::move(buffer));
141 }
142
143 void WebrtcDataStreamAdapter::Channel::OnConnected() {
144 DCHECK(state_ == State::CONNECTING);
145 state_ = State::OPEN;
146 adapter_->OnChannelConnected(this);
147 }
148
149 void WebrtcDataStreamAdapter::Channel::OnClosed() {
150 switch (state_) {
151 case State::CONNECTING:
Jamie 2016/07/19 18:24:47 You're not setting the state to CLOSED in this bra
Sergey Ulanov 2016/07/19 23:38:44 No. Fixed now.
152 LOG(WARNING) << "Channel " << channel_->label()
153 << " was closed before it's connected.";
154 // Notify the adapter about the error asynchronously.
155 base::ThreadTaskRunnerHandle::Get()->PostTask(
156 FROM_HERE,
157 base::Bind(&WebrtcDataStreamAdapter::OnChannelError, adapter_));
158 return;
159
160 case State::OPEN:
161 state_ = State::CLOSED;
162 base::ThreadTaskRunnerHandle::Get()->PostTask(
163 FROM_HERE,
164 base::Bind(&Channel::NotifyClosed, weak_factory_.GetWeakPtr()));
165 return;
166
167 case State::CLOSED:
168 break;
169 }
170 }
171
172 void WebrtcDataStreamAdapter::Channel::NotifyClosed() {
173 DCHECK(state_ == State::CLOSED);
174 event_handler_->OnMessagePipeClosed();
153 } 175 }
154 176
155 struct WebrtcDataStreamAdapter::PendingChannel { 177 struct WebrtcDataStreamAdapter::PendingChannel {
156 PendingChannel() {}
157 PendingChannel(std::unique_ptr<Channel> channel, 178 PendingChannel(std::unique_ptr<Channel> channel,
158 const ChannelCreatedCallback& connected_callback) 179 const ChannelCreatedCallback& connected_callback)
159 : channel(std::move(channel)), connected_callback(connected_callback) {} 180 : channel(std::move(channel)), connected_callback(connected_callback) {}
160 PendingChannel(PendingChannel&& other) 181 PendingChannel(PendingChannel&& other)
161 : channel(std::move(other.channel)), 182 : channel(std::move(other.channel)),
162 connected_callback(std::move(other.connected_callback)) {} 183 connected_callback(std::move(other.connected_callback)) {}
163 PendingChannel& operator=(PendingChannel&& other) { 184 PendingChannel& operator=(PendingChannel&& other) {
164 channel = std::move(other.channel); 185 channel = std::move(other.channel);
165 connected_callback = std::move(other.connected_callback); 186 connected_callback = std::move(other.connected_callback);
166 return *this; 187 return *this;
167 } 188 }
168 189
169 std::unique_ptr<Channel> channel; 190 std::unique_ptr<Channel> channel;
170 ChannelCreatedCallback connected_callback; 191 ChannelCreatedCallback connected_callback;
171 }; 192 };
172 193
173 WebrtcDataStreamAdapter::WebrtcDataStreamAdapter( 194 WebrtcDataStreamAdapter::WebrtcDataStreamAdapter(
174 bool outgoing,
175 const ErrorCallback& error_callback) 195 const ErrorCallback& error_callback)
176 : outgoing_(outgoing), 196 : error_callback_(error_callback), weak_factory_(this) {}
177 error_callback_(error_callback),
178 weak_factory_(this) {}
179 197
180 WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() { 198 WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() {
181 DCHECK(pending_channels_.empty()); 199 DCHECK(pending_channels_.empty());
182 } 200 }
183 201
184 void WebrtcDataStreamAdapter::Initialize( 202 void WebrtcDataStreamAdapter::Initialize(
185 rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection) { 203 rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection) {
186 peer_connection_ = peer_connection; 204 peer_connection_ = peer_connection;
187 } 205 }
188 206
189 void WebrtcDataStreamAdapter::OnIncomingDataChannel( 207 void WebrtcDataStreamAdapter::WrapIncomingDataChannel(
190 webrtc::DataChannelInterface* data_channel) { 208 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel,
191 DCHECK(!outgoing_); 209 const ChannelCreatedCallback& callback) {
192 210 AddPendingChannel(data_channel, callback);
193 auto it = pending_channels_.find(data_channel->label());
194 if (it == pending_channels_.end()) {
195 LOG(ERROR) << "Received unexpected data channel " << data_channel->label();
196 return;
197 }
198 it->second.channel->Start(data_channel);
199 } 211 }
200 212
201 void WebrtcDataStreamAdapter::CreateChannel( 213 void WebrtcDataStreamAdapter::CreateChannel(
202 const std::string& name, 214 const std::string& name,
203 const ChannelCreatedCallback& callback) { 215 const ChannelCreatedCallback& callback) {
204 DCHECK(peer_connection_); 216 webrtc::DataChannelInit config;
205 DCHECK(pending_channels_.find(name) == pending_channels_.end()); 217 config.reliable = true;
206 218 AddPendingChannel(peer_connection_->CreateDataChannel(name, &config),
207 Channel* channel = new Channel(weak_factory_.GetWeakPtr()); 219 callback);
208 pending_channels_[name] = PendingChannel(base::WrapUnique(channel), callback);
209
210 if (outgoing_) {
211 webrtc::DataChannelInit config;
212 config.reliable = true;
213 channel->Start(peer_connection_->CreateDataChannel(name, &config));
214 }
215 } 220 }
216 221
217 void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) { 222 void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) {
218 auto it = pending_channels_.find(name); 223 auto it = pending_channels_.find(name);
219 DCHECK(it != pending_channels_.end()); 224 DCHECK(it != pending_channels_.end());
220 pending_channels_.erase(it); 225 pending_channels_.erase(it);
221 } 226 }
222 227
228 void WebrtcDataStreamAdapter::AddPendingChannel(
229 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel,
230 const ChannelCreatedCallback& callback) {
231 DCHECK(peer_connection_);
232 DCHECK(pending_channels_.find(data_channel->label()) ==
233 pending_channels_.end());
234
235 Channel* channel = new Channel(weak_factory_.GetWeakPtr());
236 pending_channels_.insert(
237 std::make_pair(data_channel->label(),
238 PendingChannel(base::WrapUnique(channel), callback)));
239
240 channel->Start(data_channel);
241 }
242
223 void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) { 243 void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) {
224 auto it = pending_channels_.find(channel->name()); 244 auto it = pending_channels_.find(channel->name());
225 DCHECK(it != pending_channels_.end()); 245 DCHECK(it != pending_channels_.end());
246
226 PendingChannel pending_channel = std::move(it->second); 247 PendingChannel pending_channel = std::move(it->second);
227 pending_channels_.erase(it); 248 pending_channels_.erase(it);
228 249
229 // Once the channel is connected its ownership is passed to the 250 // Once the channel is connected its ownership is passed to the
230 // |connected_callback|. 251 // |connected_callback|.
231 pending_channel.connected_callback.Run(std::move(pending_channel.channel)); 252 pending_channel.connected_callback.Run(std::move(pending_channel.channel));
232 } 253 }
233 254
234 void WebrtcDataStreamAdapter::OnChannelError() { 255 void WebrtcDataStreamAdapter::OnChannelError() {
235 error_callback_.Run(CHANNEL_CONNECTION_ERROR); 256 error_callback_.Run(CHANNEL_CONNECTION_ERROR);
236 } 257 }
237 258
238 } // namespace protocol 259 } // namespace protocol
239 } // namespace remoting 260 } // namespace remoting
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698