Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(431)

Side by Side Diff: remoting/protocol/webrtc_data_stream_adapter.cc

Issue 2164163002: Simplify data channel creation logic in WebRTC-based protocol (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: nits Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « remoting/protocol/webrtc_data_stream_adapter.h ('k') | remoting/protocol/webrtc_transport.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/webrtc_data_stream_adapter.h" 5 #include "remoting/protocol/webrtc_data_stream_adapter.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
11 #include "base/callback_helpers.h" 11 #include "base/callback_helpers.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/macros.h" 13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h" 14 #include "base/memory/ptr_util.h"
15 #include "base/threading/thread_task_runner_handle.h" 15 #include "base/threading/thread_task_runner_handle.h"
16 #include "net/base/net_errors.h" 16 #include "net/base/net_errors.h"
17 #include "remoting/base/compound_buffer.h" 17 #include "remoting/base/compound_buffer.h"
18 #include "remoting/protocol/message_pipe.h" 18 #include "remoting/protocol/message_pipe.h"
19 #include "remoting/protocol/message_serialization.h" 19 #include "remoting/protocol/message_serialization.h"
20 20
21 namespace remoting { 21 namespace remoting {
22 namespace protocol { 22 namespace protocol {
23 23
24 class WebrtcDataStreamAdapter::Channel : public MessagePipe, 24 namespace {
25 public webrtc::DataChannelObserver { 25
26 class WebrtcDataChannel : public MessagePipe,
27 public webrtc::DataChannelObserver {
26 public: 28 public:
27 explicit Channel(WebrtcDataStreamAdapter* adapter); 29 explicit WebrtcDataChannel(
28 ~Channel() override; 30 rtc::scoped_refptr<webrtc::DataChannelInterface> channel);
29 31 ~WebrtcDataChannel() override;
30 void Start(rtc::scoped_refptr<webrtc::DataChannelInterface> channel);
31 32
32 std::string name() { return channel_->label(); } 33 std::string name() { return channel_->label(); }
33 34
34 // MessagePipe interface. 35 // MessagePipe interface.
35 void Start(EventHandler* event_handler) override; 36 void Start(EventHandler* event_handler) override;
36 void Send(google::protobuf::MessageLite* message, 37 void Send(google::protobuf::MessageLite* message,
37 const base::Closure& done) override; 38 const base::Closure& done) override;
38 39
39 private: 40 private:
40 enum class State { CONNECTING, OPEN, CLOSED }; 41 enum class State { CONNECTING, OPEN, CLOSED };
41 42
42 // webrtc::DataChannelObserver interface. 43 // webrtc::DataChannelObserver interface.
43 void OnStateChange() override; 44 void OnStateChange() override;
44 void OnMessage(const webrtc::DataBuffer& buffer) override; 45 void OnMessage(const webrtc::DataBuffer& buffer) override;
45 46
46 void OnConnected(); 47 void OnConnected();
47 48
48 void OnClosed(); 49 void OnClosed();
49 50
50 // |adapter_| owns channels while they are being connected.
51 WebrtcDataStreamAdapter* adapter_;
52
53 rtc::scoped_refptr<webrtc::DataChannelInterface> channel_; 51 rtc::scoped_refptr<webrtc::DataChannelInterface> channel_;
54 52
55 EventHandler* event_handler_ = nullptr; 53 EventHandler* event_handler_ = nullptr;
56 54
57 State state_ = State::CONNECTING; 55 State state_ = State::CONNECTING;
58 56
59 DISALLOW_COPY_AND_ASSIGN(Channel); 57 DISALLOW_COPY_AND_ASSIGN(WebrtcDataChannel);
60 }; 58 };
61 59
62 WebrtcDataStreamAdapter::Channel::Channel(WebrtcDataStreamAdapter* adapter) 60 WebrtcDataChannel::WebrtcDataChannel(
63 : adapter_(adapter) {} 61 rtc::scoped_refptr<webrtc::DataChannelInterface> channel)
62 : channel_(channel) {
63 channel_->RegisterObserver(this);
64 DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting);
65 }
64 66
65 WebrtcDataStreamAdapter::Channel::~Channel() { 67 WebrtcDataChannel::~WebrtcDataChannel() {
66 if (channel_) { 68 if (channel_) {
67 channel_->UnregisterObserver(); 69 channel_->UnregisterObserver();
68 channel_->Close(); 70 channel_->Close();
69 } 71 }
70 } 72 }
71 73
72 void WebrtcDataStreamAdapter::Channel::Start( 74 void WebrtcDataChannel::Start(EventHandler* event_handler) {
73 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
74 DCHECK(!channel_);
75
76 channel_ = channel;
77 channel_->RegisterObserver(this);
78
79 if (channel_->state() == webrtc::DataChannelInterface::kOpen) {
80 OnConnected();
81 } else {
82 DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting);
83 }
84 }
85
86 void WebrtcDataStreamAdapter::Channel::Start(EventHandler* event_handler) {
87 DCHECK(!event_handler_); 75 DCHECK(!event_handler_);
88 DCHECK(event_handler); 76 DCHECK(event_handler);
89 77
90 event_handler_ = event_handler; 78 event_handler_ = event_handler;
91 } 79 }
92 80
93 void WebrtcDataStreamAdapter::Channel::Send( 81 void WebrtcDataChannel::Send(google::protobuf::MessageLite* message,
94 google::protobuf::MessageLite* message, 82 const base::Closure& done) {
95 const base::Closure& done) {
96 DCHECK(state_ == State::OPEN); 83 DCHECK(state_ == State::OPEN);
97 84
98 rtc::CopyOnWriteBuffer buffer; 85 rtc::CopyOnWriteBuffer buffer;
99 buffer.SetSize(message->ByteSize()); 86 buffer.SetSize(message->ByteSize());
100 message->SerializeWithCachedSizesToArray( 87 message->SerializeWithCachedSizesToArray(
101 reinterpret_cast<uint8_t*>(buffer.data())); 88 reinterpret_cast<uint8_t*>(buffer.data()));
102 webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */); 89 webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */);
103 if (!channel_->Send(data_buffer)) { 90 if (!channel_->Send(data_buffer)) {
104 LOG(ERROR) << "Send failed on data channel " << channel_->label(); 91 LOG(ERROR) << "Send failed on data channel " << channel_->label();
105 channel_->Close(); 92 channel_->Close();
106 return; 93 return;
107 } 94 }
108 95
109 if (!done.is_null()) 96 if (!done.is_null())
110 done.Run(); 97 done.Run();
111 } 98 }
112 99
113 void WebrtcDataStreamAdapter::Channel::OnStateChange() { 100 void WebrtcDataChannel::OnStateChange() {
114 switch (channel_->state()) { 101 switch (channel_->state()) {
115 case webrtc::DataChannelInterface::kOpen: 102 case webrtc::DataChannelInterface::kOpen:
116 OnConnected(); 103 DCHECK(state_ == State::CONNECTING);
104 state_ = State::OPEN;
105 event_handler_->OnMessagePipeOpen();
117 break; 106 break;
118 107
119 case webrtc::DataChannelInterface::kClosing: 108 case webrtc::DataChannelInterface::kClosing:
120 OnClosed(); 109 if (state_ != State::CLOSED) {
110 state_ = State::CLOSED;
111 event_handler_->OnMessagePipeClosed();
112 }
121 break; 113 break;
122 114
123 case webrtc::DataChannelInterface::kConnecting: 115 case webrtc::DataChannelInterface::kConnecting:
124 case webrtc::DataChannelInterface::kClosed: 116 case webrtc::DataChannelInterface::kClosed:
125 break; 117 break;
126 } 118 }
127 } 119 }
128 120
129 void WebrtcDataStreamAdapter::Channel::OnMessage( 121 void WebrtcDataChannel::OnMessage(const webrtc::DataBuffer& rtc_buffer) {
130 const webrtc::DataBuffer& rtc_buffer) {
131 if (state_ != State::OPEN) { 122 if (state_ != State::OPEN) {
132 LOG(ERROR) << "Dropping a message received when the channel is not open."; 123 LOG(ERROR) << "Dropping a message received when the channel is not open.";
133 return; 124 return;
134 } 125 }
135 126
136 std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer()); 127 std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer());
137 buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()), 128 buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()),
138 rtc_buffer.data.size()); 129 rtc_buffer.data.size());
139 buffer->Lock(); 130 buffer->Lock();
140 event_handler_->OnMessageReceived(std::move(buffer)); 131 event_handler_->OnMessageReceived(std::move(buffer));
141 } 132 }
142 133
143 void WebrtcDataStreamAdapter::Channel::OnConnected() { 134 } // namespace
144 DCHECK(state_ == State::CONNECTING); 135
145 state_ = State::OPEN; 136 WebrtcDataStreamAdapter::WebrtcDataStreamAdapter(
146 WebrtcDataStreamAdapter* adapter = adapter_; 137 rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection)
147 adapter_ = nullptr; 138 : peer_connection_(peer_connection) {}
148 adapter->OnChannelConnected(this); 139 WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() {}
140
141 std::unique_ptr<MessagePipe> WebrtcDataStreamAdapter::CreateOutgoingChannel(
142 const std::string& name) {
143 webrtc::DataChannelInit config;
144 config.reliable = true;
145 return base::WrapUnique(new WebrtcDataChannel(
146 peer_connection_->CreateDataChannel(name, &config)));
149 } 147 }
150 148
151 void WebrtcDataStreamAdapter::Channel::OnClosed() { 149 std::unique_ptr<MessagePipe> WebrtcDataStreamAdapter::WrapIncomingDataChannel(
152 switch (state_) { 150 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel) {
153 case State::CONNECTING: 151 return base::WrapUnique(new WebrtcDataChannel(data_channel));
154 state_ = State::CLOSED;
155 LOG(WARNING) << "Channel " << channel_->label()
156 << " was closed before it's connected.";
157 adapter_->OnChannelError();
158 return;
159
160 case State::OPEN:
161 state_ = State::CLOSED;
162 event_handler_->OnMessagePipeClosed();
163 return;
164
165 case State::CLOSED:
166 break;
167 }
168 }
169
170 struct WebrtcDataStreamAdapter::PendingChannel {
171 PendingChannel(std::unique_ptr<Channel> channel,
172 const ChannelCreatedCallback& connected_callback)
173 : channel(std::move(channel)), connected_callback(connected_callback) {}
174 PendingChannel(PendingChannel&& other)
175 : channel(std::move(other.channel)),
176 connected_callback(std::move(other.connected_callback)) {}
177 PendingChannel& operator=(PendingChannel&& other) {
178 channel = std::move(other.channel);
179 connected_callback = std::move(other.connected_callback);
180 return *this;
181 }
182
183 std::unique_ptr<Channel> channel;
184 ChannelCreatedCallback connected_callback;
185 };
186
187 WebrtcDataStreamAdapter::WebrtcDataStreamAdapter(
188 const ErrorCallback& error_callback)
189 : error_callback_(error_callback) {}
190
191 WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() {
192 DCHECK(pending_channels_.empty());
193 }
194
195 void WebrtcDataStreamAdapter::Initialize(
196 rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection) {
197 peer_connection_ = peer_connection;
198 }
199
200 void WebrtcDataStreamAdapter::WrapIncomingDataChannel(
201 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel,
202 const ChannelCreatedCallback& callback) {
203 AddPendingChannel(data_channel, callback);
204 }
205
206 void WebrtcDataStreamAdapter::CreateChannel(
207 const std::string& name,
208 const ChannelCreatedCallback& callback) {
209 webrtc::DataChannelInit config;
210 config.reliable = true;
211 AddPendingChannel(peer_connection_->CreateDataChannel(name, &config),
212 callback);
213 }
214
215 void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) {
216 auto it = pending_channels_.find(name);
217 DCHECK(it != pending_channels_.end());
218 pending_channels_.erase(it);
219 }
220
221 void WebrtcDataStreamAdapter::AddPendingChannel(
222 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel,
223 const ChannelCreatedCallback& callback) {
224 DCHECK(peer_connection_);
225 DCHECK(pending_channels_.find(data_channel->label()) ==
226 pending_channels_.end());
227
228 Channel* channel = new Channel(this);
229 pending_channels_.insert(
230 std::make_pair(data_channel->label(),
231 PendingChannel(base::WrapUnique(channel), callback)));
232
233 channel->Start(data_channel);
234 }
235
236 void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) {
237 auto it = pending_channels_.find(channel->name());
238 DCHECK(it != pending_channels_.end());
239
240 PendingChannel pending_channel = std::move(it->second);
241 pending_channels_.erase(it);
242
243 // Once the channel is connected its ownership is passed to the
244 // |connected_callback|.
245 pending_channel.connected_callback.Run(std::move(pending_channel.channel));
246 }
247
248 void WebrtcDataStreamAdapter::OnChannelError() {
249 pending_channels_.clear();
250 error_callback_.Run(CHANNEL_CONNECTION_ERROR);
251 } 152 }
252 153
253 } // namespace protocol 154 } // namespace protocol
254 } // namespace remoting 155 } // namespace remoting
OLDNEW
« no previous file with comments | « remoting/protocol/webrtc_data_stream_adapter.h ('k') | remoting/protocol/webrtc_transport.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698