OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "remoting/protocol/webrtc_data_stream_adapter.h" | 5 #include "remoting/protocol/webrtc_data_stream_adapter.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/callback.h" | 10 #include "base/callback.h" |
11 #include "base/callback_helpers.h" | 11 #include "base/callback_helpers.h" |
12 #include "base/location.h" | 12 #include "base/location.h" |
13 #include "base/macros.h" | 13 #include "base/macros.h" |
14 #include "base/memory/ptr_util.h" | 14 #include "base/memory/ptr_util.h" |
15 #include "base/threading/thread_task_runner_handle.h" | 15 #include "base/threading/thread_task_runner_handle.h" |
16 #include "net/base/net_errors.h" | 16 #include "net/base/net_errors.h" |
17 #include "remoting/base/compound_buffer.h" | 17 #include "remoting/base/compound_buffer.h" |
18 #include "remoting/protocol/message_pipe.h" | 18 #include "remoting/protocol/message_pipe.h" |
19 #include "remoting/protocol/message_serialization.h" | 19 #include "remoting/protocol/message_serialization.h" |
20 | 20 |
21 namespace remoting { | 21 namespace remoting { |
22 namespace protocol { | 22 namespace protocol { |
23 | 23 |
24 class WebrtcDataStreamAdapter::Channel : public MessagePipe, | 24 class WebrtcDataStreamAdapter::Channel : public MessagePipe, |
25 public webrtc::DataChannelObserver { | 25 public webrtc::DataChannelObserver { |
26 public: | 26 public: |
27 explicit Channel(base::WeakPtr<WebrtcDataStreamAdapter> adapter); | 27 explicit Channel(WebrtcDataStreamAdapter* adapter); |
28 ~Channel() override; | 28 ~Channel() override; |
29 | 29 |
30 void Start(rtc::scoped_refptr<webrtc::DataChannelInterface> channel); | 30 void Start(rtc::scoped_refptr<webrtc::DataChannelInterface> channel); |
31 | 31 |
32 std::string name() { return channel_->label(); } | 32 std::string name() { return channel_->label(); } |
33 | 33 |
34 // MessagePipe interface. | 34 // MessagePipe interface. |
35 void StartReceiving(const MessageReceivedCallback& callback) override; | 35 void Start(EventHandler* event_handler) override; |
36 void Send(google::protobuf::MessageLite* message, | 36 void Send(google::protobuf::MessageLite* message, |
37 const base::Closure& done) override; | 37 const base::Closure& done) override; |
38 | 38 |
39 private: | 39 private: |
40 enum class State { CONNECTING, OPEN, CLOSED }; | 40 enum class State { CONNECTING, OPEN, CLOSED }; |
41 | 41 |
42 // webrtc::DataChannelObserver interface. | 42 // webrtc::DataChannelObserver interface. |
43 void OnStateChange() override; | 43 void OnStateChange() override; |
44 void OnMessage(const webrtc::DataBuffer& buffer) override; | 44 void OnMessage(const webrtc::DataBuffer& buffer) override; |
45 | 45 |
46 void OnConnected(); | 46 void OnConnected(); |
47 | 47 |
48 void OnError(); | 48 void OnClosed(); |
49 | 49 |
50 base::WeakPtr<WebrtcDataStreamAdapter> adapter_; | 50 // |adapter_| owns channels while they are being connected. |
| 51 WebrtcDataStreamAdapter* adapter_; |
51 | 52 |
52 rtc::scoped_refptr<webrtc::DataChannelInterface> channel_; | 53 rtc::scoped_refptr<webrtc::DataChannelInterface> channel_; |
53 | 54 |
54 MessageReceivedCallback message_received_callback_; | 55 EventHandler* event_handler_ = nullptr; |
55 | 56 |
56 State state_ = State::CONNECTING; | 57 State state_ = State::CONNECTING; |
57 | 58 |
58 DISALLOW_COPY_AND_ASSIGN(Channel); | 59 DISALLOW_COPY_AND_ASSIGN(Channel); |
59 }; | 60 }; |
60 | 61 |
61 WebrtcDataStreamAdapter::Channel::Channel( | 62 WebrtcDataStreamAdapter::Channel::Channel(WebrtcDataStreamAdapter* adapter) |
62 base::WeakPtr<WebrtcDataStreamAdapter> adapter) | |
63 : adapter_(adapter) {} | 63 : adapter_(adapter) {} |
64 | 64 |
65 WebrtcDataStreamAdapter::Channel::~Channel() { | 65 WebrtcDataStreamAdapter::Channel::~Channel() { |
66 if (channel_) { | 66 if (channel_) { |
67 channel_->UnregisterObserver(); | 67 channel_->UnregisterObserver(); |
68 channel_->Close(); | 68 channel_->Close(); |
69 } | 69 } |
70 } | 70 } |
71 | 71 |
72 void WebrtcDataStreamAdapter::Channel::Start( | 72 void WebrtcDataStreamAdapter::Channel::Start( |
73 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) { | 73 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) { |
74 DCHECK(!channel_); | 74 DCHECK(!channel_); |
75 | 75 |
76 channel_ = channel; | 76 channel_ = channel; |
77 channel_->RegisterObserver(this); | 77 channel_->RegisterObserver(this); |
78 | 78 |
79 if (channel_->state() == webrtc::DataChannelInterface::kOpen) { | 79 if (channel_->state() == webrtc::DataChannelInterface::kOpen) { |
80 OnConnected(); | 80 OnConnected(); |
81 } else { | 81 } else { |
82 DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting); | 82 DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting); |
83 } | 83 } |
84 } | 84 } |
85 | 85 |
86 void WebrtcDataStreamAdapter::Channel::StartReceiving( | 86 void WebrtcDataStreamAdapter::Channel::Start(EventHandler* event_handler) { |
87 const MessageReceivedCallback& callback) { | 87 DCHECK(!event_handler_); |
88 DCHECK(message_received_callback_.is_null()); | 88 DCHECK(event_handler); |
89 DCHECK(!callback.is_null()); | |
90 | 89 |
91 message_received_callback_ = callback; | 90 event_handler_ = event_handler; |
92 } | 91 } |
93 | 92 |
94 void WebrtcDataStreamAdapter::Channel::Send( | 93 void WebrtcDataStreamAdapter::Channel::Send( |
95 google::protobuf::MessageLite* message, | 94 google::protobuf::MessageLite* message, |
96 const base::Closure& done) { | 95 const base::Closure& done) { |
| 96 DCHECK(state_ == State::OPEN); |
| 97 |
97 rtc::CopyOnWriteBuffer buffer; | 98 rtc::CopyOnWriteBuffer buffer; |
98 buffer.SetSize(message->ByteSize()); | 99 buffer.SetSize(message->ByteSize()); |
99 message->SerializeWithCachedSizesToArray( | 100 message->SerializeWithCachedSizesToArray( |
100 reinterpret_cast<uint8_t*>(buffer.data())); | 101 reinterpret_cast<uint8_t*>(buffer.data())); |
101 webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */); | 102 webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */); |
102 if (!channel_->Send(data_buffer)) { | 103 if (!channel_->Send(data_buffer)) { |
103 OnError(); | 104 LOG(ERROR) << "Send failed on data channel " << channel_->label(); |
| 105 channel_->Close(); |
104 return; | 106 return; |
105 } | 107 } |
106 | 108 |
107 if (!done.is_null()) | 109 if (!done.is_null()) |
108 done.Run(); | 110 done.Run(); |
109 } | 111 } |
110 | 112 |
111 void WebrtcDataStreamAdapter::Channel::OnStateChange() { | 113 void WebrtcDataStreamAdapter::Channel::OnStateChange() { |
112 switch (channel_->state()) { | 114 switch (channel_->state()) { |
113 case webrtc::DataChannelInterface::kOpen: | 115 case webrtc::DataChannelInterface::kOpen: |
114 OnConnected(); | 116 OnConnected(); |
115 break; | 117 break; |
116 | 118 |
117 case webrtc::DataChannelInterface::kClosing: | 119 case webrtc::DataChannelInterface::kClosing: |
118 // Currently channels are not expected to be closed. | 120 OnClosed(); |
119 OnError(); | |
120 break; | 121 break; |
121 | 122 |
122 case webrtc::DataChannelInterface::kConnecting: | 123 case webrtc::DataChannelInterface::kConnecting: |
123 case webrtc::DataChannelInterface::kClosed: | 124 case webrtc::DataChannelInterface::kClosed: |
124 break; | 125 break; |
125 } | 126 } |
126 } | 127 } |
127 | 128 |
128 void WebrtcDataStreamAdapter::Channel::OnConnected() { | |
129 CHECK(state_ == State::CONNECTING); | |
130 state_ = State::OPEN; | |
131 adapter_->OnChannelConnected(this); | |
132 } | |
133 | |
134 void WebrtcDataStreamAdapter::Channel::OnError() { | |
135 if (state_ == State::CLOSED) | |
136 return; | |
137 | |
138 state_ = State::CLOSED; | |
139 | |
140 // Notify the adapter about the error asychronously. | |
141 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
142 FROM_HERE, | |
143 base::Bind(&WebrtcDataStreamAdapter::OnChannelError, adapter_)); | |
144 } | |
145 | |
146 void WebrtcDataStreamAdapter::Channel::OnMessage( | 129 void WebrtcDataStreamAdapter::Channel::OnMessage( |
147 const webrtc::DataBuffer& rtc_buffer) { | 130 const webrtc::DataBuffer& rtc_buffer) { |
| 131 if (state_ != State::OPEN) { |
| 132 LOG(ERROR) << "Dropping a message received when the channel is not open."; |
| 133 return; |
| 134 } |
| 135 |
148 std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer()); | 136 std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer()); |
149 buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()), | 137 buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()), |
150 rtc_buffer.data.size()); | 138 rtc_buffer.data.size()); |
151 buffer->Lock(); | 139 buffer->Lock(); |
152 message_received_callback_.Run(std::move(buffer)); | 140 event_handler_->OnMessageReceived(std::move(buffer)); |
| 141 } |
| 142 |
| 143 void WebrtcDataStreamAdapter::Channel::OnConnected() { |
| 144 DCHECK(state_ == State::CONNECTING); |
| 145 state_ = State::OPEN; |
| 146 WebrtcDataStreamAdapter* adapter = adapter_; |
| 147 adapter_ = nullptr; |
| 148 adapter->OnChannelConnected(this); |
| 149 } |
| 150 |
| 151 void WebrtcDataStreamAdapter::Channel::OnClosed() { |
| 152 switch (state_) { |
| 153 case State::CONNECTING: |
| 154 state_ = State::CLOSED; |
| 155 LOG(WARNING) << "Channel " << channel_->label() |
| 156 << " was closed before it's connected."; |
| 157 adapter_->OnChannelError(); |
| 158 return; |
| 159 |
| 160 case State::OPEN: |
| 161 state_ = State::CLOSED; |
| 162 event_handler_->OnMessagePipeClosed(); |
| 163 return; |
| 164 |
| 165 case State::CLOSED: |
| 166 break; |
| 167 } |
153 } | 168 } |
154 | 169 |
155 struct WebrtcDataStreamAdapter::PendingChannel { | 170 struct WebrtcDataStreamAdapter::PendingChannel { |
156 PendingChannel() {} | |
157 PendingChannel(std::unique_ptr<Channel> channel, | 171 PendingChannel(std::unique_ptr<Channel> channel, |
158 const ChannelCreatedCallback& connected_callback) | 172 const ChannelCreatedCallback& connected_callback) |
159 : channel(std::move(channel)), connected_callback(connected_callback) {} | 173 : channel(std::move(channel)), connected_callback(connected_callback) {} |
160 PendingChannel(PendingChannel&& other) | 174 PendingChannel(PendingChannel&& other) |
161 : channel(std::move(other.channel)), | 175 : channel(std::move(other.channel)), |
162 connected_callback(std::move(other.connected_callback)) {} | 176 connected_callback(std::move(other.connected_callback)) {} |
163 PendingChannel& operator=(PendingChannel&& other) { | 177 PendingChannel& operator=(PendingChannel&& other) { |
164 channel = std::move(other.channel); | 178 channel = std::move(other.channel); |
165 connected_callback = std::move(other.connected_callback); | 179 connected_callback = std::move(other.connected_callback); |
166 return *this; | 180 return *this; |
167 } | 181 } |
168 | 182 |
169 std::unique_ptr<Channel> channel; | 183 std::unique_ptr<Channel> channel; |
170 ChannelCreatedCallback connected_callback; | 184 ChannelCreatedCallback connected_callback; |
171 }; | 185 }; |
172 | 186 |
173 WebrtcDataStreamAdapter::WebrtcDataStreamAdapter( | 187 WebrtcDataStreamAdapter::WebrtcDataStreamAdapter( |
174 bool outgoing, | |
175 const ErrorCallback& error_callback) | 188 const ErrorCallback& error_callback) |
176 : outgoing_(outgoing), | 189 : error_callback_(error_callback) {} |
177 error_callback_(error_callback), | |
178 weak_factory_(this) {} | |
179 | 190 |
180 WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() { | 191 WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() { |
181 DCHECK(pending_channels_.empty()); | 192 DCHECK(pending_channels_.empty()); |
182 } | 193 } |
183 | 194 |
184 void WebrtcDataStreamAdapter::Initialize( | 195 void WebrtcDataStreamAdapter::Initialize( |
185 rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection) { | 196 rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection) { |
186 peer_connection_ = peer_connection; | 197 peer_connection_ = peer_connection; |
187 } | 198 } |
188 | 199 |
189 void WebrtcDataStreamAdapter::OnIncomingDataChannel( | 200 void WebrtcDataStreamAdapter::WrapIncomingDataChannel( |
190 webrtc::DataChannelInterface* data_channel) { | 201 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel, |
191 DCHECK(!outgoing_); | 202 const ChannelCreatedCallback& callback) { |
192 | 203 AddPendingChannel(data_channel, callback); |
193 auto it = pending_channels_.find(data_channel->label()); | |
194 if (it == pending_channels_.end()) { | |
195 LOG(ERROR) << "Received unexpected data channel " << data_channel->label(); | |
196 return; | |
197 } | |
198 it->second.channel->Start(data_channel); | |
199 } | 204 } |
200 | 205 |
201 void WebrtcDataStreamAdapter::CreateChannel( | 206 void WebrtcDataStreamAdapter::CreateChannel( |
202 const std::string& name, | 207 const std::string& name, |
203 const ChannelCreatedCallback& callback) { | 208 const ChannelCreatedCallback& callback) { |
204 DCHECK(peer_connection_); | 209 webrtc::DataChannelInit config; |
205 DCHECK(pending_channels_.find(name) == pending_channels_.end()); | 210 config.reliable = true; |
206 | 211 AddPendingChannel(peer_connection_->CreateDataChannel(name, &config), |
207 Channel* channel = new Channel(weak_factory_.GetWeakPtr()); | 212 callback); |
208 pending_channels_[name] = PendingChannel(base::WrapUnique(channel), callback); | |
209 | |
210 if (outgoing_) { | |
211 webrtc::DataChannelInit config; | |
212 config.reliable = true; | |
213 channel->Start(peer_connection_->CreateDataChannel(name, &config)); | |
214 } | |
215 } | 213 } |
216 | 214 |
217 void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) { | 215 void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) { |
218 auto it = pending_channels_.find(name); | 216 auto it = pending_channels_.find(name); |
219 DCHECK(it != pending_channels_.end()); | 217 DCHECK(it != pending_channels_.end()); |
220 pending_channels_.erase(it); | 218 pending_channels_.erase(it); |
221 } | 219 } |
222 | 220 |
| 221 void WebrtcDataStreamAdapter::AddPendingChannel( |
| 222 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel, |
| 223 const ChannelCreatedCallback& callback) { |
| 224 DCHECK(peer_connection_); |
| 225 DCHECK(pending_channels_.find(data_channel->label()) == |
| 226 pending_channels_.end()); |
| 227 |
| 228 Channel* channel = new Channel(this); |
| 229 pending_channels_.insert( |
| 230 std::make_pair(data_channel->label(), |
| 231 PendingChannel(base::WrapUnique(channel), callback))); |
| 232 |
| 233 channel->Start(data_channel); |
| 234 } |
| 235 |
223 void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) { | 236 void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) { |
224 auto it = pending_channels_.find(channel->name()); | 237 auto it = pending_channels_.find(channel->name()); |
225 DCHECK(it != pending_channels_.end()); | 238 DCHECK(it != pending_channels_.end()); |
| 239 |
226 PendingChannel pending_channel = std::move(it->second); | 240 PendingChannel pending_channel = std::move(it->second); |
227 pending_channels_.erase(it); | 241 pending_channels_.erase(it); |
228 | 242 |
229 // Once the channel is connected its ownership is passed to the | 243 // Once the channel is connected its ownership is passed to the |
230 // |connected_callback|. | 244 // |connected_callback|. |
231 pending_channel.connected_callback.Run(std::move(pending_channel.channel)); | 245 pending_channel.connected_callback.Run(std::move(pending_channel.channel)); |
232 } | 246 } |
233 | 247 |
234 void WebrtcDataStreamAdapter::OnChannelError() { | 248 void WebrtcDataStreamAdapter::OnChannelError() { |
| 249 pending_channels_.clear(); |
235 error_callback_.Run(CHANNEL_CONNECTION_ERROR); | 250 error_callback_.Run(CHANNEL_CONNECTION_ERROR); |
236 } | 251 } |
237 | 252 |
238 } // namespace protocol | 253 } // namespace protocol |
239 } // namespace remoting | 254 } // namespace remoting |
OLD | NEW |