OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "remoting/protocol/channel_multiplexer.h" | 5 #include "remoting/protocol/channel_multiplexer.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <string.h> | 8 #include <string.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
11 | 11 |
12 #include "base/bind.h" | 12 #include "base/bind.h" |
13 #include "base/callback.h" | 13 #include "base/callback.h" |
14 #include "base/callback_helpers.h" | 14 #include "base/callback_helpers.h" |
15 #include "base/location.h" | 15 #include "base/location.h" |
16 #include "base/macros.h" | 16 #include "base/macros.h" |
17 #include "base/memory/ptr_util.h" | 17 #include "base/memory/ptr_util.h" |
| 18 #include "base/sequence_checker.h" |
18 #include "base/single_thread_task_runner.h" | 19 #include "base/single_thread_task_runner.h" |
19 #include "base/threading/thread_task_runner_handle.h" | 20 #include "base/threading/thread_task_runner_handle.h" |
20 #include "net/base/net_errors.h" | 21 #include "net/base/net_errors.h" |
21 #include "remoting/protocol/message_serialization.h" | 22 #include "remoting/protocol/message_serialization.h" |
22 #include "remoting/protocol/p2p_stream_socket.h" | 23 #include "remoting/protocol/p2p_stream_socket.h" |
23 | 24 |
24 namespace remoting { | 25 namespace remoting { |
25 namespace protocol { | 26 namespace protocol { |
26 | 27 |
27 namespace { | 28 namespace { |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
90 int send_id_; | 91 int send_id_; |
91 bool id_sent_; | 92 bool id_sent_; |
92 int receive_id_; | 93 int receive_id_; |
93 MuxSocket* socket_; | 94 MuxSocket* socket_; |
94 std::list<std::unique_ptr<PendingPacket>> pending_packets_; | 95 std::list<std::unique_ptr<PendingPacket>> pending_packets_; |
95 | 96 |
96 DISALLOW_COPY_AND_ASSIGN(MuxChannel); | 97 DISALLOW_COPY_AND_ASSIGN(MuxChannel); |
97 }; | 98 }; |
98 | 99 |
99 class ChannelMultiplexer::MuxSocket : public P2PStreamSocket, | 100 class ChannelMultiplexer::MuxSocket : public P2PStreamSocket, |
100 public base::NonThreadSafe, | |
101 public base::SupportsWeakPtr<MuxSocket> { | 101 public base::SupportsWeakPtr<MuxSocket> { |
102 public: | 102 public: |
103 MuxSocket(MuxChannel* channel); | 103 MuxSocket(MuxChannel* channel); |
104 ~MuxSocket() override; | 104 ~MuxSocket() override; |
105 | 105 |
106 void OnWriteComplete(); | 106 void OnWriteComplete(); |
107 void OnBaseChannelError(int error); | 107 void OnBaseChannelError(int error); |
108 void OnPacketReceived(); | 108 void OnPacketReceived(); |
109 | 109 |
110 // P2PStreamSocket interface. | 110 // P2PStreamSocket interface. |
111 int Read(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, | 111 int Read(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, |
112 const net::CompletionCallback& callback) override; | 112 const net::CompletionCallback& callback) override; |
113 int Write(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, | 113 int Write(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, |
114 const net::CompletionCallback& callback) override; | 114 const net::CompletionCallback& callback) override; |
115 | 115 |
116 private: | 116 private: |
117 MuxChannel* channel_; | 117 MuxChannel* channel_; |
118 | 118 |
119 int base_channel_error_ = net::OK; | 119 int base_channel_error_ = net::OK; |
120 | 120 |
121 net::CompletionCallback read_callback_; | 121 net::CompletionCallback read_callback_; |
122 scoped_refptr<net::IOBuffer> read_buffer_; | 122 scoped_refptr<net::IOBuffer> read_buffer_; |
123 int read_buffer_size_; | 123 int read_buffer_size_; |
124 | 124 |
125 bool write_pending_; | 125 bool write_pending_; |
126 int write_result_; | 126 int write_result_; |
127 net::CompletionCallback write_callback_; | 127 net::CompletionCallback write_callback_; |
128 | 128 |
| 129 SEQUENCE_CHECKER(sequence_checker_); |
| 130 |
129 DISALLOW_COPY_AND_ASSIGN(MuxSocket); | 131 DISALLOW_COPY_AND_ASSIGN(MuxSocket); |
130 }; | 132 }; |
131 | 133 |
132 | 134 |
133 ChannelMultiplexer::MuxChannel::MuxChannel( | 135 ChannelMultiplexer::MuxChannel::MuxChannel( |
134 ChannelMultiplexer* multiplexer, | 136 ChannelMultiplexer* multiplexer, |
135 const std::string& name, | 137 const std::string& name, |
136 int send_id) | 138 int send_id) |
137 : multiplexer_(multiplexer), | 139 : multiplexer_(multiplexer), |
138 name_(name), | 140 name_(name), |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
213 write_result_(0) { | 215 write_result_(0) { |
214 } | 216 } |
215 | 217 |
216 ChannelMultiplexer::MuxSocket::~MuxSocket() { | 218 ChannelMultiplexer::MuxSocket::~MuxSocket() { |
217 channel_->OnSocketDestroyed(); | 219 channel_->OnSocketDestroyed(); |
218 } | 220 } |
219 | 221 |
220 int ChannelMultiplexer::MuxSocket::Read( | 222 int ChannelMultiplexer::MuxSocket::Read( |
221 const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, | 223 const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, |
222 const net::CompletionCallback& callback) { | 224 const net::CompletionCallback& callback) { |
223 DCHECK(CalledOnValidThread()); | 225 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
224 DCHECK(read_callback_.is_null()); | 226 DCHECK(read_callback_.is_null()); |
225 | 227 |
226 if (base_channel_error_ != net::OK) | 228 if (base_channel_error_ != net::OK) |
227 return base_channel_error_; | 229 return base_channel_error_; |
228 | 230 |
229 int result = channel_->DoRead(buffer, buffer_len); | 231 int result = channel_->DoRead(buffer, buffer_len); |
230 if (result == 0) { | 232 if (result == 0) { |
231 read_buffer_ = buffer; | 233 read_buffer_ = buffer; |
232 read_buffer_size_ = buffer_len; | 234 read_buffer_size_ = buffer_len; |
233 read_callback_ = callback; | 235 read_callback_ = callback; |
234 return net::ERR_IO_PENDING; | 236 return net::ERR_IO_PENDING; |
235 } | 237 } |
236 return result; | 238 return result; |
237 } | 239 } |
238 | 240 |
239 int ChannelMultiplexer::MuxSocket::Write( | 241 int ChannelMultiplexer::MuxSocket::Write( |
240 const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, | 242 const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, |
241 const net::CompletionCallback& callback) { | 243 const net::CompletionCallback& callback) { |
242 DCHECK(CalledOnValidThread()); | 244 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
243 DCHECK(write_callback_.is_null()); | 245 DCHECK(write_callback_.is_null()); |
244 | 246 |
245 if (base_channel_error_ != net::OK) | 247 if (base_channel_error_ != net::OK) |
246 return base_channel_error_; | 248 return base_channel_error_; |
247 | 249 |
248 std::unique_ptr<MultiplexPacket> packet(new MultiplexPacket()); | 250 std::unique_ptr<MultiplexPacket> packet(new MultiplexPacket()); |
249 size_t size = std::min(kMaxPacketSize, buffer_len); | 251 size_t size = std::min(kMaxPacketSize, buffer_len); |
250 packet->mutable_data()->assign(buffer->data(), size); | 252 packet->mutable_data()->assign(buffer->data(), size); |
251 | 253 |
252 write_pending_ = true; | 254 write_pending_ = true; |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
300 } | 302 } |
301 | 303 |
302 ChannelMultiplexer::ChannelMultiplexer(StreamChannelFactory* factory, | 304 ChannelMultiplexer::ChannelMultiplexer(StreamChannelFactory* factory, |
303 const std::string& base_channel_name) | 305 const std::string& base_channel_name) |
304 : base_channel_factory_(factory), | 306 : base_channel_factory_(factory), |
305 base_channel_name_(base_channel_name), | 307 base_channel_name_(base_channel_name), |
306 next_channel_id_(0), | 308 next_channel_id_(0), |
307 weak_factory_(this) {} | 309 weak_factory_(this) {} |
308 | 310 |
309 ChannelMultiplexer::~ChannelMultiplexer() { | 311 ChannelMultiplexer::~ChannelMultiplexer() { |
| 312 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
310 DCHECK(pending_channels_.empty()); | 313 DCHECK(pending_channels_.empty()); |
311 | 314 |
312 // Cancel creation of the base channel if it hasn't finished. | 315 // Cancel creation of the base channel if it hasn't finished. |
313 if (base_channel_factory_) | 316 if (base_channel_factory_) |
314 base_channel_factory_->CancelChannelCreation(base_channel_name_); | 317 base_channel_factory_->CancelChannelCreation(base_channel_name_); |
315 } | 318 } |
316 | 319 |
317 void ChannelMultiplexer::CreateChannel(const std::string& name, | 320 void ChannelMultiplexer::CreateChannel(const std::string& name, |
318 const ChannelCreatedCallback& callback) { | 321 const ChannelCreatedCallback& callback) { |
319 if (base_channel_.get()) { | 322 if (base_channel_.get()) { |
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
451 channel->OnIncomingPacket(std::move(packet)); | 454 channel->OnIncomingPacket(std::move(packet)); |
452 } | 455 } |
453 | 456 |
454 void ChannelMultiplexer::DoWrite(std::unique_ptr<MultiplexPacket> packet, | 457 void ChannelMultiplexer::DoWrite(std::unique_ptr<MultiplexPacket> packet, |
455 const base::Closure& done_task) { | 458 const base::Closure& done_task) { |
456 writer_.Write(SerializeAndFrameMessage(*packet), done_task); | 459 writer_.Write(SerializeAndFrameMessage(*packet), done_task); |
457 } | 460 } |
458 | 461 |
459 } // namespace protocol | 462 } // namespace protocol |
460 } // namespace remoting | 463 } // namespace remoting |
OLD | NEW |