Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "remoting/protocol/channel_multiplexer.h" | 5 #include "remoting/protocol/channel_multiplexer.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include "base/bind.h" | 9 #include "base/bind.h" |
| 10 #include "base/callback.h" | 10 #include "base/callback.h" |
| 11 #include "base/location.h" | 11 #include "base/location.h" |
| 12 #include "base/single_thread_task_runner.h" | |
| 12 #include "base/stl_util.h" | 13 #include "base/stl_util.h" |
| 14 #include "base/thread_task_runner_handle.h" | |
| 13 #include "net/base/net_errors.h" | 15 #include "net/base/net_errors.h" |
| 14 #include "net/socket/stream_socket.h" | 16 #include "net/socket/stream_socket.h" |
| 15 #include "remoting/protocol/util.h" | 17 #include "remoting/protocol/util.h" |
| 16 | 18 |
| 17 namespace remoting { | 19 namespace remoting { |
| 18 namespace protocol { | 20 namespace protocol { |
| 19 | 21 |
| 20 namespace { | 22 namespace { |
| 21 const int kChannelIdUnknown = -1; | 23 const int kChannelIdUnknown = -1; |
| 22 const int kMaxPacketSize = 1024; | 24 const int kMaxPacketSize = 1024; |
| (...skipping 334 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 357 std::swap(cb, read_callback_); | 359 std::swap(cb, read_callback_); |
| 358 cb.Run(result); | 360 cb.Run(result); |
| 359 } | 361 } |
| 360 } | 362 } |
| 361 | 363 |
| 362 ChannelMultiplexer::ChannelMultiplexer(ChannelFactory* factory, | 364 ChannelMultiplexer::ChannelMultiplexer(ChannelFactory* factory, |
| 363 const std::string& base_channel_name) | 365 const std::string& base_channel_name) |
| 364 : base_channel_factory_(factory), | 366 : base_channel_factory_(factory), |
| 365 base_channel_name_(base_channel_name), | 367 base_channel_name_(base_channel_name), |
| 366 next_channel_id_(0), | 368 next_channel_id_(0), |
| 367 destroyed_flag_(NULL) { | 369 ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)) { |
| 368 } | 370 } |
| 369 | 371 |
| 370 ChannelMultiplexer::~ChannelMultiplexer() { | 372 ChannelMultiplexer::~ChannelMultiplexer() { |
| 371 DCHECK(pending_channels_.empty()); | 373 DCHECK(pending_channels_.empty()); |
| 372 STLDeleteValues(&channels_); | 374 STLDeleteValues(&channels_); |
| 373 | 375 |
| 374 // Cancel creation of the base channel if it hasn't finished. | 376 // Cancel creation of the base channel if it hasn't finished. |
| 375 if (base_channel_factory_) | 377 if (base_channel_factory_) |
| 376 base_channel_factory_->CancelChannelCreation(base_channel_name_); | 378 base_channel_factory_->CancelChannelCreation(base_channel_name_); |
| 377 | |
| 378 if (destroyed_flag_) | |
| 379 *destroyed_flag_ = true; | |
| 380 } | 379 } |
| 381 | 380 |
| 382 void ChannelMultiplexer::CreateStreamChannel( | 381 void ChannelMultiplexer::CreateStreamChannel( |
| 383 const std::string& name, | 382 const std::string& name, |
| 384 const StreamChannelCallback& callback) { | 383 const StreamChannelCallback& callback) { |
| 385 if (base_channel_.get()) { | 384 if (base_channel_.get()) { |
| 386 // Already have |base_channel_|. Create new multiplexed channel | 385 // Already have |base_channel_|. Create new multiplexed channel |
| 387 // synchronously. | 386 // synchronously. |
| 388 callback.Run(GetOrCreateChannel(name)->CreateSocket()); | 387 callback.Run(GetOrCreateChannel(name)->CreateSocket()); |
| 389 } else if (!base_channel_.get() && !base_channel_factory_) { | 388 } else if (!base_channel_.get() && !base_channel_factory_) { |
| (...skipping 28 matching lines...) Expand all Loading... | |
| 418 return; | 417 return; |
| 419 } | 418 } |
| 420 } | 419 } |
| 421 } | 420 } |
| 422 | 421 |
| 423 void ChannelMultiplexer::OnBaseChannelReady( | 422 void ChannelMultiplexer::OnBaseChannelReady( |
| 424 scoped_ptr<net::StreamSocket> socket) { | 423 scoped_ptr<net::StreamSocket> socket) { |
| 425 base_channel_factory_ = NULL; | 424 base_channel_factory_ = NULL; |
| 426 base_channel_ = socket.Pass(); | 425 base_channel_ = socket.Pass(); |
| 427 | 426 |
| 428 if (!base_channel_.get()) { | 427 if (base_channel_.get()) { |
| 429 // Notify all callers that we can't create any channels. | 428 // Initialize reader and writer. |
| 430 for (std::list<PendingChannel>::iterator it = pending_channels_.begin(); | 429 reader_.Init(base_channel_.get(), |
| 431 it != pending_channels_.end(); ++it) { | 430 base::Bind(&ChannelMultiplexer::OnIncomingPacket, |
| 432 it->callback.Run(scoped_ptr<net::StreamSocket>()); | 431 base::Unretained(this))); |
| 433 } | 432 writer_.Init(base_channel_.get(), |
| 434 pending_channels_.clear(); | 433 base::Bind(&ChannelMultiplexer::OnWriteFailed, |
| 435 return; | 434 base::Unretained(this))); |
| 436 } | 435 } |
| 437 | 436 |
| 438 // Initialize reader and writer. | 437 DoCreatePendingChannels(); |
| 439 reader_.Init(base_channel_.get(), | 438 } |
| 440 base::Bind(&ChannelMultiplexer::OnIncomingPacket, | |
| 441 base::Unretained(this))); | |
| 442 writer_.Init(base_channel_.get(), | |
| 443 base::Bind(&ChannelMultiplexer::OnWriteFailed, | |
| 444 base::Unretained(this))); | |
| 445 | 439 |
| 446 // Now create all pending channels. | 440 void ChannelMultiplexer::DoCreatePendingChannels() { |
| 447 for (std::list<PendingChannel>::iterator it = pending_channels_.begin(); | 441 if (pending_channels_.empty()) |
| 448 it != pending_channels_.end(); ++it) { | 442 return; |
| 449 it->callback.Run(GetOrCreateChannel(it->name)->CreateSocket()); | 443 |
| 450 } | 444 // Every time this function is called it connects a single channel and posts a |
|
Wez
2012/09/25 23:34:15
typo: Remove the "a"
Sergey Ulanov
2012/09/26 00:46:45
replaced "tasks" with "task" instead.
| |
| 451 pending_channels_.clear(); | 445 // separate tasks to connect other channels. This is necessary because the |
| 446 // callback may destroy the multiplexer or somehow else modify | |
| 447 // |pending_channels_| list (e.g. call CancelChannelCreation()). | |
| 448 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
| 449 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, | |
| 450 weak_factory_.GetWeakPtr())); | |
| 451 | |
| 452 PendingChannel c = pending_channels_.front(); | |
| 453 pending_channels_.erase(pending_channels_.begin()); | |
| 454 scoped_ptr<net::StreamSocket> socket; | |
| 455 if (base_channel_.get()) | |
| 456 socket = GetOrCreateChannel(c.name)->CreateSocket(); | |
| 457 c.callback.Run(socket.Pass()); | |
| 452 } | 458 } |
| 453 | 459 |
| 460 | |
| 454 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( | 461 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( |
| 455 const std::string& name) { | 462 const std::string& name) { |
| 456 // Check if we already have a channel with the requested name. | 463 // Check if we already have a channel with the requested name. |
| 457 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); | 464 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); |
| 458 if (it != channels_.end()) | 465 if (it != channels_.end()) |
| 459 return it->second; | 466 return it->second; |
| 460 | 467 |
| 461 // Create a new channel if we haven't found existing one. | 468 // Create a new channel if we haven't found existing one. |
| 462 MuxChannel* channel = new MuxChannel(this, name, next_channel_id_); | 469 MuxChannel* channel = new MuxChannel(this, name, next_channel_id_); |
| 463 ++next_channel_id_; | 470 ++next_channel_id_; |
| 464 channels_[channel->name()] = channel; | 471 channels_[channel->name()] = channel; |
| 465 return channel; | 472 return channel; |
| 466 } | 473 } |
| 467 | 474 |
| 468 | 475 |
| 469 void ChannelMultiplexer::OnWriteFailed(int error) { | 476 void ChannelMultiplexer::OnWriteFailed(int error) { |
| 470 bool destroyed = false; | |
| 471 destroyed_flag_ = &destroyed; | |
| 472 for (std::map<std::string, MuxChannel*>::iterator it = channels_.begin(); | 477 for (std::map<std::string, MuxChannel*>::iterator it = channels_.begin(); |
| 473 it != channels_.end(); ++it) { | 478 it != channels_.end(); ++it) { |
| 479 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
| 480 FROM_HERE, base::Bind(&ChannelMultiplexer::NotifyWriteFailed, | |
| 481 weak_factory_.GetWeakPtr(), it->second->name())); | |
| 482 } | |
| 483 } | |
| 484 | |
| 485 void ChannelMultiplexer::NotifyWriteFailed(const std::string& name) { | |
| 486 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); | |
| 487 if (it != channels_.end()) { | |
| 474 it->second->OnWriteFailed(); | 488 it->second->OnWriteFailed(); |
| 475 if (destroyed) | |
| 476 return; | |
| 477 } | 489 } |
| 478 destroyed_flag_ = NULL; | |
| 479 } | 490 } |
| 480 | 491 |
| 481 void ChannelMultiplexer::OnIncomingPacket(scoped_ptr<MultiplexPacket> packet, | 492 void ChannelMultiplexer::OnIncomingPacket(scoped_ptr<MultiplexPacket> packet, |
| 482 const base::Closure& done_task) { | 493 const base::Closure& done_task) { |
| 483 if (!packet->has_channel_id()) { | 494 if (!packet->has_channel_id()) { |
| 484 LOG(ERROR) << "Received packet without channel_id."; | 495 LOG(ERROR) << "Received packet without channel_id."; |
| 485 done_task.Run(); | 496 done_task.Run(); |
| 486 return; | 497 return; |
| 487 } | 498 } |
| 488 | 499 |
| (...skipping 19 matching lines...) Expand all Loading... | |
| 508 channel->OnIncomingPacket(packet.Pass(), done_task); | 519 channel->OnIncomingPacket(packet.Pass(), done_task); |
| 509 } | 520 } |
| 510 | 521 |
| 511 bool ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, | 522 bool ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, |
| 512 const base::Closure& done_task) { | 523 const base::Closure& done_task) { |
| 513 return writer_.Write(SerializeAndFrameMessage(*packet), done_task); | 524 return writer_.Write(SerializeAndFrameMessage(*packet), done_task); |
| 514 } | 525 } |
| 515 | 526 |
| 516 } // namespace protocol | 527 } // namespace protocol |
| 517 } // namespace remoting | 528 } // namespace remoting |
| OLD | NEW |