| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "remoting/protocol/channel_multiplexer.h" | 5 #include "remoting/protocol/channel_multiplexer.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <string.h> | 8 #include <string.h> |
| 9 | 9 |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 12 matching lines...) Expand all Loading... |
| 23 | 23 |
| 24 namespace remoting { | 24 namespace remoting { |
| 25 namespace protocol { | 25 namespace protocol { |
| 26 | 26 |
| 27 namespace { | 27 namespace { |
| 28 const int kChannelIdUnknown = -1; | 28 const int kChannelIdUnknown = -1; |
| 29 const int kMaxPacketSize = 1024; | 29 const int kMaxPacketSize = 1024; |
| 30 | 30 |
| 31 class PendingPacket { | 31 class PendingPacket { |
| 32 public: | 32 public: |
| 33 PendingPacket(scoped_ptr<MultiplexPacket> packet) | 33 PendingPacket(std::unique_ptr<MultiplexPacket> packet) |
| 34 : packet(std::move(packet)) {} | 34 : packet(std::move(packet)) {} |
| 35 ~PendingPacket() {} | 35 ~PendingPacket() {} |
| 36 | 36 |
| 37 bool is_empty() { return pos >= packet->data().size(); } | 37 bool is_empty() { return pos >= packet->data().size(); } |
| 38 | 38 |
| 39 int Read(char* buffer, size_t size) { | 39 int Read(char* buffer, size_t size) { |
| 40 size = std::min(size, packet->data().size() - pos); | 40 size = std::min(size, packet->data().size() - pos); |
| 41 memcpy(buffer, packet->data().data() + pos, size); | 41 memcpy(buffer, packet->data().data() + pos, size); |
| 42 pos += size; | 42 pos += size; |
| 43 return size; | 43 return size; |
| 44 } | 44 } |
| 45 | 45 |
| 46 private: | 46 private: |
| 47 scoped_ptr<MultiplexPacket> packet; | 47 std::unique_ptr<MultiplexPacket> packet; |
| 48 size_t pos = 0U; | 48 size_t pos = 0U; |
| 49 | 49 |
| 50 DISALLOW_COPY_AND_ASSIGN(PendingPacket); | 50 DISALLOW_COPY_AND_ASSIGN(PendingPacket); |
| 51 }; | 51 }; |
| 52 | 52 |
| 53 } // namespace | 53 } // namespace |
| 54 | 54 |
| 55 const char ChannelMultiplexer::kMuxChannelName[] = "mux"; | 55 const char ChannelMultiplexer::kMuxChannelName[] = "mux"; |
| 56 | 56 |
| 57 struct ChannelMultiplexer::PendingChannel { | 57 struct ChannelMultiplexer::PendingChannel { |
| 58 PendingChannel(const std::string& name, | 58 PendingChannel(const std::string& name, |
| 59 const ChannelCreatedCallback& callback) | 59 const ChannelCreatedCallback& callback) |
| 60 : name(name), callback(callback) { | 60 : name(name), callback(callback) { |
| 61 } | 61 } |
| 62 std::string name; | 62 std::string name; |
| 63 ChannelCreatedCallback callback; | 63 ChannelCreatedCallback callback; |
| 64 }; | 64 }; |
| 65 | 65 |
| 66 class ChannelMultiplexer::MuxChannel { | 66 class ChannelMultiplexer::MuxChannel { |
| 67 public: | 67 public: |
| 68 MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name, | 68 MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name, |
| 69 int send_id); | 69 int send_id); |
| 70 ~MuxChannel(); | 70 ~MuxChannel(); |
| 71 | 71 |
| 72 const std::string& name() { return name_; } | 72 const std::string& name() { return name_; } |
| 73 int receive_id() { return receive_id_; } | 73 int receive_id() { return receive_id_; } |
| 74 void set_receive_id(int id) { receive_id_ = id; } | 74 void set_receive_id(int id) { receive_id_ = id; } |
| 75 | 75 |
| 76 // Called by ChannelMultiplexer. | 76 // Called by ChannelMultiplexer. |
| 77 scoped_ptr<P2PStreamSocket> CreateSocket(); | 77 std::unique_ptr<P2PStreamSocket> CreateSocket(); |
| 78 void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet); | 78 void OnIncomingPacket(std::unique_ptr<MultiplexPacket> packet); |
| 79 void OnBaseChannelError(int error); | 79 void OnBaseChannelError(int error); |
| 80 | 80 |
| 81 // Called by MuxSocket. | 81 // Called by MuxSocket. |
| 82 void OnSocketDestroyed(); | 82 void OnSocketDestroyed(); |
| 83 void DoWrite(scoped_ptr<MultiplexPacket> packet, | 83 void DoWrite(std::unique_ptr<MultiplexPacket> packet, |
| 84 const base::Closure& done_task); | 84 const base::Closure& done_task); |
| 85 int DoRead(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len); | 85 int DoRead(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len); |
| 86 | 86 |
| 87 private: | 87 private: |
| 88 ChannelMultiplexer* multiplexer_; | 88 ChannelMultiplexer* multiplexer_; |
| 89 std::string name_; | 89 std::string name_; |
| 90 int send_id_; | 90 int send_id_; |
| 91 bool id_sent_; | 91 bool id_sent_; |
| 92 int receive_id_; | 92 int receive_id_; |
| 93 MuxSocket* socket_; | 93 MuxSocket* socket_; |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 141 receive_id_(kChannelIdUnknown), | 141 receive_id_(kChannelIdUnknown), |
| 142 socket_(nullptr) { | 142 socket_(nullptr) { |
| 143 } | 143 } |
| 144 | 144 |
| 145 ChannelMultiplexer::MuxChannel::~MuxChannel() { | 145 ChannelMultiplexer::MuxChannel::~MuxChannel() { |
| 146 // Socket must be destroyed before the channel. | 146 // Socket must be destroyed before the channel. |
| 147 DCHECK(!socket_); | 147 DCHECK(!socket_); |
| 148 STLDeleteElements(&pending_packets_); | 148 STLDeleteElements(&pending_packets_); |
| 149 } | 149 } |
| 150 | 150 |
| 151 scoped_ptr<P2PStreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() { | 151 std::unique_ptr<P2PStreamSocket> |
| 152 ChannelMultiplexer::MuxChannel::CreateSocket() { |
| 152 DCHECK(!socket_); // Can't create more than one socket per channel. | 153 DCHECK(!socket_); // Can't create more than one socket per channel. |
| 153 scoped_ptr<MuxSocket> result(new MuxSocket(this)); | 154 std::unique_ptr<MuxSocket> result(new MuxSocket(this)); |
| 154 socket_ = result.get(); | 155 socket_ = result.get(); |
| 155 return std::move(result); | 156 return std::move(result); |
| 156 } | 157 } |
| 157 | 158 |
| 158 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( | 159 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( |
| 159 scoped_ptr<MultiplexPacket> packet) { | 160 std::unique_ptr<MultiplexPacket> packet) { |
| 160 DCHECK_EQ(packet->channel_id(), receive_id_); | 161 DCHECK_EQ(packet->channel_id(), receive_id_); |
| 161 if (packet->data().size() > 0) { | 162 if (packet->data().size() > 0) { |
| 162 pending_packets_.push_back(new PendingPacket(std::move(packet))); | 163 pending_packets_.push_back(new PendingPacket(std::move(packet))); |
| 163 if (socket_) { | 164 if (socket_) { |
| 164 // Notify the socket that we have more data. | 165 // Notify the socket that we have more data. |
| 165 socket_->OnPacketReceived(); | 166 socket_->OnPacketReceived(); |
| 166 } | 167 } |
| 167 } | 168 } |
| 168 } | 169 } |
| 169 | 170 |
| 170 void ChannelMultiplexer::MuxChannel::OnBaseChannelError(int error) { | 171 void ChannelMultiplexer::MuxChannel::OnBaseChannelError(int error) { |
| 171 if (socket_) | 172 if (socket_) |
| 172 socket_->OnBaseChannelError(error); | 173 socket_->OnBaseChannelError(error); |
| 173 } | 174 } |
| 174 | 175 |
| 175 void ChannelMultiplexer::MuxChannel::OnSocketDestroyed() { | 176 void ChannelMultiplexer::MuxChannel::OnSocketDestroyed() { |
| 176 DCHECK(socket_); | 177 DCHECK(socket_); |
| 177 socket_ = nullptr; | 178 socket_ = nullptr; |
| 178 } | 179 } |
| 179 | 180 |
| 180 void ChannelMultiplexer::MuxChannel::DoWrite( | 181 void ChannelMultiplexer::MuxChannel::DoWrite( |
| 181 scoped_ptr<MultiplexPacket> packet, | 182 std::unique_ptr<MultiplexPacket> packet, |
| 182 const base::Closure& done_task) { | 183 const base::Closure& done_task) { |
| 183 packet->set_channel_id(send_id_); | 184 packet->set_channel_id(send_id_); |
| 184 if (!id_sent_) { | 185 if (!id_sent_) { |
| 185 packet->set_channel_name(name_); | 186 packet->set_channel_name(name_); |
| 186 id_sent_ = true; | 187 id_sent_ = true; |
| 187 } | 188 } |
| 188 multiplexer_->DoWrite(std::move(packet), done_task); | 189 multiplexer_->DoWrite(std::move(packet), done_task); |
| 189 } | 190 } |
| 190 | 191 |
| 191 int ChannelMultiplexer::MuxChannel::DoRead( | 192 int ChannelMultiplexer::MuxChannel::DoRead( |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 239 | 240 |
| 240 int ChannelMultiplexer::MuxSocket::Write( | 241 int ChannelMultiplexer::MuxSocket::Write( |
| 241 const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, | 242 const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, |
| 242 const net::CompletionCallback& callback) { | 243 const net::CompletionCallback& callback) { |
| 243 DCHECK(CalledOnValidThread()); | 244 DCHECK(CalledOnValidThread()); |
| 244 DCHECK(write_callback_.is_null()); | 245 DCHECK(write_callback_.is_null()); |
| 245 | 246 |
| 246 if (base_channel_error_ != net::OK) | 247 if (base_channel_error_ != net::OK) |
| 247 return base_channel_error_; | 248 return base_channel_error_; |
| 248 | 249 |
| 249 scoped_ptr<MultiplexPacket> packet(new MultiplexPacket()); | 250 std::unique_ptr<MultiplexPacket> packet(new MultiplexPacket()); |
| 250 size_t size = std::min(kMaxPacketSize, buffer_len); | 251 size_t size = std::min(kMaxPacketSize, buffer_len); |
| 251 packet->mutable_data()->assign(buffer->data(), size); | 252 packet->mutable_data()->assign(buffer->data(), size); |
| 252 | 253 |
| 253 write_pending_ = true; | 254 write_pending_ = true; |
| 254 channel_->DoWrite(std::move(packet), base::Bind( | 255 channel_->DoWrite(std::move(packet), base::Bind( |
| 255 &ChannelMultiplexer::MuxSocket::OnWriteComplete, AsWeakPtr())); | 256 &ChannelMultiplexer::MuxSocket::OnWriteComplete, AsWeakPtr())); |
| 256 | 257 |
| 257 // OnWriteComplete() might be called above synchronously. | 258 // OnWriteComplete() might be called above synchronously. |
| 258 if (write_pending_) { | 259 if (write_pending_) { |
| 259 DCHECK(write_callback_.is_null()); | 260 DCHECK(write_callback_.is_null()); |
| (...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 343 for (std::list<PendingChannel>::iterator it = pending_channels_.begin(); | 344 for (std::list<PendingChannel>::iterator it = pending_channels_.begin(); |
| 344 it != pending_channels_.end(); ++it) { | 345 it != pending_channels_.end(); ++it) { |
| 345 if (it->name == name) { | 346 if (it->name == name) { |
| 346 pending_channels_.erase(it); | 347 pending_channels_.erase(it); |
| 347 return; | 348 return; |
| 348 } | 349 } |
| 349 } | 350 } |
| 350 } | 351 } |
| 351 | 352 |
| 352 void ChannelMultiplexer::OnBaseChannelReady( | 353 void ChannelMultiplexer::OnBaseChannelReady( |
| 353 scoped_ptr<P2PStreamSocket> socket) { | 354 std::unique_ptr<P2PStreamSocket> socket) { |
| 354 base_channel_factory_ = nullptr; | 355 base_channel_factory_ = nullptr; |
| 355 base_channel_ = std::move(socket); | 356 base_channel_ = std::move(socket); |
| 356 | 357 |
| 357 if (base_channel_.get()) { | 358 if (base_channel_.get()) { |
| 358 // Initialize reader and writer. | 359 // Initialize reader and writer. |
| 359 reader_.StartReading(base_channel_.get(), | 360 reader_.StartReading(base_channel_.get(), |
| 360 base::Bind(&ChannelMultiplexer::OnIncomingPacket, | 361 base::Bind(&ChannelMultiplexer::OnIncomingPacket, |
| 361 base::Unretained(this)), | 362 base::Unretained(this)), |
| 362 base::Bind(&ChannelMultiplexer::OnBaseChannelError, | 363 base::Bind(&ChannelMultiplexer::OnBaseChannelError, |
| 363 base::Unretained(this))); | 364 base::Unretained(this))); |
| (...skipping 13 matching lines...) Expand all Loading... |
| 377 // Every time this function is called it connects a single channel and posts a | 378 // Every time this function is called it connects a single channel and posts a |
| 378 // separate task to connect other channels. This is necessary because the | 379 // separate task to connect other channels. This is necessary because the |
| 379 // callback may destroy the multiplexer or somehow else modify | 380 // callback may destroy the multiplexer or somehow else modify |
| 380 // |pending_channels_| list (e.g. call CancelChannelCreation()). | 381 // |pending_channels_| list (e.g. call CancelChannelCreation()). |
| 381 base::ThreadTaskRunnerHandle::Get()->PostTask( | 382 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 382 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, | 383 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, |
| 383 weak_factory_.GetWeakPtr())); | 384 weak_factory_.GetWeakPtr())); |
| 384 | 385 |
| 385 PendingChannel c = pending_channels_.front(); | 386 PendingChannel c = pending_channels_.front(); |
| 386 pending_channels_.erase(pending_channels_.begin()); | 387 pending_channels_.erase(pending_channels_.begin()); |
| 387 scoped_ptr<P2PStreamSocket> socket; | 388 std::unique_ptr<P2PStreamSocket> socket; |
| 388 if (base_channel_.get()) | 389 if (base_channel_.get()) |
| 389 socket = GetOrCreateChannel(c.name)->CreateSocket(); | 390 socket = GetOrCreateChannel(c.name)->CreateSocket(); |
| 390 c.callback.Run(std::move(socket)); | 391 c.callback.Run(std::move(socket)); |
| 391 } | 392 } |
| 392 | 393 |
| 393 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( | 394 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( |
| 394 const std::string& name) { | 395 const std::string& name) { |
| 395 // Check if we already have a channel with the requested name. | 396 // Check if we already have a channel with the requested name. |
| 396 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); | 397 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); |
| 397 if (it != channels_.end()) | 398 if (it != channels_.end()) |
| (...skipping 17 matching lines...) Expand all Loading... |
| 415 } | 416 } |
| 416 } | 417 } |
| 417 | 418 |
| 418 void ChannelMultiplexer::NotifyBaseChannelError(const std::string& name, | 419 void ChannelMultiplexer::NotifyBaseChannelError(const std::string& name, |
| 419 int error) { | 420 int error) { |
| 420 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); | 421 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); |
| 421 if (it != channels_.end()) | 422 if (it != channels_.end()) |
| 422 it->second->OnBaseChannelError(error); | 423 it->second->OnBaseChannelError(error); |
| 423 } | 424 } |
| 424 | 425 |
| 425 void ChannelMultiplexer::OnIncomingPacket(scoped_ptr<CompoundBuffer> buffer) { | 426 void ChannelMultiplexer::OnIncomingPacket( |
| 426 scoped_ptr<MultiplexPacket> packet = | 427 std::unique_ptr<CompoundBuffer> buffer) { |
| 428 std::unique_ptr<MultiplexPacket> packet = |
| 427 ParseMessage<MultiplexPacket>(buffer.get()); | 429 ParseMessage<MultiplexPacket>(buffer.get()); |
| 428 if (!packet) | 430 if (!packet) |
| 429 return; | 431 return; |
| 430 | 432 |
| 431 DCHECK(packet->has_channel_id()); | 433 DCHECK(packet->has_channel_id()); |
| 432 if (!packet->has_channel_id()) { | 434 if (!packet->has_channel_id()) { |
| 433 LOG(ERROR) << "Received packet without channel_id."; | 435 LOG(ERROR) << "Received packet without channel_id."; |
| 434 return; | 436 return; |
| 435 } | 437 } |
| 436 | 438 |
| (...skipping 11 matching lines...) Expand all Loading... |
| 448 return; | 450 return; |
| 449 } | 451 } |
| 450 channel = GetOrCreateChannel(packet->channel_name()); | 452 channel = GetOrCreateChannel(packet->channel_name()); |
| 451 channel->set_receive_id(receive_id); | 453 channel->set_receive_id(receive_id); |
| 452 channels_by_receive_id_[receive_id] = channel; | 454 channels_by_receive_id_[receive_id] = channel; |
| 453 } | 455 } |
| 454 | 456 |
| 455 channel->OnIncomingPacket(std::move(packet)); | 457 channel->OnIncomingPacket(std::move(packet)); |
| 456 } | 458 } |
| 457 | 459 |
| 458 void ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, | 460 void ChannelMultiplexer::DoWrite(std::unique_ptr<MultiplexPacket> packet, |
| 459 const base::Closure& done_task) { | 461 const base::Closure& done_task) { |
| 460 writer_.Write(SerializeAndFrameMessage(*packet), done_task); | 462 writer_.Write(SerializeAndFrameMessage(*packet), done_task); |
| 461 } | 463 } |
| 462 | 464 |
| 463 } // namespace protocol | 465 } // namespace protocol |
| 464 } // namespace remoting | 466 } // namespace remoting |
| OLD | NEW |