OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "remoting/protocol/channel_multiplexer.h" | 5 #include "remoting/protocol/channel_multiplexer.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/callback.h" | 10 #include "base/callback.h" |
(...skipping 10 matching lines...) Expand all Loading... |
21 namespace protocol { | 21 namespace protocol { |
22 | 22 |
23 namespace { | 23 namespace { |
24 const int kChannelIdUnknown = -1; | 24 const int kChannelIdUnknown = -1; |
25 const int kMaxPacketSize = 1024; | 25 const int kMaxPacketSize = 1024; |
26 | 26 |
27 class PendingPacket { | 27 class PendingPacket { |
28 public: | 28 public: |
29 PendingPacket(scoped_ptr<MultiplexPacket> packet, | 29 PendingPacket(scoped_ptr<MultiplexPacket> packet, |
30 const base::Closure& done_task) | 30 const base::Closure& done_task) |
31 : packet(packet.Pass()), | 31 : packet(std::move(packet)), |
32 done_task(done_task), | 32 done_task(done_task), |
33 pos(0U) { | 33 pos(0U) { |
34 } | 34 } |
35 ~PendingPacket() { | 35 ~PendingPacket() { |
36 done_task.Run(); | 36 done_task.Run(); |
37 } | 37 } |
38 | 38 |
39 bool is_empty() { return pos >= packet->data().size(); } | 39 bool is_empty() { return pos >= packet->data().size(); } |
40 | 40 |
41 int Read(char* buffer, size_t size) { | 41 int Read(char* buffer, size_t size) { |
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
149 ChannelMultiplexer::MuxChannel::~MuxChannel() { | 149 ChannelMultiplexer::MuxChannel::~MuxChannel() { |
150 // Socket must be destroyed before the channel. | 150 // Socket must be destroyed before the channel. |
151 DCHECK(!socket_); | 151 DCHECK(!socket_); |
152 STLDeleteElements(&pending_packets_); | 152 STLDeleteElements(&pending_packets_); |
153 } | 153 } |
154 | 154 |
155 scoped_ptr<P2PStreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() { | 155 scoped_ptr<P2PStreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() { |
156 DCHECK(!socket_); // Can't create more than one socket per channel. | 156 DCHECK(!socket_); // Can't create more than one socket per channel. |
157 scoped_ptr<MuxSocket> result(new MuxSocket(this)); | 157 scoped_ptr<MuxSocket> result(new MuxSocket(this)); |
158 socket_ = result.get(); | 158 socket_ = result.get(); |
159 return result.Pass(); | 159 return std::move(result); |
160 } | 160 } |
161 | 161 |
162 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( | 162 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( |
163 scoped_ptr<MultiplexPacket> packet, | 163 scoped_ptr<MultiplexPacket> packet, |
164 const base::Closure& done_task) { | 164 const base::Closure& done_task) { |
165 DCHECK_EQ(packet->channel_id(), receive_id_); | 165 DCHECK_EQ(packet->channel_id(), receive_id_); |
166 if (packet->data().size() > 0) { | 166 if (packet->data().size() > 0) { |
167 pending_packets_.push_back(new PendingPacket(packet.Pass(), done_task)); | 167 pending_packets_.push_back(new PendingPacket(std::move(packet), done_task)); |
168 if (socket_) { | 168 if (socket_) { |
169 // Notify the socket that we have more data. | 169 // Notify the socket that we have more data. |
170 socket_->OnPacketReceived(); | 170 socket_->OnPacketReceived(); |
171 } | 171 } |
172 } | 172 } |
173 } | 173 } |
174 | 174 |
175 void ChannelMultiplexer::MuxChannel::OnBaseChannelError(int error) { | 175 void ChannelMultiplexer::MuxChannel::OnBaseChannelError(int error) { |
176 if (socket_) | 176 if (socket_) |
177 socket_->OnBaseChannelError(error); | 177 socket_->OnBaseChannelError(error); |
178 } | 178 } |
179 | 179 |
180 void ChannelMultiplexer::MuxChannel::OnSocketDestroyed() { | 180 void ChannelMultiplexer::MuxChannel::OnSocketDestroyed() { |
181 DCHECK(socket_); | 181 DCHECK(socket_); |
182 socket_ = nullptr; | 182 socket_ = nullptr; |
183 } | 183 } |
184 | 184 |
185 void ChannelMultiplexer::MuxChannel::DoWrite( | 185 void ChannelMultiplexer::MuxChannel::DoWrite( |
186 scoped_ptr<MultiplexPacket> packet, | 186 scoped_ptr<MultiplexPacket> packet, |
187 const base::Closure& done_task) { | 187 const base::Closure& done_task) { |
188 packet->set_channel_id(send_id_); | 188 packet->set_channel_id(send_id_); |
189 if (!id_sent_) { | 189 if (!id_sent_) { |
190 packet->set_channel_name(name_); | 190 packet->set_channel_name(name_); |
191 id_sent_ = true; | 191 id_sent_ = true; |
192 } | 192 } |
193 multiplexer_->DoWrite(packet.Pass(), done_task); | 193 multiplexer_->DoWrite(std::move(packet), done_task); |
194 } | 194 } |
195 | 195 |
196 int ChannelMultiplexer::MuxChannel::DoRead( | 196 int ChannelMultiplexer::MuxChannel::DoRead( |
197 const scoped_refptr<net::IOBuffer>& buffer, | 197 const scoped_refptr<net::IOBuffer>& buffer, |
198 int buffer_len) { | 198 int buffer_len) { |
199 int pos = 0; | 199 int pos = 0; |
200 while (buffer_len > 0 && !pending_packets_.empty()) { | 200 while (buffer_len > 0 && !pending_packets_.empty()) { |
201 DCHECK(!pending_packets_.front()->is_empty()); | 201 DCHECK(!pending_packets_.front()->is_empty()); |
202 int result = pending_packets_.front()->Read( | 202 int result = pending_packets_.front()->Read( |
203 buffer->data() + pos, buffer_len); | 203 buffer->data() + pos, buffer_len); |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
249 DCHECK(write_callback_.is_null()); | 249 DCHECK(write_callback_.is_null()); |
250 | 250 |
251 if (base_channel_error_ != net::OK) | 251 if (base_channel_error_ != net::OK) |
252 return base_channel_error_; | 252 return base_channel_error_; |
253 | 253 |
254 scoped_ptr<MultiplexPacket> packet(new MultiplexPacket()); | 254 scoped_ptr<MultiplexPacket> packet(new MultiplexPacket()); |
255 size_t size = std::min(kMaxPacketSize, buffer_len); | 255 size_t size = std::min(kMaxPacketSize, buffer_len); |
256 packet->mutable_data()->assign(buffer->data(), size); | 256 packet->mutable_data()->assign(buffer->data(), size); |
257 | 257 |
258 write_pending_ = true; | 258 write_pending_ = true; |
259 channel_->DoWrite(packet.Pass(), base::Bind( | 259 channel_->DoWrite(std::move(packet), base::Bind( |
260 &ChannelMultiplexer::MuxSocket::OnWriteComplete, AsWeakPtr())); | 260 &ChannelMultiplexer::MuxSocket::OnWriteComplete, AsWeakPtr())); |
261 | 261 |
262 // OnWriteComplete() might be called above synchronously. | 262 // OnWriteComplete() might be called above synchronously. |
263 if (write_pending_) { | 263 if (write_pending_) { |
264 DCHECK(write_callback_.is_null()); | 264 DCHECK(write_callback_.is_null()); |
265 write_callback_ = callback; | 265 write_callback_ = callback; |
266 write_result_ = size; | 266 write_result_ = size; |
267 return net::ERR_IO_PENDING; | 267 return net::ERR_IO_PENDING; |
268 } | 268 } |
269 | 269 |
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
354 if (it->name == name) { | 354 if (it->name == name) { |
355 pending_channels_.erase(it); | 355 pending_channels_.erase(it); |
356 return; | 356 return; |
357 } | 357 } |
358 } | 358 } |
359 } | 359 } |
360 | 360 |
361 void ChannelMultiplexer::OnBaseChannelReady( | 361 void ChannelMultiplexer::OnBaseChannelReady( |
362 scoped_ptr<P2PStreamSocket> socket) { | 362 scoped_ptr<P2PStreamSocket> socket) { |
363 base_channel_factory_ = nullptr; | 363 base_channel_factory_ = nullptr; |
364 base_channel_ = socket.Pass(); | 364 base_channel_ = std::move(socket); |
365 | 365 |
366 if (base_channel_.get()) { | 366 if (base_channel_.get()) { |
367 // Initialize reader and writer. | 367 // Initialize reader and writer. |
368 reader_.StartReading(base_channel_.get(), | 368 reader_.StartReading(base_channel_.get(), |
369 base::Bind(&ChannelMultiplexer::OnBaseChannelError, | 369 base::Bind(&ChannelMultiplexer::OnBaseChannelError, |
370 base::Unretained(this))); | 370 base::Unretained(this))); |
371 writer_.Init(base::Bind(&P2PStreamSocket::Write, | 371 writer_.Init(base::Bind(&P2PStreamSocket::Write, |
372 base::Unretained(base_channel_.get())), | 372 base::Unretained(base_channel_.get())), |
373 base::Bind(&ChannelMultiplexer::OnBaseChannelError, | 373 base::Bind(&ChannelMultiplexer::OnBaseChannelError, |
374 base::Unretained(this))); | 374 base::Unretained(this))); |
(...skipping 12 matching lines...) Expand all Loading... |
387 // |pending_channels_| list (e.g. call CancelChannelCreation()). | 387 // |pending_channels_| list (e.g. call CancelChannelCreation()). |
388 base::ThreadTaskRunnerHandle::Get()->PostTask( | 388 base::ThreadTaskRunnerHandle::Get()->PostTask( |
389 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, | 389 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, |
390 weak_factory_.GetWeakPtr())); | 390 weak_factory_.GetWeakPtr())); |
391 | 391 |
392 PendingChannel c = pending_channels_.front(); | 392 PendingChannel c = pending_channels_.front(); |
393 pending_channels_.erase(pending_channels_.begin()); | 393 pending_channels_.erase(pending_channels_.begin()); |
394 scoped_ptr<P2PStreamSocket> socket; | 394 scoped_ptr<P2PStreamSocket> socket; |
395 if (base_channel_.get()) | 395 if (base_channel_.get()) |
396 socket = GetOrCreateChannel(c.name)->CreateSocket(); | 396 socket = GetOrCreateChannel(c.name)->CreateSocket(); |
397 c.callback.Run(socket.Pass()); | 397 c.callback.Run(std::move(socket)); |
398 } | 398 } |
399 | 399 |
400 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( | 400 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( |
401 const std::string& name) { | 401 const std::string& name) { |
402 // Check if we already have a channel with the requested name. | 402 // Check if we already have a channel with the requested name. |
403 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); | 403 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); |
404 if (it != channels_.end()) | 404 if (it != channels_.end()) |
405 return it->second; | 405 return it->second; |
406 | 406 |
407 // Create a new channel if we haven't found existing one. | 407 // Create a new channel if we haven't found existing one. |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
450 LOG(ERROR) << "Received packet with unknown channel_id and " | 450 LOG(ERROR) << "Received packet with unknown channel_id and " |
451 "without channel_name."; | 451 "without channel_name."; |
452 done_task.Run(); | 452 done_task.Run(); |
453 return; | 453 return; |
454 } | 454 } |
455 channel = GetOrCreateChannel(packet->channel_name()); | 455 channel = GetOrCreateChannel(packet->channel_name()); |
456 channel->set_receive_id(receive_id); | 456 channel->set_receive_id(receive_id); |
457 channels_by_receive_id_[receive_id] = channel; | 457 channels_by_receive_id_[receive_id] = channel; |
458 } | 458 } |
459 | 459 |
460 channel->OnIncomingPacket(packet.Pass(), done_task); | 460 channel->OnIncomingPacket(std::move(packet), done_task); |
461 } | 461 } |
462 | 462 |
463 void ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, | 463 void ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, |
464 const base::Closure& done_task) { | 464 const base::Closure& done_task) { |
465 writer_.Write(SerializeAndFrameMessage(*packet), done_task); | 465 writer_.Write(SerializeAndFrameMessage(*packet), done_task); |
466 } | 466 } |
467 | 467 |
468 } // namespace protocol | 468 } // namespace protocol |
469 } // namespace remoting | 469 } // namespace remoting |
OLD | NEW |