OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "remoting/protocol/channel_multiplexer.h" | 5 #include "remoting/protocol/channel_multiplexer.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <string.h> | 8 #include <string.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
11 | 11 |
12 #include "base/bind.h" | 12 #include "base/bind.h" |
13 #include "base/callback.h" | 13 #include "base/callback.h" |
14 #include "base/callback_helpers.h" | 14 #include "base/callback_helpers.h" |
15 #include "base/location.h" | 15 #include "base/location.h" |
16 #include "base/macros.h" | 16 #include "base/macros.h" |
| 17 #include "base/memory/ptr_util.h" |
17 #include "base/single_thread_task_runner.h" | 18 #include "base/single_thread_task_runner.h" |
18 #include "base/stl_util.h" | |
19 #include "base/threading/thread_task_runner_handle.h" | 19 #include "base/threading/thread_task_runner_handle.h" |
20 #include "net/base/net_errors.h" | 20 #include "net/base/net_errors.h" |
21 #include "remoting/protocol/message_serialization.h" | 21 #include "remoting/protocol/message_serialization.h" |
22 #include "remoting/protocol/p2p_stream_socket.h" | 22 #include "remoting/protocol/p2p_stream_socket.h" |
23 | 23 |
24 namespace remoting { | 24 namespace remoting { |
25 namespace protocol { | 25 namespace protocol { |
26 | 26 |
27 namespace { | 27 namespace { |
28 const int kChannelIdUnknown = -1; | 28 const int kChannelIdUnknown = -1; |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
84 const base::Closure& done_task); | 84 const base::Closure& done_task); |
85 int DoRead(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len); | 85 int DoRead(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len); |
86 | 86 |
87 private: | 87 private: |
88 ChannelMultiplexer* multiplexer_; | 88 ChannelMultiplexer* multiplexer_; |
89 std::string name_; | 89 std::string name_; |
90 int send_id_; | 90 int send_id_; |
91 bool id_sent_; | 91 bool id_sent_; |
92 int receive_id_; | 92 int receive_id_; |
93 MuxSocket* socket_; | 93 MuxSocket* socket_; |
94 std::list<PendingPacket*> pending_packets_; | 94 std::list<std::unique_ptr<PendingPacket>> pending_packets_; |
95 | 95 |
96 DISALLOW_COPY_AND_ASSIGN(MuxChannel); | 96 DISALLOW_COPY_AND_ASSIGN(MuxChannel); |
97 }; | 97 }; |
98 | 98 |
99 class ChannelMultiplexer::MuxSocket : public P2PStreamSocket, | 99 class ChannelMultiplexer::MuxSocket : public P2PStreamSocket, |
100 public base::NonThreadSafe, | 100 public base::NonThreadSafe, |
101 public base::SupportsWeakPtr<MuxSocket> { | 101 public base::SupportsWeakPtr<MuxSocket> { |
102 public: | 102 public: |
103 MuxSocket(MuxChannel* channel); | 103 MuxSocket(MuxChannel* channel); |
104 ~MuxSocket() override; | 104 ~MuxSocket() override; |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
138 name_(name), | 138 name_(name), |
139 send_id_(send_id), | 139 send_id_(send_id), |
140 id_sent_(false), | 140 id_sent_(false), |
141 receive_id_(kChannelIdUnknown), | 141 receive_id_(kChannelIdUnknown), |
142 socket_(nullptr) { | 142 socket_(nullptr) { |
143 } | 143 } |
144 | 144 |
145 ChannelMultiplexer::MuxChannel::~MuxChannel() { | 145 ChannelMultiplexer::MuxChannel::~MuxChannel() { |
146 // Socket must be destroyed before the channel. | 146 // Socket must be destroyed before the channel. |
147 DCHECK(!socket_); | 147 DCHECK(!socket_); |
148 base::STLDeleteElements(&pending_packets_); | |
149 } | 148 } |
150 | 149 |
151 std::unique_ptr<P2PStreamSocket> | 150 std::unique_ptr<P2PStreamSocket> |
152 ChannelMultiplexer::MuxChannel::CreateSocket() { | 151 ChannelMultiplexer::MuxChannel::CreateSocket() { |
153 DCHECK(!socket_); // Can't create more than one socket per channel. | 152 DCHECK(!socket_); // Can't create more than one socket per channel. |
154 std::unique_ptr<MuxSocket> result(new MuxSocket(this)); | 153 std::unique_ptr<MuxSocket> result(new MuxSocket(this)); |
155 socket_ = result.get(); | 154 socket_ = result.get(); |
156 return std::move(result); | 155 return std::move(result); |
157 } | 156 } |
158 | 157 |
159 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( | 158 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( |
160 std::unique_ptr<MultiplexPacket> packet) { | 159 std::unique_ptr<MultiplexPacket> packet) { |
161 DCHECK_EQ(packet->channel_id(), receive_id_); | 160 DCHECK_EQ(packet->channel_id(), receive_id_); |
162 if (packet->data().size() > 0) { | 161 if (packet->data().size() > 0) { |
163 pending_packets_.push_back(new PendingPacket(std::move(packet))); | 162 pending_packets_.push_back( |
| 163 base::MakeUnique<PendingPacket>(std::move(packet))); |
164 if (socket_) { | 164 if (socket_) { |
165 // Notify the socket that we have more data. | 165 // Notify the socket that we have more data. |
166 socket_->OnPacketReceived(); | 166 socket_->OnPacketReceived(); |
167 } | 167 } |
168 } | 168 } |
169 } | 169 } |
170 | 170 |
171 void ChannelMultiplexer::MuxChannel::OnBaseChannelError(int error) { | 171 void ChannelMultiplexer::MuxChannel::OnBaseChannelError(int error) { |
172 if (socket_) | 172 if (socket_) |
173 socket_->OnBaseChannelError(error); | 173 socket_->OnBaseChannelError(error); |
(...skipping 19 matching lines...) Expand all Loading... |
193 const scoped_refptr<net::IOBuffer>& buffer, | 193 const scoped_refptr<net::IOBuffer>& buffer, |
194 int buffer_len) { | 194 int buffer_len) { |
195 int pos = 0; | 195 int pos = 0; |
196 while (buffer_len > 0 && !pending_packets_.empty()) { | 196 while (buffer_len > 0 && !pending_packets_.empty()) { |
197 DCHECK(!pending_packets_.front()->is_empty()); | 197 DCHECK(!pending_packets_.front()->is_empty()); |
198 int result = pending_packets_.front()->Read( | 198 int result = pending_packets_.front()->Read( |
199 buffer->data() + pos, buffer_len); | 199 buffer->data() + pos, buffer_len); |
200 DCHECK_LE(result, buffer_len); | 200 DCHECK_LE(result, buffer_len); |
201 pos += result; | 201 pos += result; |
202 buffer_len -= pos; | 202 buffer_len -= pos; |
203 if (pending_packets_.front()->is_empty()) { | 203 if (pending_packets_.front()->is_empty()) |
204 delete pending_packets_.front(); | 204 pending_packets_.pop_front(); |
205 pending_packets_.erase(pending_packets_.begin()); | |
206 } | |
207 } | 205 } |
208 return pos; | 206 return pos; |
209 } | 207 } |
210 | 208 |
211 ChannelMultiplexer::MuxSocket::MuxSocket(MuxChannel* channel) | 209 ChannelMultiplexer::MuxSocket::MuxSocket(MuxChannel* channel) |
212 : channel_(channel), | 210 : channel_(channel), |
213 read_buffer_size_(0), | 211 read_buffer_size_(0), |
214 write_pending_(false), | 212 write_pending_(false), |
215 write_result_(0) { | 213 write_result_(0) { |
216 } | 214 } |
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
303 | 301 |
304 ChannelMultiplexer::ChannelMultiplexer(StreamChannelFactory* factory, | 302 ChannelMultiplexer::ChannelMultiplexer(StreamChannelFactory* factory, |
305 const std::string& base_channel_name) | 303 const std::string& base_channel_name) |
306 : base_channel_factory_(factory), | 304 : base_channel_factory_(factory), |
307 base_channel_name_(base_channel_name), | 305 base_channel_name_(base_channel_name), |
308 next_channel_id_(0), | 306 next_channel_id_(0), |
309 weak_factory_(this) {} | 307 weak_factory_(this) {} |
310 | 308 |
311 ChannelMultiplexer::~ChannelMultiplexer() { | 309 ChannelMultiplexer::~ChannelMultiplexer() { |
312 DCHECK(pending_channels_.empty()); | 310 DCHECK(pending_channels_.empty()); |
313 base::STLDeleteValues(&channels_); | |
314 | 311 |
315 // Cancel creation of the base channel if it hasn't finished. | 312 // Cancel creation of the base channel if it hasn't finished. |
316 if (base_channel_factory_) | 313 if (base_channel_factory_) |
317 base_channel_factory_->CancelChannelCreation(base_channel_name_); | 314 base_channel_factory_->CancelChannelCreation(base_channel_name_); |
318 } | 315 } |
319 | 316 |
320 void ChannelMultiplexer::CreateChannel(const std::string& name, | 317 void ChannelMultiplexer::CreateChannel(const std::string& name, |
321 const ChannelCreatedCallback& callback) { | 318 const ChannelCreatedCallback& callback) { |
322 if (base_channel_.get()) { | 319 if (base_channel_.get()) { |
323 // Already have |base_channel_|. Create new multiplexed channel | 320 // Already have |base_channel_|. Create new multiplexed channel |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
386 PendingChannel c = pending_channels_.front(); | 383 PendingChannel c = pending_channels_.front(); |
387 pending_channels_.erase(pending_channels_.begin()); | 384 pending_channels_.erase(pending_channels_.begin()); |
388 std::unique_ptr<P2PStreamSocket> socket; | 385 std::unique_ptr<P2PStreamSocket> socket; |
389 if (base_channel_.get()) | 386 if (base_channel_.get()) |
390 socket = GetOrCreateChannel(c.name)->CreateSocket(); | 387 socket = GetOrCreateChannel(c.name)->CreateSocket(); |
391 c.callback.Run(std::move(socket)); | 388 c.callback.Run(std::move(socket)); |
392 } | 389 } |
393 | 390 |
394 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( | 391 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( |
395 const std::string& name) { | 392 const std::string& name) { |
396 // Check if we already have a channel with the requested name. | 393 std::unique_ptr<MuxChannel>& channel = channels_[name]; |
397 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); | 394 if (!channel) { |
398 if (it != channels_.end()) | 395 // Create a new channel if we haven't found existing one. |
399 return it->second; | 396 channel = base::MakeUnique<MuxChannel>(this, name, next_channel_id_); |
| 397 ++next_channel_id_; |
| 398 } |
400 | 399 |
401 // Create a new channel if we haven't found existing one. | 400 return channel.get(); |
402 MuxChannel* channel = new MuxChannel(this, name, next_channel_id_); | |
403 ++next_channel_id_; | |
404 channels_[channel->name()] = channel; | |
405 return channel; | |
406 } | 401 } |
407 | 402 |
408 | 403 |
409 void ChannelMultiplexer::OnBaseChannelError(int error) { | 404 void ChannelMultiplexer::OnBaseChannelError(int error) { |
410 for (std::map<std::string, MuxChannel*>::iterator it = channels_.begin(); | 405 for (auto it = channels_.begin(); it != channels_.end(); ++it) { |
411 it != channels_.end(); ++it) { | |
412 base::ThreadTaskRunnerHandle::Get()->PostTask( | 406 base::ThreadTaskRunnerHandle::Get()->PostTask( |
413 FROM_HERE, | 407 FROM_HERE, |
414 base::Bind(&ChannelMultiplexer::NotifyBaseChannelError, | 408 base::Bind(&ChannelMultiplexer::NotifyBaseChannelError, |
415 weak_factory_.GetWeakPtr(), it->second->name(), error)); | 409 weak_factory_.GetWeakPtr(), it->second->name(), error)); |
416 } | 410 } |
417 } | 411 } |
418 | 412 |
419 void ChannelMultiplexer::NotifyBaseChannelError(const std::string& name, | 413 void ChannelMultiplexer::NotifyBaseChannelError(const std::string& name, |
420 int error) { | 414 int error) { |
421 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); | 415 auto it = channels_.find(name); |
422 if (it != channels_.end()) | 416 if (it != channels_.end()) |
423 it->second->OnBaseChannelError(error); | 417 it->second->OnBaseChannelError(error); |
424 } | 418 } |
425 | 419 |
426 void ChannelMultiplexer::OnIncomingPacket( | 420 void ChannelMultiplexer::OnIncomingPacket( |
427 std::unique_ptr<CompoundBuffer> buffer) { | 421 std::unique_ptr<CompoundBuffer> buffer) { |
428 std::unique_ptr<MultiplexPacket> packet = | 422 std::unique_ptr<MultiplexPacket> packet = |
429 ParseMessage<MultiplexPacket>(buffer.get()); | 423 ParseMessage<MultiplexPacket>(buffer.get()); |
430 if (!packet) | 424 if (!packet) |
431 return; | 425 return; |
(...skipping 25 matching lines...) Expand all Loading... |
457 channel->OnIncomingPacket(std::move(packet)); | 451 channel->OnIncomingPacket(std::move(packet)); |
458 } | 452 } |
459 | 453 |
460 void ChannelMultiplexer::DoWrite(std::unique_ptr<MultiplexPacket> packet, | 454 void ChannelMultiplexer::DoWrite(std::unique_ptr<MultiplexPacket> packet, |
461 const base::Closure& done_task) { | 455 const base::Closure& done_task) { |
462 writer_.Write(SerializeAndFrameMessage(*packet), done_task); | 456 writer_.Write(SerializeAndFrameMessage(*packet), done_task); |
463 } | 457 } |
464 | 458 |
465 } // namespace protocol | 459 } // namespace protocol |
466 } // namespace remoting | 460 } // namespace remoting |
OLD | NEW |