| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "remoting/protocol/channel_multiplexer.h" | 5 #include "remoting/protocol/channel_multiplexer.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include "base/bind.h" | 9 #include "base/bind.h" |
| 10 #include "base/callback.h" | 10 #include "base/callback.h" |
| (...skipping 10 matching lines...) Expand all Loading... |
| 21 namespace protocol { | 21 namespace protocol { |
| 22 | 22 |
| 23 namespace { | 23 namespace { |
| 24 const int kChannelIdUnknown = -1; | 24 const int kChannelIdUnknown = -1; |
| 25 const int kMaxPacketSize = 1024; | 25 const int kMaxPacketSize = 1024; |
| 26 | 26 |
| 27 class PendingPacket { | 27 class PendingPacket { |
| 28 public: | 28 public: |
| 29 PendingPacket(scoped_ptr<MultiplexPacket> packet, | 29 PendingPacket(scoped_ptr<MultiplexPacket> packet, |
| 30 const base::Closure& done_task) | 30 const base::Closure& done_task) |
| 31 : packet(packet.Pass()), | 31 : packet(std::move(packet)), |
| 32 done_task(done_task), | 32 done_task(done_task), |
| 33 pos(0U) { | 33 pos(0U) { |
| 34 } | 34 } |
| 35 ~PendingPacket() { | 35 ~PendingPacket() { |
| 36 done_task.Run(); | 36 done_task.Run(); |
| 37 } | 37 } |
| 38 | 38 |
| 39 bool is_empty() { return pos >= packet->data().size(); } | 39 bool is_empty() { return pos >= packet->data().size(); } |
| 40 | 40 |
| 41 int Read(char* buffer, size_t size) { | 41 int Read(char* buffer, size_t size) { |
| (...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 149 ChannelMultiplexer::MuxChannel::~MuxChannel() { | 149 ChannelMultiplexer::MuxChannel::~MuxChannel() { |
| 150 // Socket must be destroyed before the channel. | 150 // Socket must be destroyed before the channel. |
| 151 DCHECK(!socket_); | 151 DCHECK(!socket_); |
| 152 STLDeleteElements(&pending_packets_); | 152 STLDeleteElements(&pending_packets_); |
| 153 } | 153 } |
| 154 | 154 |
| 155 scoped_ptr<P2PStreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() { | 155 scoped_ptr<P2PStreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() { |
| 156 DCHECK(!socket_); // Can't create more than one socket per channel. | 156 DCHECK(!socket_); // Can't create more than one socket per channel. |
| 157 scoped_ptr<MuxSocket> result(new MuxSocket(this)); | 157 scoped_ptr<MuxSocket> result(new MuxSocket(this)); |
| 158 socket_ = result.get(); | 158 socket_ = result.get(); |
| 159 return result.Pass(); | 159 return std::move(result); |
| 160 } | 160 } |
| 161 | 161 |
| 162 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( | 162 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( |
| 163 scoped_ptr<MultiplexPacket> packet, | 163 scoped_ptr<MultiplexPacket> packet, |
| 164 const base::Closure& done_task) { | 164 const base::Closure& done_task) { |
| 165 DCHECK_EQ(packet->channel_id(), receive_id_); | 165 DCHECK_EQ(packet->channel_id(), receive_id_); |
| 166 if (packet->data().size() > 0) { | 166 if (packet->data().size() > 0) { |
| 167 pending_packets_.push_back(new PendingPacket(packet.Pass(), done_task)); | 167 pending_packets_.push_back(new PendingPacket(std::move(packet), done_task)); |
| 168 if (socket_) { | 168 if (socket_) { |
| 169 // Notify the socket that we have more data. | 169 // Notify the socket that we have more data. |
| 170 socket_->OnPacketReceived(); | 170 socket_->OnPacketReceived(); |
| 171 } | 171 } |
| 172 } | 172 } |
| 173 } | 173 } |
| 174 | 174 |
| 175 void ChannelMultiplexer::MuxChannel::OnBaseChannelError(int error) { | 175 void ChannelMultiplexer::MuxChannel::OnBaseChannelError(int error) { |
| 176 if (socket_) | 176 if (socket_) |
| 177 socket_->OnBaseChannelError(error); | 177 socket_->OnBaseChannelError(error); |
| 178 } | 178 } |
| 179 | 179 |
| 180 void ChannelMultiplexer::MuxChannel::OnSocketDestroyed() { | 180 void ChannelMultiplexer::MuxChannel::OnSocketDestroyed() { |
| 181 DCHECK(socket_); | 181 DCHECK(socket_); |
| 182 socket_ = nullptr; | 182 socket_ = nullptr; |
| 183 } | 183 } |
| 184 | 184 |
| 185 void ChannelMultiplexer::MuxChannel::DoWrite( | 185 void ChannelMultiplexer::MuxChannel::DoWrite( |
| 186 scoped_ptr<MultiplexPacket> packet, | 186 scoped_ptr<MultiplexPacket> packet, |
| 187 const base::Closure& done_task) { | 187 const base::Closure& done_task) { |
| 188 packet->set_channel_id(send_id_); | 188 packet->set_channel_id(send_id_); |
| 189 if (!id_sent_) { | 189 if (!id_sent_) { |
| 190 packet->set_channel_name(name_); | 190 packet->set_channel_name(name_); |
| 191 id_sent_ = true; | 191 id_sent_ = true; |
| 192 } | 192 } |
| 193 multiplexer_->DoWrite(packet.Pass(), done_task); | 193 multiplexer_->DoWrite(std::move(packet), done_task); |
| 194 } | 194 } |
| 195 | 195 |
| 196 int ChannelMultiplexer::MuxChannel::DoRead( | 196 int ChannelMultiplexer::MuxChannel::DoRead( |
| 197 const scoped_refptr<net::IOBuffer>& buffer, | 197 const scoped_refptr<net::IOBuffer>& buffer, |
| 198 int buffer_len) { | 198 int buffer_len) { |
| 199 int pos = 0; | 199 int pos = 0; |
| 200 while (buffer_len > 0 && !pending_packets_.empty()) { | 200 while (buffer_len > 0 && !pending_packets_.empty()) { |
| 201 DCHECK(!pending_packets_.front()->is_empty()); | 201 DCHECK(!pending_packets_.front()->is_empty()); |
| 202 int result = pending_packets_.front()->Read( | 202 int result = pending_packets_.front()->Read( |
| 203 buffer->data() + pos, buffer_len); | 203 buffer->data() + pos, buffer_len); |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 249 DCHECK(write_callback_.is_null()); | 249 DCHECK(write_callback_.is_null()); |
| 250 | 250 |
| 251 if (base_channel_error_ != net::OK) | 251 if (base_channel_error_ != net::OK) |
| 252 return base_channel_error_; | 252 return base_channel_error_; |
| 253 | 253 |
| 254 scoped_ptr<MultiplexPacket> packet(new MultiplexPacket()); | 254 scoped_ptr<MultiplexPacket> packet(new MultiplexPacket()); |
| 255 size_t size = std::min(kMaxPacketSize, buffer_len); | 255 size_t size = std::min(kMaxPacketSize, buffer_len); |
| 256 packet->mutable_data()->assign(buffer->data(), size); | 256 packet->mutable_data()->assign(buffer->data(), size); |
| 257 | 257 |
| 258 write_pending_ = true; | 258 write_pending_ = true; |
| 259 channel_->DoWrite(packet.Pass(), base::Bind( | 259 channel_->DoWrite(std::move(packet), base::Bind( |
| 260 &ChannelMultiplexer::MuxSocket::OnWriteComplete, AsWeakPtr())); | 260 &ChannelMultiplexer::MuxSocket::OnWriteComplete, AsWeakPtr())); |
| 261 | 261 |
| 262 // OnWriteComplete() might be called above synchronously. | 262 // OnWriteComplete() might be called above synchronously. |
| 263 if (write_pending_) { | 263 if (write_pending_) { |
| 264 DCHECK(write_callback_.is_null()); | 264 DCHECK(write_callback_.is_null()); |
| 265 write_callback_ = callback; | 265 write_callback_ = callback; |
| 266 write_result_ = size; | 266 write_result_ = size; |
| 267 return net::ERR_IO_PENDING; | 267 return net::ERR_IO_PENDING; |
| 268 } | 268 } |
| 269 | 269 |
| (...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 354 if (it->name == name) { | 354 if (it->name == name) { |
| 355 pending_channels_.erase(it); | 355 pending_channels_.erase(it); |
| 356 return; | 356 return; |
| 357 } | 357 } |
| 358 } | 358 } |
| 359 } | 359 } |
| 360 | 360 |
| 361 void ChannelMultiplexer::OnBaseChannelReady( | 361 void ChannelMultiplexer::OnBaseChannelReady( |
| 362 scoped_ptr<P2PStreamSocket> socket) { | 362 scoped_ptr<P2PStreamSocket> socket) { |
| 363 base_channel_factory_ = nullptr; | 363 base_channel_factory_ = nullptr; |
| 364 base_channel_ = socket.Pass(); | 364 base_channel_ = std::move(socket); |
| 365 | 365 |
| 366 if (base_channel_.get()) { | 366 if (base_channel_.get()) { |
| 367 // Initialize reader and writer. | 367 // Initialize reader and writer. |
| 368 reader_.StartReading(base_channel_.get(), | 368 reader_.StartReading(base_channel_.get(), |
| 369 base::Bind(&ChannelMultiplexer::OnBaseChannelError, | 369 base::Bind(&ChannelMultiplexer::OnBaseChannelError, |
| 370 base::Unretained(this))); | 370 base::Unretained(this))); |
| 371 writer_.Init(base::Bind(&P2PStreamSocket::Write, | 371 writer_.Init(base::Bind(&P2PStreamSocket::Write, |
| 372 base::Unretained(base_channel_.get())), | 372 base::Unretained(base_channel_.get())), |
| 373 base::Bind(&ChannelMultiplexer::OnBaseChannelError, | 373 base::Bind(&ChannelMultiplexer::OnBaseChannelError, |
| 374 base::Unretained(this))); | 374 base::Unretained(this))); |
| (...skipping 12 matching lines...) Expand all Loading... |
| 387 // |pending_channels_| list (e.g. call CancelChannelCreation()). | 387 // |pending_channels_| list (e.g. call CancelChannelCreation()). |
| 388 base::ThreadTaskRunnerHandle::Get()->PostTask( | 388 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 389 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, | 389 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, |
| 390 weak_factory_.GetWeakPtr())); | 390 weak_factory_.GetWeakPtr())); |
| 391 | 391 |
| 392 PendingChannel c = pending_channels_.front(); | 392 PendingChannel c = pending_channels_.front(); |
| 393 pending_channels_.erase(pending_channels_.begin()); | 393 pending_channels_.erase(pending_channels_.begin()); |
| 394 scoped_ptr<P2PStreamSocket> socket; | 394 scoped_ptr<P2PStreamSocket> socket; |
| 395 if (base_channel_.get()) | 395 if (base_channel_.get()) |
| 396 socket = GetOrCreateChannel(c.name)->CreateSocket(); | 396 socket = GetOrCreateChannel(c.name)->CreateSocket(); |
| 397 c.callback.Run(socket.Pass()); | 397 c.callback.Run(std::move(socket)); |
| 398 } | 398 } |
| 399 | 399 |
| 400 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( | 400 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( |
| 401 const std::string& name) { | 401 const std::string& name) { |
| 402 // Check if we already have a channel with the requested name. | 402 // Check if we already have a channel with the requested name. |
| 403 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); | 403 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); |
| 404 if (it != channels_.end()) | 404 if (it != channels_.end()) |
| 405 return it->second; | 405 return it->second; |
| 406 | 406 |
| 407 // Create a new channel if we haven't found existing one. | 407 // Create a new channel if we haven't found existing one. |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 450 LOG(ERROR) << "Received packet with unknown channel_id and " | 450 LOG(ERROR) << "Received packet with unknown channel_id and " |
| 451 "without channel_name."; | 451 "without channel_name."; |
| 452 done_task.Run(); | 452 done_task.Run(); |
| 453 return; | 453 return; |
| 454 } | 454 } |
| 455 channel = GetOrCreateChannel(packet->channel_name()); | 455 channel = GetOrCreateChannel(packet->channel_name()); |
| 456 channel->set_receive_id(receive_id); | 456 channel->set_receive_id(receive_id); |
| 457 channels_by_receive_id_[receive_id] = channel; | 457 channels_by_receive_id_[receive_id] = channel; |
| 458 } | 458 } |
| 459 | 459 |
| 460 channel->OnIncomingPacket(packet.Pass(), done_task); | 460 channel->OnIncomingPacket(std::move(packet), done_task); |
| 461 } | 461 } |
| 462 | 462 |
| 463 void ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, | 463 void ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, |
| 464 const base::Closure& done_task) { | 464 const base::Closure& done_task) { |
| 465 writer_.Write(SerializeAndFrameMessage(*packet), done_task); | 465 writer_.Write(SerializeAndFrameMessage(*packet), done_task); |
| 466 } | 466 } |
| 467 | 467 |
| 468 } // namespace protocol | 468 } // namespace protocol |
| 469 } // namespace remoting | 469 } // namespace remoting |
| OLD | NEW |