OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "remoting/protocol/channel_multiplexer.h" | 5 #include "remoting/protocol/channel_multiplexer.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/callback.h" | 10 #include "base/callback.h" |
11 #include "base/callback_helpers.h" | 11 #include "base/callback_helpers.h" |
12 #include "base/location.h" | 12 #include "base/location.h" |
13 #include "base/single_thread_task_runner.h" | 13 #include "base/single_thread_task_runner.h" |
14 #include "base/stl_util.h" | 14 #include "base/stl_util.h" |
15 #include "base/thread_task_runner_handle.h" | 15 #include "base/thread_task_runner_handle.h" |
16 #include "net/base/net_errors.h" | 16 #include "net/base/net_errors.h" |
17 #include "net/socket/stream_socket.h" | |
18 #include "remoting/protocol/message_serialization.h" | 17 #include "remoting/protocol/message_serialization.h" |
| 18 #include "remoting/protocol/p2p_stream_socket.h" |
19 | 19 |
20 namespace remoting { | 20 namespace remoting { |
21 namespace protocol { | 21 namespace protocol { |
22 | 22 |
23 namespace { | 23 namespace { |
24 const int kChannelIdUnknown = -1; | 24 const int kChannelIdUnknown = -1; |
25 const int kMaxPacketSize = 1024; | 25 const int kMaxPacketSize = 1024; |
26 | 26 |
27 class PendingPacket { | 27 class PendingPacket { |
28 public: | 28 public: |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
70 public: | 70 public: |
71 MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name, | 71 MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name, |
72 int send_id); | 72 int send_id); |
73 ~MuxChannel(); | 73 ~MuxChannel(); |
74 | 74 |
75 const std::string& name() { return name_; } | 75 const std::string& name() { return name_; } |
76 int receive_id() { return receive_id_; } | 76 int receive_id() { return receive_id_; } |
77 void set_receive_id(int id) { receive_id_ = id; } | 77 void set_receive_id(int id) { receive_id_ = id; } |
78 | 78 |
79 // Called by ChannelMultiplexer. | 79 // Called by ChannelMultiplexer. |
80 scoped_ptr<net::StreamSocket> CreateSocket(); | 80 scoped_ptr<P2PStreamSocket> CreateSocket(); |
81 void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet, | 81 void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet, |
82 const base::Closure& done_task); | 82 const base::Closure& done_task); |
83 void OnBaseChannelError(int error); | 83 void OnBaseChannelError(int error); |
84 | 84 |
85 // Called by MuxSocket. | 85 // Called by MuxSocket. |
86 void OnSocketDestroyed(); | 86 void OnSocketDestroyed(); |
87 bool DoWrite(scoped_ptr<MultiplexPacket> packet, | 87 bool DoWrite(scoped_ptr<MultiplexPacket> packet, |
88 const base::Closure& done_task); | 88 const base::Closure& done_task); |
89 int DoRead(net::IOBuffer* buffer, int buffer_len); | 89 int DoRead(net::IOBuffer* buffer, int buffer_len); |
90 | 90 |
91 private: | 91 private: |
92 ChannelMultiplexer* multiplexer_; | 92 ChannelMultiplexer* multiplexer_; |
93 std::string name_; | 93 std::string name_; |
94 int send_id_; | 94 int send_id_; |
95 bool id_sent_; | 95 bool id_sent_; |
96 int receive_id_; | 96 int receive_id_; |
97 MuxSocket* socket_; | 97 MuxSocket* socket_; |
98 std::list<PendingPacket*> pending_packets_; | 98 std::list<PendingPacket*> pending_packets_; |
99 | 99 |
100 DISALLOW_COPY_AND_ASSIGN(MuxChannel); | 100 DISALLOW_COPY_AND_ASSIGN(MuxChannel); |
101 }; | 101 }; |
102 | 102 |
103 class ChannelMultiplexer::MuxSocket : public net::StreamSocket, | 103 class ChannelMultiplexer::MuxSocket : public P2PStreamSocket, |
104 public base::NonThreadSafe, | 104 public base::NonThreadSafe, |
105 public base::SupportsWeakPtr<MuxSocket> { | 105 public base::SupportsWeakPtr<MuxSocket> { |
106 public: | 106 public: |
107 MuxSocket(MuxChannel* channel); | 107 MuxSocket(MuxChannel* channel); |
108 ~MuxSocket() override; | 108 ~MuxSocket() override; |
109 | 109 |
110 void OnWriteComplete(); | 110 void OnWriteComplete(); |
111 void OnBaseChannelError(int error); | 111 void OnBaseChannelError(int error); |
112 void OnPacketReceived(); | 112 void OnPacketReceived(); |
113 | 113 |
114 // net::StreamSocket interface. | 114 // P2PStreamSocket interface. |
115 int Read(net::IOBuffer* buffer, | 115 int Read(net::IOBuffer* buffer, |
116 int buffer_len, | 116 int buffer_len, |
117 const net::CompletionCallback& callback) override; | 117 const net::CompletionCallback& callback) override; |
118 int Write(net::IOBuffer* buffer, | 118 int Write(net::IOBuffer* buffer, |
119 int buffer_len, | 119 int buffer_len, |
120 const net::CompletionCallback& callback) override; | 120 const net::CompletionCallback& callback) override; |
121 | 121 |
122 int SetReceiveBufferSize(int32 size) override { | |
123 NOTIMPLEMENTED(); | |
124 return net::ERR_NOT_IMPLEMENTED; | |
125 } | |
126 int SetSendBufferSize(int32 size) override { | |
127 NOTIMPLEMENTED(); | |
128 return net::ERR_NOT_IMPLEMENTED; | |
129 } | |
130 | |
131 int Connect(const net::CompletionCallback& callback) override { | |
132 NOTIMPLEMENTED(); | |
133 return net::ERR_NOT_IMPLEMENTED; | |
134 } | |
135 void Disconnect() override { NOTIMPLEMENTED(); } | |
136 bool IsConnected() const override { | |
137 NOTIMPLEMENTED(); | |
138 return true; | |
139 } | |
140 bool IsConnectedAndIdle() const override { | |
141 NOTIMPLEMENTED(); | |
142 return false; | |
143 } | |
144 int GetPeerAddress(net::IPEndPoint* address) const override { | |
145 NOTIMPLEMENTED(); | |
146 return net::ERR_NOT_IMPLEMENTED; | |
147 } | |
148 int GetLocalAddress(net::IPEndPoint* address) const override { | |
149 NOTIMPLEMENTED(); | |
150 return net::ERR_NOT_IMPLEMENTED; | |
151 } | |
152 const net::BoundNetLog& NetLog() const override { | |
153 NOTIMPLEMENTED(); | |
154 return net_log_; | |
155 } | |
156 void SetSubresourceSpeculation() override { NOTIMPLEMENTED(); } | |
157 void SetOmniboxSpeculation() override { NOTIMPLEMENTED(); } | |
158 bool WasEverUsed() const override { return true; } | |
159 bool UsingTCPFastOpen() const override { return false; } | |
160 bool WasNpnNegotiated() const override { return false; } | |
161 net::NextProto GetNegotiatedProtocol() const override { | |
162 return net::kProtoUnknown; | |
163 } | |
164 bool GetSSLInfo(net::SSLInfo* ssl_info) override { | |
165 NOTIMPLEMENTED(); | |
166 return false; | |
167 } | |
168 void GetConnectionAttempts(net::ConnectionAttempts* out) const override { | |
169 out->clear(); | |
170 } | |
171 void ClearConnectionAttempts() override {} | |
172 void AddConnectionAttempts(const net::ConnectionAttempts& attempts) override { | |
173 } | |
174 | |
175 private: | 122 private: |
176 MuxChannel* channel_; | 123 MuxChannel* channel_; |
177 | 124 |
178 int base_channel_error_ = net::OK; | 125 int base_channel_error_ = net::OK; |
179 | 126 |
180 net::CompletionCallback read_callback_; | 127 net::CompletionCallback read_callback_; |
181 scoped_refptr<net::IOBuffer> read_buffer_; | 128 scoped_refptr<net::IOBuffer> read_buffer_; |
182 int read_buffer_size_; | 129 int read_buffer_size_; |
183 | 130 |
184 bool write_pending_; | 131 bool write_pending_; |
185 int write_result_; | 132 int write_result_; |
186 net::CompletionCallback write_callback_; | 133 net::CompletionCallback write_callback_; |
187 | 134 |
188 net::BoundNetLog net_log_; | |
189 | |
190 DISALLOW_COPY_AND_ASSIGN(MuxSocket); | 135 DISALLOW_COPY_AND_ASSIGN(MuxSocket); |
191 }; | 136 }; |
192 | 137 |
193 | 138 |
194 ChannelMultiplexer::MuxChannel::MuxChannel( | 139 ChannelMultiplexer::MuxChannel::MuxChannel( |
195 ChannelMultiplexer* multiplexer, | 140 ChannelMultiplexer* multiplexer, |
196 const std::string& name, | 141 const std::string& name, |
197 int send_id) | 142 int send_id) |
198 : multiplexer_(multiplexer), | 143 : multiplexer_(multiplexer), |
199 name_(name), | 144 name_(name), |
200 send_id_(send_id), | 145 send_id_(send_id), |
201 id_sent_(false), | 146 id_sent_(false), |
202 receive_id_(kChannelIdUnknown), | 147 receive_id_(kChannelIdUnknown), |
203 socket_(nullptr) { | 148 socket_(nullptr) { |
204 } | 149 } |
205 | 150 |
206 ChannelMultiplexer::MuxChannel::~MuxChannel() { | 151 ChannelMultiplexer::MuxChannel::~MuxChannel() { |
207 // Socket must be destroyed before the channel. | 152 // Socket must be destroyed before the channel. |
208 DCHECK(!socket_); | 153 DCHECK(!socket_); |
209 STLDeleteElements(&pending_packets_); | 154 STLDeleteElements(&pending_packets_); |
210 } | 155 } |
211 | 156 |
212 scoped_ptr<net::StreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() { | 157 scoped_ptr<P2PStreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() { |
213 DCHECK(!socket_); // Can't create more than one socket per channel. | 158 DCHECK(!socket_); // Can't create more than one socket per channel. |
214 scoped_ptr<MuxSocket> result(new MuxSocket(this)); | 159 scoped_ptr<MuxSocket> result(new MuxSocket(this)); |
215 socket_ = result.get(); | 160 socket_ = result.get(); |
216 return result.Pass(); | 161 return result.Pass(); |
217 } | 162 } |
218 | 163 |
219 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( | 164 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( |
220 scoped_ptr<MultiplexPacket> packet, | 165 scoped_ptr<MultiplexPacket> packet, |
221 const base::Closure& done_task) { | 166 const base::Closure& done_task) { |
222 DCHECK_EQ(packet->channel_id(), receive_id_); | 167 DCHECK_EQ(packet->channel_id(), receive_id_); |
(...skipping 190 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
413 for (std::list<PendingChannel>::iterator it = pending_channels_.begin(); | 358 for (std::list<PendingChannel>::iterator it = pending_channels_.begin(); |
414 it != pending_channels_.end(); ++it) { | 359 it != pending_channels_.end(); ++it) { |
415 if (it->name == name) { | 360 if (it->name == name) { |
416 pending_channels_.erase(it); | 361 pending_channels_.erase(it); |
417 return; | 362 return; |
418 } | 363 } |
419 } | 364 } |
420 } | 365 } |
421 | 366 |
422 void ChannelMultiplexer::OnBaseChannelReady( | 367 void ChannelMultiplexer::OnBaseChannelReady( |
423 scoped_ptr<net::StreamSocket> socket) { | 368 scoped_ptr<P2PStreamSocket> socket) { |
424 base_channel_factory_ = nullptr; | 369 base_channel_factory_ = nullptr; |
425 base_channel_ = socket.Pass(); | 370 base_channel_ = socket.Pass(); |
426 | 371 |
427 if (base_channel_.get()) { | 372 if (base_channel_.get()) { |
428 // Initialize reader and writer. | 373 // Initialize reader and writer. |
429 reader_.StartReading(base_channel_.get(), | 374 reader_.StartReading(base_channel_.get(), |
430 base::Bind(&ChannelMultiplexer::OnBaseChannelError, | 375 base::Bind(&ChannelMultiplexer::OnBaseChannelError, |
431 base::Unretained(this))); | 376 base::Unretained(this))); |
432 writer_.Init(base_channel_.get(), | 377 writer_.Init(base::Bind(&P2PStreamSocket::Write, |
| 378 base::Unretained(base_channel_.get())), |
433 base::Bind(&ChannelMultiplexer::OnBaseChannelError, | 379 base::Bind(&ChannelMultiplexer::OnBaseChannelError, |
434 base::Unretained(this))); | 380 base::Unretained(this))); |
435 } | 381 } |
436 | 382 |
437 DoCreatePendingChannels(); | 383 DoCreatePendingChannels(); |
438 } | 384 } |
439 | 385 |
440 void ChannelMultiplexer::DoCreatePendingChannels() { | 386 void ChannelMultiplexer::DoCreatePendingChannels() { |
441 if (pending_channels_.empty()) | 387 if (pending_channels_.empty()) |
442 return; | 388 return; |
443 | 389 |
444 // Every time this function is called it connects a single channel and posts a | 390 // Every time this function is called it connects a single channel and posts a |
445 // separate task to connect other channels. This is necessary because the | 391 // separate task to connect other channels. This is necessary because the |
446 // callback may destroy the multiplexer or somehow else modify | 392 // callback may destroy the multiplexer or somehow else modify |
447 // |pending_channels_| list (e.g. call CancelChannelCreation()). | 393 // |pending_channels_| list (e.g. call CancelChannelCreation()). |
448 base::ThreadTaskRunnerHandle::Get()->PostTask( | 394 base::ThreadTaskRunnerHandle::Get()->PostTask( |
449 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, | 395 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, |
450 weak_factory_.GetWeakPtr())); | 396 weak_factory_.GetWeakPtr())); |
451 | 397 |
452 PendingChannel c = pending_channels_.front(); | 398 PendingChannel c = pending_channels_.front(); |
453 pending_channels_.erase(pending_channels_.begin()); | 399 pending_channels_.erase(pending_channels_.begin()); |
454 scoped_ptr<net::StreamSocket> socket; | 400 scoped_ptr<P2PStreamSocket> socket; |
455 if (base_channel_.get()) | 401 if (base_channel_.get()) |
456 socket = GetOrCreateChannel(c.name)->CreateSocket(); | 402 socket = GetOrCreateChannel(c.name)->CreateSocket(); |
457 c.callback.Run(socket.Pass()); | 403 c.callback.Run(socket.Pass()); |
458 } | 404 } |
459 | 405 |
460 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( | 406 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( |
461 const std::string& name) { | 407 const std::string& name) { |
462 // Check if we already have a channel with the requested name. | 408 // Check if we already have a channel with the requested name. |
463 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); | 409 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); |
464 if (it != channels_.end()) | 410 if (it != channels_.end()) |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
520 channel->OnIncomingPacket(packet.Pass(), done_task); | 466 channel->OnIncomingPacket(packet.Pass(), done_task); |
521 } | 467 } |
522 | 468 |
523 bool ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, | 469 bool ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, |
524 const base::Closure& done_task) { | 470 const base::Closure& done_task) { |
525 return writer_.Write(SerializeAndFrameMessage(*packet), done_task); | 471 return writer_.Write(SerializeAndFrameMessage(*packet), done_task); |
526 } | 472 } |
527 | 473 |
528 } // namespace protocol | 474 } // namespace protocol |
529 } // namespace remoting | 475 } // namespace remoting |
OLD | NEW |