Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(334)

Side by Side Diff: remoting/protocol/channel_multiplexer.cc

Issue 1197853003: Add P2PDatagramSocket and P2PStreamSocket interfaces. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/channel_multiplexer.h" 5 #include "remoting/protocol/channel_multiplexer.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
11 #include "base/callback_helpers.h" 11 #include "base/callback_helpers.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/single_thread_task_runner.h" 13 #include "base/single_thread_task_runner.h"
14 #include "base/stl_util.h" 14 #include "base/stl_util.h"
15 #include "base/thread_task_runner_handle.h" 15 #include "base/thread_task_runner_handle.h"
16 #include "net/base/net_errors.h" 16 #include "net/base/net_errors.h"
17 #include "net/socket/stream_socket.h"
18 #include "remoting/protocol/message_serialization.h" 17 #include "remoting/protocol/message_serialization.h"
18 #include "remoting/protocol/p2p_stream_socket.h"
19 19
20 namespace remoting { 20 namespace remoting {
21 namespace protocol { 21 namespace protocol {
22 22
23 namespace { 23 namespace {
24 const int kChannelIdUnknown = -1; 24 const int kChannelIdUnknown = -1;
25 const int kMaxPacketSize = 1024; 25 const int kMaxPacketSize = 1024;
26 26
27 class PendingPacket { 27 class PendingPacket {
28 public: 28 public:
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
70 public: 70 public:
71 MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name, 71 MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name,
72 int send_id); 72 int send_id);
73 ~MuxChannel(); 73 ~MuxChannel();
74 74
75 const std::string& name() { return name_; } 75 const std::string& name() { return name_; }
76 int receive_id() { return receive_id_; } 76 int receive_id() { return receive_id_; }
77 void set_receive_id(int id) { receive_id_ = id; } 77 void set_receive_id(int id) { receive_id_ = id; }
78 78
79 // Called by ChannelMultiplexer. 79 // Called by ChannelMultiplexer.
80 scoped_ptr<net::StreamSocket> CreateSocket(); 80 scoped_ptr<P2PStreamSocket> CreateSocket();
81 void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet, 81 void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet,
82 const base::Closure& done_task); 82 const base::Closure& done_task);
83 void OnBaseChannelError(int error); 83 void OnBaseChannelError(int error);
84 84
85 // Called by MuxSocket. 85 // Called by MuxSocket.
86 void OnSocketDestroyed(); 86 void OnSocketDestroyed();
87 bool DoWrite(scoped_ptr<MultiplexPacket> packet, 87 bool DoWrite(scoped_ptr<MultiplexPacket> packet,
88 const base::Closure& done_task); 88 const base::Closure& done_task);
89 int DoRead(net::IOBuffer* buffer, int buffer_len); 89 int DoRead(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len);
90 90
91 private: 91 private:
92 ChannelMultiplexer* multiplexer_; 92 ChannelMultiplexer* multiplexer_;
93 std::string name_; 93 std::string name_;
94 int send_id_; 94 int send_id_;
95 bool id_sent_; 95 bool id_sent_;
96 int receive_id_; 96 int receive_id_;
97 MuxSocket* socket_; 97 MuxSocket* socket_;
98 std::list<PendingPacket*> pending_packets_; 98 std::list<PendingPacket*> pending_packets_;
99 99
100 DISALLOW_COPY_AND_ASSIGN(MuxChannel); 100 DISALLOW_COPY_AND_ASSIGN(MuxChannel);
101 }; 101 };
102 102
103 class ChannelMultiplexer::MuxSocket : public net::StreamSocket, 103 class ChannelMultiplexer::MuxSocket : public P2PStreamSocket,
104 public base::NonThreadSafe, 104 public base::NonThreadSafe,
105 public base::SupportsWeakPtr<MuxSocket> { 105 public base::SupportsWeakPtr<MuxSocket> {
106 public: 106 public:
107 MuxSocket(MuxChannel* channel); 107 MuxSocket(MuxChannel* channel);
108 ~MuxSocket() override; 108 ~MuxSocket() override;
109 109
110 void OnWriteComplete(); 110 void OnWriteComplete();
111 void OnBaseChannelError(int error); 111 void OnBaseChannelError(int error);
112 void OnPacketReceived(); 112 void OnPacketReceived();
113 113
114 // net::StreamSocket interface. 114 // P2PStreamSocket interface.
115 int Read(net::IOBuffer* buffer, 115 int Read(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len,
116 int buffer_len,
117 const net::CompletionCallback& callback) override; 116 const net::CompletionCallback& callback) override;
118 int Write(net::IOBuffer* buffer, 117 int Write(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len,
119 int buffer_len,
120 const net::CompletionCallback& callback) override; 118 const net::CompletionCallback& callback) override;
121 119
122 int SetReceiveBufferSize(int32 size) override {
123 NOTIMPLEMENTED();
124 return net::ERR_NOT_IMPLEMENTED;
125 }
126 int SetSendBufferSize(int32 size) override {
127 NOTIMPLEMENTED();
128 return net::ERR_NOT_IMPLEMENTED;
129 }
130
131 int Connect(const net::CompletionCallback& callback) override {
132 NOTIMPLEMENTED();
133 return net::ERR_NOT_IMPLEMENTED;
134 }
135 void Disconnect() override { NOTIMPLEMENTED(); }
136 bool IsConnected() const override {
137 NOTIMPLEMENTED();
138 return true;
139 }
140 bool IsConnectedAndIdle() const override {
141 NOTIMPLEMENTED();
142 return false;
143 }
144 int GetPeerAddress(net::IPEndPoint* address) const override {
145 NOTIMPLEMENTED();
146 return net::ERR_NOT_IMPLEMENTED;
147 }
148 int GetLocalAddress(net::IPEndPoint* address) const override {
149 NOTIMPLEMENTED();
150 return net::ERR_NOT_IMPLEMENTED;
151 }
152 const net::BoundNetLog& NetLog() const override {
153 NOTIMPLEMENTED();
154 return net_log_;
155 }
156 void SetSubresourceSpeculation() override { NOTIMPLEMENTED(); }
157 void SetOmniboxSpeculation() override { NOTIMPLEMENTED(); }
158 bool WasEverUsed() const override { return true; }
159 bool UsingTCPFastOpen() const override { return false; }
160 bool WasNpnNegotiated() const override { return false; }
161 net::NextProto GetNegotiatedProtocol() const override {
162 return net::kProtoUnknown;
163 }
164 bool GetSSLInfo(net::SSLInfo* ssl_info) override {
165 NOTIMPLEMENTED();
166 return false;
167 }
168 void GetConnectionAttempts(net::ConnectionAttempts* out) const override {
169 out->clear();
170 }
171 void ClearConnectionAttempts() override {}
172 void AddConnectionAttempts(const net::ConnectionAttempts& attempts) override {
173 }
174
175 private: 120 private:
176 MuxChannel* channel_; 121 MuxChannel* channel_;
177 122
178 int base_channel_error_ = net::OK; 123 int base_channel_error_ = net::OK;
179 124
180 net::CompletionCallback read_callback_; 125 net::CompletionCallback read_callback_;
181 scoped_refptr<net::IOBuffer> read_buffer_; 126 scoped_refptr<net::IOBuffer> read_buffer_;
182 int read_buffer_size_; 127 int read_buffer_size_;
183 128
184 bool write_pending_; 129 bool write_pending_;
185 int write_result_; 130 int write_result_;
186 net::CompletionCallback write_callback_; 131 net::CompletionCallback write_callback_;
187 132
188 net::BoundNetLog net_log_;
189
190 DISALLOW_COPY_AND_ASSIGN(MuxSocket); 133 DISALLOW_COPY_AND_ASSIGN(MuxSocket);
191 }; 134 };
192 135
193 136
194 ChannelMultiplexer::MuxChannel::MuxChannel( 137 ChannelMultiplexer::MuxChannel::MuxChannel(
195 ChannelMultiplexer* multiplexer, 138 ChannelMultiplexer* multiplexer,
196 const std::string& name, 139 const std::string& name,
197 int send_id) 140 int send_id)
198 : multiplexer_(multiplexer), 141 : multiplexer_(multiplexer),
199 name_(name), 142 name_(name),
200 send_id_(send_id), 143 send_id_(send_id),
201 id_sent_(false), 144 id_sent_(false),
202 receive_id_(kChannelIdUnknown), 145 receive_id_(kChannelIdUnknown),
203 socket_(nullptr) { 146 socket_(nullptr) {
204 } 147 }
205 148
206 ChannelMultiplexer::MuxChannel::~MuxChannel() { 149 ChannelMultiplexer::MuxChannel::~MuxChannel() {
207 // Socket must be destroyed before the channel. 150 // Socket must be destroyed before the channel.
208 DCHECK(!socket_); 151 DCHECK(!socket_);
209 STLDeleteElements(&pending_packets_); 152 STLDeleteElements(&pending_packets_);
210 } 153 }
211 154
212 scoped_ptr<net::StreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() { 155 scoped_ptr<P2PStreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() {
213 DCHECK(!socket_); // Can't create more than one socket per channel. 156 DCHECK(!socket_); // Can't create more than one socket per channel.
214 scoped_ptr<MuxSocket> result(new MuxSocket(this)); 157 scoped_ptr<MuxSocket> result(new MuxSocket(this));
215 socket_ = result.get(); 158 socket_ = result.get();
216 return result.Pass(); 159 return result.Pass();
217 } 160 }
218 161
219 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( 162 void ChannelMultiplexer::MuxChannel::OnIncomingPacket(
220 scoped_ptr<MultiplexPacket> packet, 163 scoped_ptr<MultiplexPacket> packet,
221 const base::Closure& done_task) { 164 const base::Closure& done_task) {
222 DCHECK_EQ(packet->channel_id(), receive_id_); 165 DCHECK_EQ(packet->channel_id(), receive_id_);
(...skipping 20 matching lines...) Expand all
243 scoped_ptr<MultiplexPacket> packet, 186 scoped_ptr<MultiplexPacket> packet,
244 const base::Closure& done_task) { 187 const base::Closure& done_task) {
245 packet->set_channel_id(send_id_); 188 packet->set_channel_id(send_id_);
246 if (!id_sent_) { 189 if (!id_sent_) {
247 packet->set_channel_name(name_); 190 packet->set_channel_name(name_);
248 id_sent_ = true; 191 id_sent_ = true;
249 } 192 }
250 return multiplexer_->DoWrite(packet.Pass(), done_task); 193 return multiplexer_->DoWrite(packet.Pass(), done_task);
251 } 194 }
252 195
253 int ChannelMultiplexer::MuxChannel::DoRead(net::IOBuffer* buffer, 196 int ChannelMultiplexer::MuxChannel::DoRead(
254 int buffer_len) { 197 const scoped_refptr<net::IOBuffer>& buffer,
198 int buffer_len) {
255 int pos = 0; 199 int pos = 0;
256 while (buffer_len > 0 && !pending_packets_.empty()) { 200 while (buffer_len > 0 && !pending_packets_.empty()) {
257 DCHECK(!pending_packets_.front()->is_empty()); 201 DCHECK(!pending_packets_.front()->is_empty());
258 int result = pending_packets_.front()->Read( 202 int result = pending_packets_.front()->Read(
259 buffer->data() + pos, buffer_len); 203 buffer->data() + pos, buffer_len);
260 DCHECK_LE(result, buffer_len); 204 DCHECK_LE(result, buffer_len);
261 pos += result; 205 pos += result;
262 buffer_len -= pos; 206 buffer_len -= pos;
263 if (pending_packets_.front()->is_empty()) { 207 if (pending_packets_.front()->is_empty()) {
264 delete pending_packets_.front(); 208 delete pending_packets_.front();
265 pending_packets_.erase(pending_packets_.begin()); 209 pending_packets_.erase(pending_packets_.begin());
266 } 210 }
267 } 211 }
268 return pos; 212 return pos;
269 } 213 }
270 214
271 ChannelMultiplexer::MuxSocket::MuxSocket(MuxChannel* channel) 215 ChannelMultiplexer::MuxSocket::MuxSocket(MuxChannel* channel)
272 : channel_(channel), 216 : channel_(channel),
273 read_buffer_size_(0), 217 read_buffer_size_(0),
274 write_pending_(false), 218 write_pending_(false),
275 write_result_(0) { 219 write_result_(0) {
276 } 220 }
277 221
278 ChannelMultiplexer::MuxSocket::~MuxSocket() { 222 ChannelMultiplexer::MuxSocket::~MuxSocket() {
279 channel_->OnSocketDestroyed(); 223 channel_->OnSocketDestroyed();
280 } 224 }
281 225
282 int ChannelMultiplexer::MuxSocket::Read( 226 int ChannelMultiplexer::MuxSocket::Read(
283 net::IOBuffer* buffer, int buffer_len, 227 const scoped_refptr<net::IOBuffer>& buffer, int buffer_len,
284 const net::CompletionCallback& callback) { 228 const net::CompletionCallback& callback) {
285 DCHECK(CalledOnValidThread()); 229 DCHECK(CalledOnValidThread());
286 DCHECK(read_callback_.is_null()); 230 DCHECK(read_callback_.is_null());
287 231
288 if (base_channel_error_ != net::OK) 232 if (base_channel_error_ != net::OK)
289 return base_channel_error_; 233 return base_channel_error_;
290 234
291 int result = channel_->DoRead(buffer, buffer_len); 235 int result = channel_->DoRead(buffer, buffer_len);
292 if (result == 0) { 236 if (result == 0) {
293 read_buffer_ = buffer; 237 read_buffer_ = buffer;
294 read_buffer_size_ = buffer_len; 238 read_buffer_size_ = buffer_len;
295 read_callback_ = callback; 239 read_callback_ = callback;
296 return net::ERR_IO_PENDING; 240 return net::ERR_IO_PENDING;
297 } 241 }
298 return result; 242 return result;
299 } 243 }
300 244
301 int ChannelMultiplexer::MuxSocket::Write( 245 int ChannelMultiplexer::MuxSocket::Write(
302 net::IOBuffer* buffer, int buffer_len, 246 const scoped_refptr<net::IOBuffer>& buffer, int buffer_len,
303 const net::CompletionCallback& callback) { 247 const net::CompletionCallback& callback) {
304 DCHECK(CalledOnValidThread()); 248 DCHECK(CalledOnValidThread());
305 DCHECK(write_callback_.is_null()); 249 DCHECK(write_callback_.is_null());
306 250
307 if (base_channel_error_ != net::OK) 251 if (base_channel_error_ != net::OK)
308 return base_channel_error_; 252 return base_channel_error_;
309 253
310 scoped_ptr<MultiplexPacket> packet(new MultiplexPacket()); 254 scoped_ptr<MultiplexPacket> packet(new MultiplexPacket());
311 size_t size = std::min(kMaxPacketSize, buffer_len); 255 size_t size = std::min(kMaxPacketSize, buffer_len);
312 packet->mutable_data()->assign(buffer->data(), size); 256 packet->mutable_data()->assign(buffer->data(), size);
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after
413 for (std::list<PendingChannel>::iterator it = pending_channels_.begin(); 357 for (std::list<PendingChannel>::iterator it = pending_channels_.begin();
414 it != pending_channels_.end(); ++it) { 358 it != pending_channels_.end(); ++it) {
415 if (it->name == name) { 359 if (it->name == name) {
416 pending_channels_.erase(it); 360 pending_channels_.erase(it);
417 return; 361 return;
418 } 362 }
419 } 363 }
420 } 364 }
421 365
422 void ChannelMultiplexer::OnBaseChannelReady( 366 void ChannelMultiplexer::OnBaseChannelReady(
423 scoped_ptr<net::StreamSocket> socket) { 367 scoped_ptr<P2PStreamSocket> socket) {
424 base_channel_factory_ = nullptr; 368 base_channel_factory_ = nullptr;
425 base_channel_ = socket.Pass(); 369 base_channel_ = socket.Pass();
426 370
427 if (base_channel_.get()) { 371 if (base_channel_.get()) {
428 // Initialize reader and writer. 372 // Initialize reader and writer.
429 reader_.StartReading(base_channel_.get(), 373 reader_.StartReading(base_channel_.get(),
430 base::Bind(&ChannelMultiplexer::OnBaseChannelError, 374 base::Bind(&ChannelMultiplexer::OnBaseChannelError,
431 base::Unretained(this))); 375 base::Unretained(this)));
432 writer_.Init(base_channel_.get(), 376 writer_.Init(base::Bind(&P2PStreamSocket::Write,
377 base::Unretained(base_channel_.get())),
433 base::Bind(&ChannelMultiplexer::OnBaseChannelError, 378 base::Bind(&ChannelMultiplexer::OnBaseChannelError,
434 base::Unretained(this))); 379 base::Unretained(this)));
435 } 380 }
436 381
437 DoCreatePendingChannels(); 382 DoCreatePendingChannels();
438 } 383 }
439 384
440 void ChannelMultiplexer::DoCreatePendingChannels() { 385 void ChannelMultiplexer::DoCreatePendingChannels() {
441 if (pending_channels_.empty()) 386 if (pending_channels_.empty())
442 return; 387 return;
443 388
444 // Every time this function is called it connects a single channel and posts a 389 // Every time this function is called it connects a single channel and posts a
445 // separate task to connect other channels. This is necessary because the 390 // separate task to connect other channels. This is necessary because the
446 // callback may destroy the multiplexer or somehow else modify 391 // callback may destroy the multiplexer or somehow else modify
447 // |pending_channels_| list (e.g. call CancelChannelCreation()). 392 // |pending_channels_| list (e.g. call CancelChannelCreation()).
448 base::ThreadTaskRunnerHandle::Get()->PostTask( 393 base::ThreadTaskRunnerHandle::Get()->PostTask(
449 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, 394 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels,
450 weak_factory_.GetWeakPtr())); 395 weak_factory_.GetWeakPtr()));
451 396
452 PendingChannel c = pending_channels_.front(); 397 PendingChannel c = pending_channels_.front();
453 pending_channels_.erase(pending_channels_.begin()); 398 pending_channels_.erase(pending_channels_.begin());
454 scoped_ptr<net::StreamSocket> socket; 399 scoped_ptr<P2PStreamSocket> socket;
455 if (base_channel_.get()) 400 if (base_channel_.get())
456 socket = GetOrCreateChannel(c.name)->CreateSocket(); 401 socket = GetOrCreateChannel(c.name)->CreateSocket();
457 c.callback.Run(socket.Pass()); 402 c.callback.Run(socket.Pass());
458 } 403 }
459 404
460 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( 405 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel(
461 const std::string& name) { 406 const std::string& name) {
462 // Check if we already have a channel with the requested name. 407 // Check if we already have a channel with the requested name.
463 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); 408 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name);
464 if (it != channels_.end()) 409 if (it != channels_.end())
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
520 channel->OnIncomingPacket(packet.Pass(), done_task); 465 channel->OnIncomingPacket(packet.Pass(), done_task);
521 } 466 }
522 467
523 bool ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, 468 bool ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet,
524 const base::Closure& done_task) { 469 const base::Closure& done_task) {
525 return writer_.Write(SerializeAndFrameMessage(*packet), done_task); 470 return writer_.Write(SerializeAndFrameMessage(*packet), done_task);
526 } 471 }
527 472
528 } // namespace protocol 473 } // namespace protocol
529 } // namespace remoting 474 } // namespace remoting
OLDNEW
« no previous file with comments | « remoting/protocol/channel_multiplexer.h ('k') | remoting/protocol/channel_multiplexer_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698