| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "remoting/protocol/channel_multiplexer.h" | 5 #include "remoting/protocol/channel_multiplexer.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include "base/bind.h" | 9 #include "base/bind.h" |
| 10 #include "base/callback.h" | 10 #include "base/callback.h" |
| 11 #include "base/callback_helpers.h" | 11 #include "base/callback_helpers.h" |
| 12 #include "base/location.h" | 12 #include "base/location.h" |
| 13 #include "base/single_thread_task_runner.h" | 13 #include "base/single_thread_task_runner.h" |
| 14 #include "base/stl_util.h" | 14 #include "base/stl_util.h" |
| 15 #include "base/thread_task_runner_handle.h" | 15 #include "base/thread_task_runner_handle.h" |
| 16 #include "net/base/net_errors.h" | 16 #include "net/base/net_errors.h" |
| 17 #include "net/socket/stream_socket.h" | |
| 18 #include "remoting/protocol/message_serialization.h" | 17 #include "remoting/protocol/message_serialization.h" |
| 18 #include "remoting/protocol/p2p_stream_socket.h" |
| 19 | 19 |
| 20 namespace remoting { | 20 namespace remoting { |
| 21 namespace protocol { | 21 namespace protocol { |
| 22 | 22 |
| 23 namespace { | 23 namespace { |
| 24 const int kChannelIdUnknown = -1; | 24 const int kChannelIdUnknown = -1; |
| 25 const int kMaxPacketSize = 1024; | 25 const int kMaxPacketSize = 1024; |
| 26 | 26 |
| 27 class PendingPacket { | 27 class PendingPacket { |
| 28 public: | 28 public: |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 70 public: | 70 public: |
| 71 MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name, | 71 MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name, |
| 72 int send_id); | 72 int send_id); |
| 73 ~MuxChannel(); | 73 ~MuxChannel(); |
| 74 | 74 |
| 75 const std::string& name() { return name_; } | 75 const std::string& name() { return name_; } |
| 76 int receive_id() { return receive_id_; } | 76 int receive_id() { return receive_id_; } |
| 77 void set_receive_id(int id) { receive_id_ = id; } | 77 void set_receive_id(int id) { receive_id_ = id; } |
| 78 | 78 |
| 79 // Called by ChannelMultiplexer. | 79 // Called by ChannelMultiplexer. |
| 80 scoped_ptr<net::StreamSocket> CreateSocket(); | 80 scoped_ptr<P2PStreamSocket> CreateSocket(); |
| 81 void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet, | 81 void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet, |
| 82 const base::Closure& done_task); | 82 const base::Closure& done_task); |
| 83 void OnBaseChannelError(int error); | 83 void OnBaseChannelError(int error); |
| 84 | 84 |
| 85 // Called by MuxSocket. | 85 // Called by MuxSocket. |
| 86 void OnSocketDestroyed(); | 86 void OnSocketDestroyed(); |
| 87 bool DoWrite(scoped_ptr<MultiplexPacket> packet, | 87 bool DoWrite(scoped_ptr<MultiplexPacket> packet, |
| 88 const base::Closure& done_task); | 88 const base::Closure& done_task); |
| 89 int DoRead(net::IOBuffer* buffer, int buffer_len); | 89 int DoRead(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len); |
| 90 | 90 |
| 91 private: | 91 private: |
| 92 ChannelMultiplexer* multiplexer_; | 92 ChannelMultiplexer* multiplexer_; |
| 93 std::string name_; | 93 std::string name_; |
| 94 int send_id_; | 94 int send_id_; |
| 95 bool id_sent_; | 95 bool id_sent_; |
| 96 int receive_id_; | 96 int receive_id_; |
| 97 MuxSocket* socket_; | 97 MuxSocket* socket_; |
| 98 std::list<PendingPacket*> pending_packets_; | 98 std::list<PendingPacket*> pending_packets_; |
| 99 | 99 |
| 100 DISALLOW_COPY_AND_ASSIGN(MuxChannel); | 100 DISALLOW_COPY_AND_ASSIGN(MuxChannel); |
| 101 }; | 101 }; |
| 102 | 102 |
| 103 class ChannelMultiplexer::MuxSocket : public net::StreamSocket, | 103 class ChannelMultiplexer::MuxSocket : public P2PStreamSocket, |
| 104 public base::NonThreadSafe, | 104 public base::NonThreadSafe, |
| 105 public base::SupportsWeakPtr<MuxSocket> { | 105 public base::SupportsWeakPtr<MuxSocket> { |
| 106 public: | 106 public: |
| 107 MuxSocket(MuxChannel* channel); | 107 MuxSocket(MuxChannel* channel); |
| 108 ~MuxSocket() override; | 108 ~MuxSocket() override; |
| 109 | 109 |
| 110 void OnWriteComplete(); | 110 void OnWriteComplete(); |
| 111 void OnBaseChannelError(int error); | 111 void OnBaseChannelError(int error); |
| 112 void OnPacketReceived(); | 112 void OnPacketReceived(); |
| 113 | 113 |
| 114 // net::StreamSocket interface. | 114 // P2PStreamSocket interface. |
| 115 int Read(net::IOBuffer* buffer, | 115 int Read(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, |
| 116 int buffer_len, | |
| 117 const net::CompletionCallback& callback) override; | 116 const net::CompletionCallback& callback) override; |
| 118 int Write(net::IOBuffer* buffer, | 117 int Write(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, |
| 119 int buffer_len, | |
| 120 const net::CompletionCallback& callback) override; | 118 const net::CompletionCallback& callback) override; |
| 121 | 119 |
| 122 int SetReceiveBufferSize(int32 size) override { | |
| 123 NOTIMPLEMENTED(); | |
| 124 return net::ERR_NOT_IMPLEMENTED; | |
| 125 } | |
| 126 int SetSendBufferSize(int32 size) override { | |
| 127 NOTIMPLEMENTED(); | |
| 128 return net::ERR_NOT_IMPLEMENTED; | |
| 129 } | |
| 130 | |
| 131 int Connect(const net::CompletionCallback& callback) override { | |
| 132 NOTIMPLEMENTED(); | |
| 133 return net::ERR_NOT_IMPLEMENTED; | |
| 134 } | |
| 135 void Disconnect() override { NOTIMPLEMENTED(); } | |
| 136 bool IsConnected() const override { | |
| 137 NOTIMPLEMENTED(); | |
| 138 return true; | |
| 139 } | |
| 140 bool IsConnectedAndIdle() const override { | |
| 141 NOTIMPLEMENTED(); | |
| 142 return false; | |
| 143 } | |
| 144 int GetPeerAddress(net::IPEndPoint* address) const override { | |
| 145 NOTIMPLEMENTED(); | |
| 146 return net::ERR_NOT_IMPLEMENTED; | |
| 147 } | |
| 148 int GetLocalAddress(net::IPEndPoint* address) const override { | |
| 149 NOTIMPLEMENTED(); | |
| 150 return net::ERR_NOT_IMPLEMENTED; | |
| 151 } | |
| 152 const net::BoundNetLog& NetLog() const override { | |
| 153 NOTIMPLEMENTED(); | |
| 154 return net_log_; | |
| 155 } | |
| 156 void SetSubresourceSpeculation() override { NOTIMPLEMENTED(); } | |
| 157 void SetOmniboxSpeculation() override { NOTIMPLEMENTED(); } | |
| 158 bool WasEverUsed() const override { return true; } | |
| 159 bool UsingTCPFastOpen() const override { return false; } | |
| 160 bool WasNpnNegotiated() const override { return false; } | |
| 161 net::NextProto GetNegotiatedProtocol() const override { | |
| 162 return net::kProtoUnknown; | |
| 163 } | |
| 164 bool GetSSLInfo(net::SSLInfo* ssl_info) override { | |
| 165 NOTIMPLEMENTED(); | |
| 166 return false; | |
| 167 } | |
| 168 void GetConnectionAttempts(net::ConnectionAttempts* out) const override { | |
| 169 out->clear(); | |
| 170 } | |
| 171 void ClearConnectionAttempts() override {} | |
| 172 void AddConnectionAttempts(const net::ConnectionAttempts& attempts) override { | |
| 173 } | |
| 174 | |
| 175 private: | 120 private: |
| 176 MuxChannel* channel_; | 121 MuxChannel* channel_; |
| 177 | 122 |
| 178 int base_channel_error_ = net::OK; | 123 int base_channel_error_ = net::OK; |
| 179 | 124 |
| 180 net::CompletionCallback read_callback_; | 125 net::CompletionCallback read_callback_; |
| 181 scoped_refptr<net::IOBuffer> read_buffer_; | 126 scoped_refptr<net::IOBuffer> read_buffer_; |
| 182 int read_buffer_size_; | 127 int read_buffer_size_; |
| 183 | 128 |
| 184 bool write_pending_; | 129 bool write_pending_; |
| 185 int write_result_; | 130 int write_result_; |
| 186 net::CompletionCallback write_callback_; | 131 net::CompletionCallback write_callback_; |
| 187 | 132 |
| 188 net::BoundNetLog net_log_; | |
| 189 | |
| 190 DISALLOW_COPY_AND_ASSIGN(MuxSocket); | 133 DISALLOW_COPY_AND_ASSIGN(MuxSocket); |
| 191 }; | 134 }; |
| 192 | 135 |
| 193 | 136 |
| 194 ChannelMultiplexer::MuxChannel::MuxChannel( | 137 ChannelMultiplexer::MuxChannel::MuxChannel( |
| 195 ChannelMultiplexer* multiplexer, | 138 ChannelMultiplexer* multiplexer, |
| 196 const std::string& name, | 139 const std::string& name, |
| 197 int send_id) | 140 int send_id) |
| 198 : multiplexer_(multiplexer), | 141 : multiplexer_(multiplexer), |
| 199 name_(name), | 142 name_(name), |
| 200 send_id_(send_id), | 143 send_id_(send_id), |
| 201 id_sent_(false), | 144 id_sent_(false), |
| 202 receive_id_(kChannelIdUnknown), | 145 receive_id_(kChannelIdUnknown), |
| 203 socket_(nullptr) { | 146 socket_(nullptr) { |
| 204 } | 147 } |
| 205 | 148 |
| 206 ChannelMultiplexer::MuxChannel::~MuxChannel() { | 149 ChannelMultiplexer::MuxChannel::~MuxChannel() { |
| 207 // Socket must be destroyed before the channel. | 150 // Socket must be destroyed before the channel. |
| 208 DCHECK(!socket_); | 151 DCHECK(!socket_); |
| 209 STLDeleteElements(&pending_packets_); | 152 STLDeleteElements(&pending_packets_); |
| 210 } | 153 } |
| 211 | 154 |
| 212 scoped_ptr<net::StreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() { | 155 scoped_ptr<P2PStreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() { |
| 213 DCHECK(!socket_); // Can't create more than one socket per channel. | 156 DCHECK(!socket_); // Can't create more than one socket per channel. |
| 214 scoped_ptr<MuxSocket> result(new MuxSocket(this)); | 157 scoped_ptr<MuxSocket> result(new MuxSocket(this)); |
| 215 socket_ = result.get(); | 158 socket_ = result.get(); |
| 216 return result.Pass(); | 159 return result.Pass(); |
| 217 } | 160 } |
| 218 | 161 |
| 219 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( | 162 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( |
| 220 scoped_ptr<MultiplexPacket> packet, | 163 scoped_ptr<MultiplexPacket> packet, |
| 221 const base::Closure& done_task) { | 164 const base::Closure& done_task) { |
| 222 DCHECK_EQ(packet->channel_id(), receive_id_); | 165 DCHECK_EQ(packet->channel_id(), receive_id_); |
| (...skipping 20 matching lines...) Expand all Loading... |
| 243 scoped_ptr<MultiplexPacket> packet, | 186 scoped_ptr<MultiplexPacket> packet, |
| 244 const base::Closure& done_task) { | 187 const base::Closure& done_task) { |
| 245 packet->set_channel_id(send_id_); | 188 packet->set_channel_id(send_id_); |
| 246 if (!id_sent_) { | 189 if (!id_sent_) { |
| 247 packet->set_channel_name(name_); | 190 packet->set_channel_name(name_); |
| 248 id_sent_ = true; | 191 id_sent_ = true; |
| 249 } | 192 } |
| 250 return multiplexer_->DoWrite(packet.Pass(), done_task); | 193 return multiplexer_->DoWrite(packet.Pass(), done_task); |
| 251 } | 194 } |
| 252 | 195 |
| 253 int ChannelMultiplexer::MuxChannel::DoRead(net::IOBuffer* buffer, | 196 int ChannelMultiplexer::MuxChannel::DoRead( |
| 254 int buffer_len) { | 197 const scoped_refptr<net::IOBuffer>& buffer, |
| 198 int buffer_len) { |
| 255 int pos = 0; | 199 int pos = 0; |
| 256 while (buffer_len > 0 && !pending_packets_.empty()) { | 200 while (buffer_len > 0 && !pending_packets_.empty()) { |
| 257 DCHECK(!pending_packets_.front()->is_empty()); | 201 DCHECK(!pending_packets_.front()->is_empty()); |
| 258 int result = pending_packets_.front()->Read( | 202 int result = pending_packets_.front()->Read( |
| 259 buffer->data() + pos, buffer_len); | 203 buffer->data() + pos, buffer_len); |
| 260 DCHECK_LE(result, buffer_len); | 204 DCHECK_LE(result, buffer_len); |
| 261 pos += result; | 205 pos += result; |
| 262 buffer_len -= pos; | 206 buffer_len -= pos; |
| 263 if (pending_packets_.front()->is_empty()) { | 207 if (pending_packets_.front()->is_empty()) { |
| 264 delete pending_packets_.front(); | 208 delete pending_packets_.front(); |
| 265 pending_packets_.erase(pending_packets_.begin()); | 209 pending_packets_.erase(pending_packets_.begin()); |
| 266 } | 210 } |
| 267 } | 211 } |
| 268 return pos; | 212 return pos; |
| 269 } | 213 } |
| 270 | 214 |
| 271 ChannelMultiplexer::MuxSocket::MuxSocket(MuxChannel* channel) | 215 ChannelMultiplexer::MuxSocket::MuxSocket(MuxChannel* channel) |
| 272 : channel_(channel), | 216 : channel_(channel), |
| 273 read_buffer_size_(0), | 217 read_buffer_size_(0), |
| 274 write_pending_(false), | 218 write_pending_(false), |
| 275 write_result_(0) { | 219 write_result_(0) { |
| 276 } | 220 } |
| 277 | 221 |
| 278 ChannelMultiplexer::MuxSocket::~MuxSocket() { | 222 ChannelMultiplexer::MuxSocket::~MuxSocket() { |
| 279 channel_->OnSocketDestroyed(); | 223 channel_->OnSocketDestroyed(); |
| 280 } | 224 } |
| 281 | 225 |
| 282 int ChannelMultiplexer::MuxSocket::Read( | 226 int ChannelMultiplexer::MuxSocket::Read( |
| 283 net::IOBuffer* buffer, int buffer_len, | 227 const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, |
| 284 const net::CompletionCallback& callback) { | 228 const net::CompletionCallback& callback) { |
| 285 DCHECK(CalledOnValidThread()); | 229 DCHECK(CalledOnValidThread()); |
| 286 DCHECK(read_callback_.is_null()); | 230 DCHECK(read_callback_.is_null()); |
| 287 | 231 |
| 288 if (base_channel_error_ != net::OK) | 232 if (base_channel_error_ != net::OK) |
| 289 return base_channel_error_; | 233 return base_channel_error_; |
| 290 | 234 |
| 291 int result = channel_->DoRead(buffer, buffer_len); | 235 int result = channel_->DoRead(buffer, buffer_len); |
| 292 if (result == 0) { | 236 if (result == 0) { |
| 293 read_buffer_ = buffer; | 237 read_buffer_ = buffer; |
| 294 read_buffer_size_ = buffer_len; | 238 read_buffer_size_ = buffer_len; |
| 295 read_callback_ = callback; | 239 read_callback_ = callback; |
| 296 return net::ERR_IO_PENDING; | 240 return net::ERR_IO_PENDING; |
| 297 } | 241 } |
| 298 return result; | 242 return result; |
| 299 } | 243 } |
| 300 | 244 |
| 301 int ChannelMultiplexer::MuxSocket::Write( | 245 int ChannelMultiplexer::MuxSocket::Write( |
| 302 net::IOBuffer* buffer, int buffer_len, | 246 const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, |
| 303 const net::CompletionCallback& callback) { | 247 const net::CompletionCallback& callback) { |
| 304 DCHECK(CalledOnValidThread()); | 248 DCHECK(CalledOnValidThread()); |
| 305 DCHECK(write_callback_.is_null()); | 249 DCHECK(write_callback_.is_null()); |
| 306 | 250 |
| 307 if (base_channel_error_ != net::OK) | 251 if (base_channel_error_ != net::OK) |
| 308 return base_channel_error_; | 252 return base_channel_error_; |
| 309 | 253 |
| 310 scoped_ptr<MultiplexPacket> packet(new MultiplexPacket()); | 254 scoped_ptr<MultiplexPacket> packet(new MultiplexPacket()); |
| 311 size_t size = std::min(kMaxPacketSize, buffer_len); | 255 size_t size = std::min(kMaxPacketSize, buffer_len); |
| 312 packet->mutable_data()->assign(buffer->data(), size); | 256 packet->mutable_data()->assign(buffer->data(), size); |
| (...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 413 for (std::list<PendingChannel>::iterator it = pending_channels_.begin(); | 357 for (std::list<PendingChannel>::iterator it = pending_channels_.begin(); |
| 414 it != pending_channels_.end(); ++it) { | 358 it != pending_channels_.end(); ++it) { |
| 415 if (it->name == name) { | 359 if (it->name == name) { |
| 416 pending_channels_.erase(it); | 360 pending_channels_.erase(it); |
| 417 return; | 361 return; |
| 418 } | 362 } |
| 419 } | 363 } |
| 420 } | 364 } |
| 421 | 365 |
| 422 void ChannelMultiplexer::OnBaseChannelReady( | 366 void ChannelMultiplexer::OnBaseChannelReady( |
| 423 scoped_ptr<net::StreamSocket> socket) { | 367 scoped_ptr<P2PStreamSocket> socket) { |
| 424 base_channel_factory_ = nullptr; | 368 base_channel_factory_ = nullptr; |
| 425 base_channel_ = socket.Pass(); | 369 base_channel_ = socket.Pass(); |
| 426 | 370 |
| 427 if (base_channel_.get()) { | 371 if (base_channel_.get()) { |
| 428 // Initialize reader and writer. | 372 // Initialize reader and writer. |
| 429 reader_.StartReading(base_channel_.get(), | 373 reader_.StartReading(base_channel_.get(), |
| 430 base::Bind(&ChannelMultiplexer::OnBaseChannelError, | 374 base::Bind(&ChannelMultiplexer::OnBaseChannelError, |
| 431 base::Unretained(this))); | 375 base::Unretained(this))); |
| 432 writer_.Init(base_channel_.get(), | 376 writer_.Init(base::Bind(&P2PStreamSocket::Write, |
| 377 base::Unretained(base_channel_.get())), |
| 433 base::Bind(&ChannelMultiplexer::OnBaseChannelError, | 378 base::Bind(&ChannelMultiplexer::OnBaseChannelError, |
| 434 base::Unretained(this))); | 379 base::Unretained(this))); |
| 435 } | 380 } |
| 436 | 381 |
| 437 DoCreatePendingChannels(); | 382 DoCreatePendingChannels(); |
| 438 } | 383 } |
| 439 | 384 |
| 440 void ChannelMultiplexer::DoCreatePendingChannels() { | 385 void ChannelMultiplexer::DoCreatePendingChannels() { |
| 441 if (pending_channels_.empty()) | 386 if (pending_channels_.empty()) |
| 442 return; | 387 return; |
| 443 | 388 |
| 444 // Every time this function is called it connects a single channel and posts a | 389 // Every time this function is called it connects a single channel and posts a |
| 445 // separate task to connect other channels. This is necessary because the | 390 // separate task to connect other channels. This is necessary because the |
| 446 // callback may destroy the multiplexer or somehow else modify | 391 // callback may destroy the multiplexer or somehow else modify |
| 447 // |pending_channels_| list (e.g. call CancelChannelCreation()). | 392 // |pending_channels_| list (e.g. call CancelChannelCreation()). |
| 448 base::ThreadTaskRunnerHandle::Get()->PostTask( | 393 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 449 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, | 394 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, |
| 450 weak_factory_.GetWeakPtr())); | 395 weak_factory_.GetWeakPtr())); |
| 451 | 396 |
| 452 PendingChannel c = pending_channels_.front(); | 397 PendingChannel c = pending_channels_.front(); |
| 453 pending_channels_.erase(pending_channels_.begin()); | 398 pending_channels_.erase(pending_channels_.begin()); |
| 454 scoped_ptr<net::StreamSocket> socket; | 399 scoped_ptr<P2PStreamSocket> socket; |
| 455 if (base_channel_.get()) | 400 if (base_channel_.get()) |
| 456 socket = GetOrCreateChannel(c.name)->CreateSocket(); | 401 socket = GetOrCreateChannel(c.name)->CreateSocket(); |
| 457 c.callback.Run(socket.Pass()); | 402 c.callback.Run(socket.Pass()); |
| 458 } | 403 } |
| 459 | 404 |
| 460 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( | 405 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( |
| 461 const std::string& name) { | 406 const std::string& name) { |
| 462 // Check if we already have a channel with the requested name. | 407 // Check if we already have a channel with the requested name. |
| 463 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); | 408 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); |
| 464 if (it != channels_.end()) | 409 if (it != channels_.end()) |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 520 channel->OnIncomingPacket(packet.Pass(), done_task); | 465 channel->OnIncomingPacket(packet.Pass(), done_task); |
| 521 } | 466 } |
| 522 | 467 |
| 523 bool ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, | 468 bool ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, |
| 524 const base::Closure& done_task) { | 469 const base::Closure& done_task) { |
| 525 return writer_.Write(SerializeAndFrameMessage(*packet), done_task); | 470 return writer_.Write(SerializeAndFrameMessage(*packet), done_task); |
| 526 } | 471 } |
| 527 | 472 |
| 528 } // namespace protocol | 473 } // namespace protocol |
| 529 } // namespace remoting | 474 } // namespace remoting |
| OLD | NEW |