| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "remoting/protocol/channel_multiplexer.h" | 5 #include "remoting/protocol/channel_multiplexer.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <string.h> | 8 #include <string.h> |
| 9 | 9 |
| 10 #include <utility> | 10 #include <utility> |
| 11 | 11 |
| 12 #include "base/bind.h" | 12 #include "base/bind.h" |
| 13 #include "base/callback.h" | 13 #include "base/callback.h" |
| 14 #include "base/callback_helpers.h" | 14 #include "base/callback_helpers.h" |
| 15 #include "base/location.h" | 15 #include "base/location.h" |
| 16 #include "base/macros.h" | 16 #include "base/macros.h" |
| 17 #include "base/memory/ptr_util.h" |
| 17 #include "base/single_thread_task_runner.h" | 18 #include "base/single_thread_task_runner.h" |
| 18 #include "base/stl_util.h" | |
| 19 #include "base/threading/thread_task_runner_handle.h" | 19 #include "base/threading/thread_task_runner_handle.h" |
| 20 #include "net/base/net_errors.h" | 20 #include "net/base/net_errors.h" |
| 21 #include "remoting/protocol/message_serialization.h" | 21 #include "remoting/protocol/message_serialization.h" |
| 22 #include "remoting/protocol/p2p_stream_socket.h" | 22 #include "remoting/protocol/p2p_stream_socket.h" |
| 23 | 23 |
| 24 namespace remoting { | 24 namespace remoting { |
| 25 namespace protocol { | 25 namespace protocol { |
| 26 | 26 |
| 27 namespace { | 27 namespace { |
| 28 const int kChannelIdUnknown = -1; | 28 const int kChannelIdUnknown = -1; |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 84 const base::Closure& done_task); | 84 const base::Closure& done_task); |
| 85 int DoRead(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len); | 85 int DoRead(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len); |
| 86 | 86 |
| 87 private: | 87 private: |
| 88 ChannelMultiplexer* multiplexer_; | 88 ChannelMultiplexer* multiplexer_; |
| 89 std::string name_; | 89 std::string name_; |
| 90 int send_id_; | 90 int send_id_; |
| 91 bool id_sent_; | 91 bool id_sent_; |
| 92 int receive_id_; | 92 int receive_id_; |
| 93 MuxSocket* socket_; | 93 MuxSocket* socket_; |
| 94 std::list<PendingPacket*> pending_packets_; | 94 std::list<std::unique_ptr<PendingPacket>> pending_packets_; |
| 95 | 95 |
| 96 DISALLOW_COPY_AND_ASSIGN(MuxChannel); | 96 DISALLOW_COPY_AND_ASSIGN(MuxChannel); |
| 97 }; | 97 }; |
| 98 | 98 |
| 99 class ChannelMultiplexer::MuxSocket : public P2PStreamSocket, | 99 class ChannelMultiplexer::MuxSocket : public P2PStreamSocket, |
| 100 public base::NonThreadSafe, | 100 public base::NonThreadSafe, |
| 101 public base::SupportsWeakPtr<MuxSocket> { | 101 public base::SupportsWeakPtr<MuxSocket> { |
| 102 public: | 102 public: |
| 103 MuxSocket(MuxChannel* channel); | 103 MuxSocket(MuxChannel* channel); |
| 104 ~MuxSocket() override; | 104 ~MuxSocket() override; |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 138 name_(name), | 138 name_(name), |
| 139 send_id_(send_id), | 139 send_id_(send_id), |
| 140 id_sent_(false), | 140 id_sent_(false), |
| 141 receive_id_(kChannelIdUnknown), | 141 receive_id_(kChannelIdUnknown), |
| 142 socket_(nullptr) { | 142 socket_(nullptr) { |
| 143 } | 143 } |
| 144 | 144 |
| 145 ChannelMultiplexer::MuxChannel::~MuxChannel() { | 145 ChannelMultiplexer::MuxChannel::~MuxChannel() { |
| 146 // Socket must be destroyed before the channel. | 146 // Socket must be destroyed before the channel. |
| 147 DCHECK(!socket_); | 147 DCHECK(!socket_); |
| 148 base::STLDeleteElements(&pending_packets_); | |
| 149 } | 148 } |
| 150 | 149 |
| 151 std::unique_ptr<P2PStreamSocket> | 150 std::unique_ptr<P2PStreamSocket> |
| 152 ChannelMultiplexer::MuxChannel::CreateSocket() { | 151 ChannelMultiplexer::MuxChannel::CreateSocket() { |
| 153 DCHECK(!socket_); // Can't create more than one socket per channel. | 152 DCHECK(!socket_); // Can't create more than one socket per channel. |
| 154 std::unique_ptr<MuxSocket> result(new MuxSocket(this)); | 153 std::unique_ptr<MuxSocket> result(new MuxSocket(this)); |
| 155 socket_ = result.get(); | 154 socket_ = result.get(); |
| 156 return std::move(result); | 155 return std::move(result); |
| 157 } | 156 } |
| 158 | 157 |
| 159 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( | 158 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( |
| 160 std::unique_ptr<MultiplexPacket> packet) { | 159 std::unique_ptr<MultiplexPacket> packet) { |
| 161 DCHECK_EQ(packet->channel_id(), receive_id_); | 160 DCHECK_EQ(packet->channel_id(), receive_id_); |
| 162 if (packet->data().size() > 0) { | 161 if (packet->data().size() > 0) { |
| 163 pending_packets_.push_back(new PendingPacket(std::move(packet))); | 162 pending_packets_.push_back( |
| 163 base::MakeUnique<PendingPacket>(std::move(packet))); |
| 164 if (socket_) { | 164 if (socket_) { |
| 165 // Notify the socket that we have more data. | 165 // Notify the socket that we have more data. |
| 166 socket_->OnPacketReceived(); | 166 socket_->OnPacketReceived(); |
| 167 } | 167 } |
| 168 } | 168 } |
| 169 } | 169 } |
| 170 | 170 |
| 171 void ChannelMultiplexer::MuxChannel::OnBaseChannelError(int error) { | 171 void ChannelMultiplexer::MuxChannel::OnBaseChannelError(int error) { |
| 172 if (socket_) | 172 if (socket_) |
| 173 socket_->OnBaseChannelError(error); | 173 socket_->OnBaseChannelError(error); |
| (...skipping 19 matching lines...) Expand all Loading... |
| 193 const scoped_refptr<net::IOBuffer>& buffer, | 193 const scoped_refptr<net::IOBuffer>& buffer, |
| 194 int buffer_len) { | 194 int buffer_len) { |
| 195 int pos = 0; | 195 int pos = 0; |
| 196 while (buffer_len > 0 && !pending_packets_.empty()) { | 196 while (buffer_len > 0 && !pending_packets_.empty()) { |
| 197 DCHECK(!pending_packets_.front()->is_empty()); | 197 DCHECK(!pending_packets_.front()->is_empty()); |
| 198 int result = pending_packets_.front()->Read( | 198 int result = pending_packets_.front()->Read( |
| 199 buffer->data() + pos, buffer_len); | 199 buffer->data() + pos, buffer_len); |
| 200 DCHECK_LE(result, buffer_len); | 200 DCHECK_LE(result, buffer_len); |
| 201 pos += result; | 201 pos += result; |
| 202 buffer_len -= pos; | 202 buffer_len -= pos; |
| 203 if (pending_packets_.front()->is_empty()) { | 203 if (pending_packets_.front()->is_empty()) |
| 204 delete pending_packets_.front(); | 204 pending_packets_.pop_front(); |
| 205 pending_packets_.erase(pending_packets_.begin()); | |
| 206 } | |
| 207 } | 205 } |
| 208 return pos; | 206 return pos; |
| 209 } | 207 } |
| 210 | 208 |
| 211 ChannelMultiplexer::MuxSocket::MuxSocket(MuxChannel* channel) | 209 ChannelMultiplexer::MuxSocket::MuxSocket(MuxChannel* channel) |
| 212 : channel_(channel), | 210 : channel_(channel), |
| 213 read_buffer_size_(0), | 211 read_buffer_size_(0), |
| 214 write_pending_(false), | 212 write_pending_(false), |
| 215 write_result_(0) { | 213 write_result_(0) { |
| 216 } | 214 } |
| (...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 303 | 301 |
| 304 ChannelMultiplexer::ChannelMultiplexer(StreamChannelFactory* factory, | 302 ChannelMultiplexer::ChannelMultiplexer(StreamChannelFactory* factory, |
| 305 const std::string& base_channel_name) | 303 const std::string& base_channel_name) |
| 306 : base_channel_factory_(factory), | 304 : base_channel_factory_(factory), |
| 307 base_channel_name_(base_channel_name), | 305 base_channel_name_(base_channel_name), |
| 308 next_channel_id_(0), | 306 next_channel_id_(0), |
| 309 weak_factory_(this) {} | 307 weak_factory_(this) {} |
| 310 | 308 |
| 311 ChannelMultiplexer::~ChannelMultiplexer() { | 309 ChannelMultiplexer::~ChannelMultiplexer() { |
| 312 DCHECK(pending_channels_.empty()); | 310 DCHECK(pending_channels_.empty()); |
| 313 base::STLDeleteValues(&channels_); | |
| 314 | 311 |
| 315 // Cancel creation of the base channel if it hasn't finished. | 312 // Cancel creation of the base channel if it hasn't finished. |
| 316 if (base_channel_factory_) | 313 if (base_channel_factory_) |
| 317 base_channel_factory_->CancelChannelCreation(base_channel_name_); | 314 base_channel_factory_->CancelChannelCreation(base_channel_name_); |
| 318 } | 315 } |
| 319 | 316 |
| 320 void ChannelMultiplexer::CreateChannel(const std::string& name, | 317 void ChannelMultiplexer::CreateChannel(const std::string& name, |
| 321 const ChannelCreatedCallback& callback) { | 318 const ChannelCreatedCallback& callback) { |
| 322 if (base_channel_.get()) { | 319 if (base_channel_.get()) { |
| 323 // Already have |base_channel_|. Create new multiplexed channel | 320 // Already have |base_channel_|. Create new multiplexed channel |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 386 PendingChannel c = pending_channels_.front(); | 383 PendingChannel c = pending_channels_.front(); |
| 387 pending_channels_.erase(pending_channels_.begin()); | 384 pending_channels_.erase(pending_channels_.begin()); |
| 388 std::unique_ptr<P2PStreamSocket> socket; | 385 std::unique_ptr<P2PStreamSocket> socket; |
| 389 if (base_channel_.get()) | 386 if (base_channel_.get()) |
| 390 socket = GetOrCreateChannel(c.name)->CreateSocket(); | 387 socket = GetOrCreateChannel(c.name)->CreateSocket(); |
| 391 c.callback.Run(std::move(socket)); | 388 c.callback.Run(std::move(socket)); |
| 392 } | 389 } |
| 393 | 390 |
| 394 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( | 391 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( |
| 395 const std::string& name) { | 392 const std::string& name) { |
| 396 // Check if we already have a channel with the requested name. | 393 std::unique_ptr<MuxChannel>& channel = channels_[name]; |
| 397 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); | 394 if (!channel) { |
| 398 if (it != channels_.end()) | 395 // Create a new channel if we haven't found existing one. |
| 399 return it->second; | 396 channel = base::MakeUnique<MuxChannel>(this, name, next_channel_id_); |
| 397 ++next_channel_id_; |
| 398 } |
| 400 | 399 |
| 401 // Create a new channel if we haven't found existing one. | 400 return channel.get(); |
| 402 MuxChannel* channel = new MuxChannel(this, name, next_channel_id_); | |
| 403 ++next_channel_id_; | |
| 404 channels_[channel->name()] = channel; | |
| 405 return channel; | |
| 406 } | 401 } |
| 407 | 402 |
| 408 | 403 |
| 409 void ChannelMultiplexer::OnBaseChannelError(int error) { | 404 void ChannelMultiplexer::OnBaseChannelError(int error) { |
| 410 for (std::map<std::string, MuxChannel*>::iterator it = channels_.begin(); | 405 for (auto it = channels_.begin(); it != channels_.end(); ++it) { |
| 411 it != channels_.end(); ++it) { | |
| 412 base::ThreadTaskRunnerHandle::Get()->PostTask( | 406 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 413 FROM_HERE, | 407 FROM_HERE, |
| 414 base::Bind(&ChannelMultiplexer::NotifyBaseChannelError, | 408 base::Bind(&ChannelMultiplexer::NotifyBaseChannelError, |
| 415 weak_factory_.GetWeakPtr(), it->second->name(), error)); | 409 weak_factory_.GetWeakPtr(), it->second->name(), error)); |
| 416 } | 410 } |
| 417 } | 411 } |
| 418 | 412 |
| 419 void ChannelMultiplexer::NotifyBaseChannelError(const std::string& name, | 413 void ChannelMultiplexer::NotifyBaseChannelError(const std::string& name, |
| 420 int error) { | 414 int error) { |
| 421 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); | 415 auto it = channels_.find(name); |
| 422 if (it != channels_.end()) | 416 if (it != channels_.end()) |
| 423 it->second->OnBaseChannelError(error); | 417 it->second->OnBaseChannelError(error); |
| 424 } | 418 } |
| 425 | 419 |
| 426 void ChannelMultiplexer::OnIncomingPacket( | 420 void ChannelMultiplexer::OnIncomingPacket( |
| 427 std::unique_ptr<CompoundBuffer> buffer) { | 421 std::unique_ptr<CompoundBuffer> buffer) { |
| 428 std::unique_ptr<MultiplexPacket> packet = | 422 std::unique_ptr<MultiplexPacket> packet = |
| 429 ParseMessage<MultiplexPacket>(buffer.get()); | 423 ParseMessage<MultiplexPacket>(buffer.get()); |
| 430 if (!packet) | 424 if (!packet) |
| 431 return; | 425 return; |
| (...skipping 25 matching lines...) Expand all Loading... |
| 457 channel->OnIncomingPacket(std::move(packet)); | 451 channel->OnIncomingPacket(std::move(packet)); |
| 458 } | 452 } |
| 459 | 453 |
| 460 void ChannelMultiplexer::DoWrite(std::unique_ptr<MultiplexPacket> packet, | 454 void ChannelMultiplexer::DoWrite(std::unique_ptr<MultiplexPacket> packet, |
| 461 const base::Closure& done_task) { | 455 const base::Closure& done_task) { |
| 462 writer_.Write(SerializeAndFrameMessage(*packet), done_task); | 456 writer_.Write(SerializeAndFrameMessage(*packet), done_task); |
| 463 } | 457 } |
| 464 | 458 |
| 465 } // namespace protocol | 459 } // namespace protocol |
| 466 } // namespace remoting | 460 } // namespace remoting |
| OLD | NEW |