OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "remoting/protocol/webrtc_data_stream_adapter.h" | 5 #include "remoting/protocol/webrtc_data_stream_adapter.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/callback.h" | 10 #include "base/callback.h" |
(...skipping 14 matching lines...) Expand all Loading... | |
25 public webrtc::DataChannelObserver { | 25 public webrtc::DataChannelObserver { |
26 public: | 26 public: |
27 explicit Channel(base::WeakPtr<WebrtcDataStreamAdapter> adapter); | 27 explicit Channel(base::WeakPtr<WebrtcDataStreamAdapter> adapter); |
28 ~Channel() override; | 28 ~Channel() override; |
29 | 29 |
30 void Start(rtc::scoped_refptr<webrtc::DataChannelInterface> channel); | 30 void Start(rtc::scoped_refptr<webrtc::DataChannelInterface> channel); |
31 | 31 |
32 std::string name() { return channel_->label(); } | 32 std::string name() { return channel_->label(); } |
33 | 33 |
34 // MessagePipe interface. | 34 // MessagePipe interface. |
35 void StartReceiving(const MessageReceivedCallback& callback) override; | 35 void Start(EventHandler* event_handler) override; |
36 void Send(google::protobuf::MessageLite* message, | 36 void Send(google::protobuf::MessageLite* message, |
37 const base::Closure& done) override; | 37 const base::Closure& done) override; |
38 | 38 |
39 private: | 39 private: |
40 enum class State { CONNECTING, OPEN, CLOSED }; | 40 enum class State { CONNECTING, OPEN, CLOSED }; |
41 | 41 |
42 // webrtc::DataChannelObserver interface. | 42 // webrtc::DataChannelObserver interface. |
43 void OnStateChange() override; | 43 void OnStateChange() override; |
44 void OnMessage(const webrtc::DataBuffer& buffer) override; | 44 void OnMessage(const webrtc::DataBuffer& buffer) override; |
45 | 45 |
46 void OnConnected(); | 46 void OnConnected(); |
47 | 47 |
48 void OnError(); | 48 void OnClosed(); |
49 void NotifyClosed(); | |
49 | 50 |
50 base::WeakPtr<WebrtcDataStreamAdapter> adapter_; | 51 base::WeakPtr<WebrtcDataStreamAdapter> adapter_; |
51 | 52 |
52 rtc::scoped_refptr<webrtc::DataChannelInterface> channel_; | 53 rtc::scoped_refptr<webrtc::DataChannelInterface> channel_; |
53 | 54 |
54 MessageReceivedCallback message_received_callback_; | 55 EventHandler* event_handler_ = nullptr; |
55 | 56 |
56 State state_ = State::CONNECTING; | 57 State state_ = State::CONNECTING; |
57 | 58 |
59 base::WeakPtrFactory<Channel> weak_factory_; | |
60 | |
58 DISALLOW_COPY_AND_ASSIGN(Channel); | 61 DISALLOW_COPY_AND_ASSIGN(Channel); |
59 }; | 62 }; |
60 | 63 |
61 WebrtcDataStreamAdapter::Channel::Channel( | 64 WebrtcDataStreamAdapter::Channel::Channel( |
62 base::WeakPtr<WebrtcDataStreamAdapter> adapter) | 65 base::WeakPtr<WebrtcDataStreamAdapter> adapter) |
63 : adapter_(adapter) {} | 66 : adapter_(adapter), weak_factory_(this) {} |
64 | 67 |
65 WebrtcDataStreamAdapter::Channel::~Channel() { | 68 WebrtcDataStreamAdapter::Channel::~Channel() { |
66 if (channel_) { | 69 if (channel_) { |
67 channel_->UnregisterObserver(); | 70 channel_->UnregisterObserver(); |
68 channel_->Close(); | 71 channel_->Close(); |
69 } | 72 } |
70 } | 73 } |
71 | 74 |
72 void WebrtcDataStreamAdapter::Channel::Start( | 75 void WebrtcDataStreamAdapter::Channel::Start( |
73 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) { | 76 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) { |
74 DCHECK(!channel_); | 77 DCHECK(!channel_); |
75 | 78 |
76 channel_ = channel; | 79 channel_ = channel; |
77 channel_->RegisterObserver(this); | 80 channel_->RegisterObserver(this); |
78 | 81 |
79 if (channel_->state() == webrtc::DataChannelInterface::kOpen) { | 82 if (channel_->state() == webrtc::DataChannelInterface::kOpen) { |
80 OnConnected(); | 83 OnConnected(); |
81 } else { | 84 } else { |
82 DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting); | 85 DCHECK_EQ(channel_->state(), webrtc::DataChannelInterface::kConnecting); |
83 } | 86 } |
84 } | 87 } |
85 | 88 |
86 void WebrtcDataStreamAdapter::Channel::StartReceiving( | 89 void WebrtcDataStreamAdapter::Channel::Start(EventHandler* event_handler) { |
87 const MessageReceivedCallback& callback) { | 90 DCHECK(!event_handler_); |
88 DCHECK(message_received_callback_.is_null()); | 91 DCHECK(event_handler); |
89 DCHECK(!callback.is_null()); | |
90 | 92 |
91 message_received_callback_ = callback; | 93 event_handler_ = event_handler; |
92 } | 94 } |
93 | 95 |
94 void WebrtcDataStreamAdapter::Channel::Send( | 96 void WebrtcDataStreamAdapter::Channel::Send( |
95 google::protobuf::MessageLite* message, | 97 google::protobuf::MessageLite* message, |
96 const base::Closure& done) { | 98 const base::Closure& done) { |
99 if(state_ != State::OPEN) | |
100 return; | |
Jamie
2016/07/19 18:24:47
Should we log an error here? Silently dropping the
Sergey Ulanov
2016/07/19 23:38:44
Replaced this with a DCHECK().
Also changed OnMess
| |
101 | |
97 rtc::CopyOnWriteBuffer buffer; | 102 rtc::CopyOnWriteBuffer buffer; |
98 buffer.SetSize(message->ByteSize()); | 103 buffer.SetSize(message->ByteSize()); |
99 message->SerializeWithCachedSizesToArray( | 104 message->SerializeWithCachedSizesToArray( |
100 reinterpret_cast<uint8_t*>(buffer.data())); | 105 reinterpret_cast<uint8_t*>(buffer.data())); |
101 webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */); | 106 webrtc::DataBuffer data_buffer(std::move(buffer), true /* binary */); |
102 if (!channel_->Send(data_buffer)) { | 107 if (!channel_->Send(data_buffer)) { |
103 OnError(); | 108 LOG(ERROR) << "Send failed on data channel " << channel_->label(); |
109 channel_->Close(); | |
104 return; | 110 return; |
105 } | 111 } |
106 | 112 |
107 if (!done.is_null()) | 113 if (!done.is_null()) |
108 done.Run(); | 114 done.Run(); |
109 } | 115 } |
110 | 116 |
111 void WebrtcDataStreamAdapter::Channel::OnStateChange() { | 117 void WebrtcDataStreamAdapter::Channel::OnStateChange() { |
112 switch (channel_->state()) { | 118 switch (channel_->state()) { |
113 case webrtc::DataChannelInterface::kOpen: | 119 case webrtc::DataChannelInterface::kOpen: |
114 OnConnected(); | 120 OnConnected(); |
115 break; | 121 break; |
116 | 122 |
117 case webrtc::DataChannelInterface::kClosing: | 123 case webrtc::DataChannelInterface::kClosing: |
118 // Currently channels are not expected to be closed. | 124 // Currently channels are not expected to be closed. |
Jamie
2016/07/19 18:24:47
Log an error in this case?
Sergey Ulanov
2016/07/19 23:38:43
Removed the comment. This class now does support c
| |
119 OnError(); | 125 OnClosed(); |
120 break; | 126 break; |
121 | 127 |
122 case webrtc::DataChannelInterface::kConnecting: | 128 case webrtc::DataChannelInterface::kConnecting: |
123 case webrtc::DataChannelInterface::kClosed: | 129 case webrtc::DataChannelInterface::kClosed: |
124 break; | 130 break; |
125 } | 131 } |
126 } | 132 } |
127 | 133 |
128 void WebrtcDataStreamAdapter::Channel::OnConnected() { | |
129 CHECK(state_ == State::CONNECTING); | |
130 state_ = State::OPEN; | |
131 adapter_->OnChannelConnected(this); | |
132 } | |
133 | |
134 void WebrtcDataStreamAdapter::Channel::OnError() { | |
135 if (state_ == State::CLOSED) | |
136 return; | |
137 | |
138 state_ = State::CLOSED; | |
139 | |
140 // Notify the adapter about the error asychronously. | |
141 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
142 FROM_HERE, | |
143 base::Bind(&WebrtcDataStreamAdapter::OnChannelError, adapter_)); | |
144 } | |
145 | |
146 void WebrtcDataStreamAdapter::Channel::OnMessage( | 134 void WebrtcDataStreamAdapter::Channel::OnMessage( |
147 const webrtc::DataBuffer& rtc_buffer) { | 135 const webrtc::DataBuffer& rtc_buffer) { |
148 std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer()); | 136 std::unique_ptr<CompoundBuffer> buffer(new CompoundBuffer()); |
149 buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()), | 137 buffer->AppendCopyOf(reinterpret_cast<const char*>(rtc_buffer.data.data()), |
150 rtc_buffer.data.size()); | 138 rtc_buffer.data.size()); |
151 buffer->Lock(); | 139 buffer->Lock(); |
152 message_received_callback_.Run(std::move(buffer)); | 140 event_handler_->OnMessageReceived(std::move(buffer)); |
141 } | |
142 | |
143 void WebrtcDataStreamAdapter::Channel::OnConnected() { | |
144 DCHECK(state_ == State::CONNECTING); | |
145 state_ = State::OPEN; | |
146 adapter_->OnChannelConnected(this); | |
147 } | |
148 | |
149 void WebrtcDataStreamAdapter::Channel::OnClosed() { | |
150 switch (state_) { | |
151 case State::CONNECTING: | |
Jamie
2016/07/19 18:24:47
You're not setting the state to CLOSED in this bra
Sergey Ulanov
2016/07/19 23:38:44
No. Fixed now.
| |
152 LOG(WARNING) << "Channel " << channel_->label() | |
153 << " was closed before it's connected."; | |
154 // Notify the adapter about the error asynchronously. | |
155 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
156 FROM_HERE, | |
157 base::Bind(&WebrtcDataStreamAdapter::OnChannelError, adapter_)); | |
158 return; | |
159 | |
160 case State::OPEN: | |
161 state_ = State::CLOSED; | |
162 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
163 FROM_HERE, | |
164 base::Bind(&Channel::NotifyClosed, weak_factory_.GetWeakPtr())); | |
165 return; | |
166 | |
167 case State::CLOSED: | |
168 break; | |
169 } | |
170 } | |
171 | |
172 void WebrtcDataStreamAdapter::Channel::NotifyClosed() { | |
173 DCHECK(state_ == State::CLOSED); | |
174 event_handler_->OnMessagePipeClosed(); | |
153 } | 175 } |
154 | 176 |
155 struct WebrtcDataStreamAdapter::PendingChannel { | 177 struct WebrtcDataStreamAdapter::PendingChannel { |
156 PendingChannel() {} | |
157 PendingChannel(std::unique_ptr<Channel> channel, | 178 PendingChannel(std::unique_ptr<Channel> channel, |
158 const ChannelCreatedCallback& connected_callback) | 179 const ChannelCreatedCallback& connected_callback) |
159 : channel(std::move(channel)), connected_callback(connected_callback) {} | 180 : channel(std::move(channel)), connected_callback(connected_callback) {} |
160 PendingChannel(PendingChannel&& other) | 181 PendingChannel(PendingChannel&& other) |
161 : channel(std::move(other.channel)), | 182 : channel(std::move(other.channel)), |
162 connected_callback(std::move(other.connected_callback)) {} | 183 connected_callback(std::move(other.connected_callback)) {} |
163 PendingChannel& operator=(PendingChannel&& other) { | 184 PendingChannel& operator=(PendingChannel&& other) { |
164 channel = std::move(other.channel); | 185 channel = std::move(other.channel); |
165 connected_callback = std::move(other.connected_callback); | 186 connected_callback = std::move(other.connected_callback); |
166 return *this; | 187 return *this; |
167 } | 188 } |
168 | 189 |
169 std::unique_ptr<Channel> channel; | 190 std::unique_ptr<Channel> channel; |
170 ChannelCreatedCallback connected_callback; | 191 ChannelCreatedCallback connected_callback; |
171 }; | 192 }; |
172 | 193 |
173 WebrtcDataStreamAdapter::WebrtcDataStreamAdapter( | 194 WebrtcDataStreamAdapter::WebrtcDataStreamAdapter( |
174 bool outgoing, | |
175 const ErrorCallback& error_callback) | 195 const ErrorCallback& error_callback) |
176 : outgoing_(outgoing), | 196 : error_callback_(error_callback), weak_factory_(this) {} |
177 error_callback_(error_callback), | |
178 weak_factory_(this) {} | |
179 | 197 |
180 WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() { | 198 WebrtcDataStreamAdapter::~WebrtcDataStreamAdapter() { |
181 DCHECK(pending_channels_.empty()); | 199 DCHECK(pending_channels_.empty()); |
182 } | 200 } |
183 | 201 |
184 void WebrtcDataStreamAdapter::Initialize( | 202 void WebrtcDataStreamAdapter::Initialize( |
185 rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection) { | 203 rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection) { |
186 peer_connection_ = peer_connection; | 204 peer_connection_ = peer_connection; |
187 } | 205 } |
188 | 206 |
189 void WebrtcDataStreamAdapter::OnIncomingDataChannel( | 207 void WebrtcDataStreamAdapter::WrapIncomingDataChannel( |
190 webrtc::DataChannelInterface* data_channel) { | 208 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel, |
191 DCHECK(!outgoing_); | 209 const ChannelCreatedCallback& callback) { |
192 | 210 AddPendingChannel(data_channel, callback); |
193 auto it = pending_channels_.find(data_channel->label()); | |
194 if (it == pending_channels_.end()) { | |
195 LOG(ERROR) << "Received unexpected data channel " << data_channel->label(); | |
196 return; | |
197 } | |
198 it->second.channel->Start(data_channel); | |
199 } | 211 } |
200 | 212 |
201 void WebrtcDataStreamAdapter::CreateChannel( | 213 void WebrtcDataStreamAdapter::CreateChannel( |
202 const std::string& name, | 214 const std::string& name, |
203 const ChannelCreatedCallback& callback) { | 215 const ChannelCreatedCallback& callback) { |
204 DCHECK(peer_connection_); | 216 webrtc::DataChannelInit config; |
205 DCHECK(pending_channels_.find(name) == pending_channels_.end()); | 217 config.reliable = true; |
206 | 218 AddPendingChannel(peer_connection_->CreateDataChannel(name, &config), |
207 Channel* channel = new Channel(weak_factory_.GetWeakPtr()); | 219 callback); |
208 pending_channels_[name] = PendingChannel(base::WrapUnique(channel), callback); | |
209 | |
210 if (outgoing_) { | |
211 webrtc::DataChannelInit config; | |
212 config.reliable = true; | |
213 channel->Start(peer_connection_->CreateDataChannel(name, &config)); | |
214 } | |
215 } | 220 } |
216 | 221 |
217 void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) { | 222 void WebrtcDataStreamAdapter::CancelChannelCreation(const std::string& name) { |
218 auto it = pending_channels_.find(name); | 223 auto it = pending_channels_.find(name); |
219 DCHECK(it != pending_channels_.end()); | 224 DCHECK(it != pending_channels_.end()); |
220 pending_channels_.erase(it); | 225 pending_channels_.erase(it); |
221 } | 226 } |
222 | 227 |
228 void WebrtcDataStreamAdapter::AddPendingChannel( | |
229 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel, | |
230 const ChannelCreatedCallback& callback) { | |
231 DCHECK(peer_connection_); | |
232 DCHECK(pending_channels_.find(data_channel->label()) == | |
233 pending_channels_.end()); | |
234 | |
235 Channel* channel = new Channel(weak_factory_.GetWeakPtr()); | |
236 pending_channels_.insert( | |
237 std::make_pair(data_channel->label(), | |
238 PendingChannel(base::WrapUnique(channel), callback))); | |
239 | |
240 channel->Start(data_channel); | |
241 } | |
242 | |
223 void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) { | 243 void WebrtcDataStreamAdapter::OnChannelConnected(Channel* channel) { |
224 auto it = pending_channels_.find(channel->name()); | 244 auto it = pending_channels_.find(channel->name()); |
225 DCHECK(it != pending_channels_.end()); | 245 DCHECK(it != pending_channels_.end()); |
246 | |
226 PendingChannel pending_channel = std::move(it->second); | 247 PendingChannel pending_channel = std::move(it->second); |
227 pending_channels_.erase(it); | 248 pending_channels_.erase(it); |
228 | 249 |
229 // Once the channel is connected its ownership is passed to the | 250 // Once the channel is connected its ownership is passed to the |
230 // |connected_callback|. | 251 // |connected_callback|. |
231 pending_channel.connected_callback.Run(std::move(pending_channel.channel)); | 252 pending_channel.connected_callback.Run(std::move(pending_channel.channel)); |
232 } | 253 } |
233 | 254 |
234 void WebrtcDataStreamAdapter::OnChannelError() { | 255 void WebrtcDataStreamAdapter::OnChannelError() { |
235 error_callback_.Run(CHANNEL_CONNECTION_ERROR); | 256 error_callback_.Run(CHANNEL_CONNECTION_ERROR); |
236 } | 257 } |
237 | 258 |
238 } // namespace protocol | 259 } // namespace protocol |
239 } // namespace remoting | 260 } // namespace remoting |
OLD | NEW |