Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(494)

Side by Side Diff: remoting/protocol/channel_multiplexer.cc

Issue 1197853003: Add P2PDatagramSocket and P2PStreamSocket interfaces. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/channel_multiplexer.h" 5 #include "remoting/protocol/channel_multiplexer.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
11 #include "base/callback_helpers.h" 11 #include "base/callback_helpers.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/single_thread_task_runner.h" 13 #include "base/single_thread_task_runner.h"
14 #include "base/stl_util.h" 14 #include "base/stl_util.h"
15 #include "base/thread_task_runner_handle.h" 15 #include "base/thread_task_runner_handle.h"
16 #include "net/base/net_errors.h" 16 #include "net/base/net_errors.h"
17 #include "net/socket/stream_socket.h"
18 #include "remoting/protocol/message_serialization.h" 17 #include "remoting/protocol/message_serialization.h"
18 #include "remoting/protocol/p2p_socket.h"
19 19
20 namespace remoting { 20 namespace remoting {
21 namespace protocol { 21 namespace protocol {
22 22
23 namespace { 23 namespace {
24 const int kChannelIdUnknown = -1; 24 const int kChannelIdUnknown = -1;
25 const int kMaxPacketSize = 1024; 25 const int kMaxPacketSize = 1024;
26 26
27 class PendingPacket { 27 class PendingPacket {
28 public: 28 public:
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
70 public: 70 public:
71 MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name, 71 MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name,
72 int send_id); 72 int send_id);
73 ~MuxChannel(); 73 ~MuxChannel();
74 74
75 const std::string& name() { return name_; } 75 const std::string& name() { return name_; }
76 int receive_id() { return receive_id_; } 76 int receive_id() { return receive_id_; }
77 void set_receive_id(int id) { receive_id_ = id; } 77 void set_receive_id(int id) { receive_id_ = id; }
78 78
79 // Called by ChannelMultiplexer. 79 // Called by ChannelMultiplexer.
80 scoped_ptr<net::StreamSocket> CreateSocket(); 80 scoped_ptr<P2PStreamSocket> CreateSocket();
81 void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet, 81 void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet,
82 const base::Closure& done_task); 82 const base::Closure& done_task);
83 void OnBaseChannelError(int error); 83 void OnBaseChannelError(int error);
84 84
85 // Called by MuxSocket. 85 // Called by MuxSocket.
86 void OnSocketDestroyed(); 86 void OnSocketDestroyed();
87 bool DoWrite(scoped_ptr<MultiplexPacket> packet, 87 bool DoWrite(scoped_ptr<MultiplexPacket> packet,
88 const base::Closure& done_task); 88 const base::Closure& done_task);
89 int DoRead(net::IOBuffer* buffer, int buffer_len); 89 int DoRead(net::IOBuffer* buffer, int buffer_len);
90 90
91 private: 91 private:
92 ChannelMultiplexer* multiplexer_; 92 ChannelMultiplexer* multiplexer_;
93 std::string name_; 93 std::string name_;
94 int send_id_; 94 int send_id_;
95 bool id_sent_; 95 bool id_sent_;
96 int receive_id_; 96 int receive_id_;
97 MuxSocket* socket_; 97 MuxSocket* socket_;
98 std::list<PendingPacket*> pending_packets_; 98 std::list<PendingPacket*> pending_packets_;
99 99
100 DISALLOW_COPY_AND_ASSIGN(MuxChannel); 100 DISALLOW_COPY_AND_ASSIGN(MuxChannel);
101 }; 101 };
102 102
103 class ChannelMultiplexer::MuxSocket : public net::StreamSocket, 103 class ChannelMultiplexer::MuxSocket : public P2PStreamSocket,
104 public base::NonThreadSafe, 104 public base::NonThreadSafe,
105 public base::SupportsWeakPtr<MuxSocket> { 105 public base::SupportsWeakPtr<MuxSocket> {
106 public: 106 public:
107 MuxSocket(MuxChannel* channel); 107 MuxSocket(MuxChannel* channel);
108 ~MuxSocket() override; 108 ~MuxSocket() override;
109 109
110 void OnWriteComplete(); 110 void OnWriteComplete();
111 void OnBaseChannelError(int error); 111 void OnBaseChannelError(int error);
112 void OnPacketReceived(); 112 void OnPacketReceived();
113 113
114 // net::StreamSocket interface. 114 // P2PStreamSocket interface.
115 int Read(net::IOBuffer* buffer, 115 int Read(net::IOBuffer* buffer,
116 int buffer_len, 116 int buffer_len,
117 const net::CompletionCallback& callback) override; 117 const net::CompletionCallback& callback) override;
118 int Write(net::IOBuffer* buffer, 118 int Write(net::IOBuffer* buffer,
119 int buffer_len, 119 int buffer_len,
120 const net::CompletionCallback& callback) override; 120 const net::CompletionCallback& callback) override;
121 121
122 int SetReceiveBufferSize(int32 size) override { 122 int SetReceiveBufferSize(int32 size) override {
Wez 2015/06/22 15:49:07 We wouldn't need these if we didn't derive from ne
Sergey Ulanov 2015/07/10 00:49:54 Done.
123 NOTIMPLEMENTED(); 123 NOTIMPLEMENTED();
124 return net::ERR_NOT_IMPLEMENTED; 124 return net::ERR_NOT_IMPLEMENTED;
125 } 125 }
126 int SetSendBufferSize(int32 size) override { 126 int SetSendBufferSize(int32 size) override {
127 NOTIMPLEMENTED(); 127 NOTIMPLEMENTED();
128 return net::ERR_NOT_IMPLEMENTED; 128 return net::ERR_NOT_IMPLEMENTED;
129 } 129 }
130 130
131 int Connect(const net::CompletionCallback& callback) override {
132 NOTIMPLEMENTED();
133 return net::ERR_NOT_IMPLEMENTED;
134 }
135 void Disconnect() override { NOTIMPLEMENTED(); }
136 bool IsConnected() const override {
137 NOTIMPLEMENTED();
138 return true;
139 }
140 bool IsConnectedAndIdle() const override {
141 NOTIMPLEMENTED();
142 return false;
143 }
144 int GetPeerAddress(net::IPEndPoint* address) const override {
145 NOTIMPLEMENTED();
146 return net::ERR_NOT_IMPLEMENTED;
147 }
148 int GetLocalAddress(net::IPEndPoint* address) const override {
149 NOTIMPLEMENTED();
150 return net::ERR_NOT_IMPLEMENTED;
151 }
152 const net::BoundNetLog& NetLog() const override {
153 NOTIMPLEMENTED();
154 return net_log_;
155 }
156 void SetSubresourceSpeculation() override { NOTIMPLEMENTED(); }
157 void SetOmniboxSpeculation() override { NOTIMPLEMENTED(); }
158 bool WasEverUsed() const override { return true; }
159 bool UsingTCPFastOpen() const override { return false; }
160 bool WasNpnNegotiated() const override { return false; }
161 net::NextProto GetNegotiatedProtocol() const override {
162 return net::kProtoUnknown;
163 }
164 bool GetSSLInfo(net::SSLInfo* ssl_info) override {
165 NOTIMPLEMENTED();
166 return false;
167 }
168 void GetConnectionAttempts(net::ConnectionAttempts* out) const override {
169 out->clear();
170 }
171 void ClearConnectionAttempts() override {}
172 void AddConnectionAttempts(const net::ConnectionAttempts& attempts) override {
173 }
174
175 private: 131 private:
176 MuxChannel* channel_; 132 MuxChannel* channel_;
177 133
178 int base_channel_error_ = net::OK; 134 int base_channel_error_ = net::OK;
179 135
180 net::CompletionCallback read_callback_; 136 net::CompletionCallback read_callback_;
181 scoped_refptr<net::IOBuffer> read_buffer_; 137 scoped_refptr<net::IOBuffer> read_buffer_;
182 int read_buffer_size_; 138 int read_buffer_size_;
183 139
184 bool write_pending_; 140 bool write_pending_;
185 int write_result_; 141 int write_result_;
186 net::CompletionCallback write_callback_; 142 net::CompletionCallback write_callback_;
187 143
188 net::BoundNetLog net_log_;
189
190 DISALLOW_COPY_AND_ASSIGN(MuxSocket); 144 DISALLOW_COPY_AND_ASSIGN(MuxSocket);
191 }; 145 };
192 146
193 147
194 ChannelMultiplexer::MuxChannel::MuxChannel( 148 ChannelMultiplexer::MuxChannel::MuxChannel(
195 ChannelMultiplexer* multiplexer, 149 ChannelMultiplexer* multiplexer,
196 const std::string& name, 150 const std::string& name,
197 int send_id) 151 int send_id)
198 : multiplexer_(multiplexer), 152 : multiplexer_(multiplexer),
199 name_(name), 153 name_(name),
200 send_id_(send_id), 154 send_id_(send_id),
201 id_sent_(false), 155 id_sent_(false),
202 receive_id_(kChannelIdUnknown), 156 receive_id_(kChannelIdUnknown),
203 socket_(nullptr) { 157 socket_(nullptr) {
204 } 158 }
205 159
206 ChannelMultiplexer::MuxChannel::~MuxChannel() { 160 ChannelMultiplexer::MuxChannel::~MuxChannel() {
207 // Socket must be destroyed before the channel. 161 // Socket must be destroyed before the channel.
208 DCHECK(!socket_); 162 DCHECK(!socket_);
209 STLDeleteElements(&pending_packets_); 163 STLDeleteElements(&pending_packets_);
210 } 164 }
211 165
212 scoped_ptr<net::StreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() { 166 scoped_ptr<P2PStreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() {
213 DCHECK(!socket_); // Can't create more than one socket per channel. 167 DCHECK(!socket_); // Can't create more than one socket per channel.
214 scoped_ptr<MuxSocket> result(new MuxSocket(this)); 168 scoped_ptr<MuxSocket> result(new MuxSocket(this));
215 socket_ = result.get(); 169 socket_ = result.get();
216 return result.Pass(); 170 return result.Pass();
217 } 171 }
218 172
219 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( 173 void ChannelMultiplexer::MuxChannel::OnIncomingPacket(
220 scoped_ptr<MultiplexPacket> packet, 174 scoped_ptr<MultiplexPacket> packet,
221 const base::Closure& done_task) { 175 const base::Closure& done_task) {
222 DCHECK_EQ(packet->channel_id(), receive_id_); 176 DCHECK_EQ(packet->channel_id(), receive_id_);
(...skipping 190 matching lines...) Expand 10 before | Expand all | Expand 10 after
413 for (std::list<PendingChannel>::iterator it = pending_channels_.begin(); 367 for (std::list<PendingChannel>::iterator it = pending_channels_.begin();
414 it != pending_channels_.end(); ++it) { 368 it != pending_channels_.end(); ++it) {
415 if (it->name == name) { 369 if (it->name == name) {
416 pending_channels_.erase(it); 370 pending_channels_.erase(it);
417 return; 371 return;
418 } 372 }
419 } 373 }
420 } 374 }
421 375
422 void ChannelMultiplexer::OnBaseChannelReady( 376 void ChannelMultiplexer::OnBaseChannelReady(
423 scoped_ptr<net::StreamSocket> socket) { 377 scoped_ptr<P2PStreamSocket> socket) {
424 base_channel_factory_ = nullptr; 378 base_channel_factory_ = nullptr;
425 base_channel_ = socket.Pass(); 379 base_channel_ = socket.Pass();
426 380
427 if (base_channel_.get()) { 381 if (base_channel_.get()) {
428 // Initialize reader and writer. 382 // Initialize reader and writer.
429 reader_.StartReading(base_channel_.get(), 383 reader_.StartReading(base_channel_.get(),
430 base::Bind(&ChannelMultiplexer::OnBaseChannelError, 384 base::Bind(&ChannelMultiplexer::OnBaseChannelError,
431 base::Unretained(this))); 385 base::Unretained(this)));
432 writer_.Init(base_channel_.get(), 386 writer_.Init(base_channel_.get(),
433 base::Bind(&ChannelMultiplexer::OnBaseChannelError, 387 base::Bind(&ChannelMultiplexer::OnBaseChannelError,
(...skipping 10 matching lines...) Expand all
444 // Every time this function is called it connects a single channel and posts a 398 // Every time this function is called it connects a single channel and posts a
445 // separate task to connect other channels. This is necessary because the 399 // separate task to connect other channels. This is necessary because the
446 // callback may destroy the multiplexer or somehow else modify 400 // callback may destroy the multiplexer or somehow else modify
447 // |pending_channels_| list (e.g. call CancelChannelCreation()). 401 // |pending_channels_| list (e.g. call CancelChannelCreation()).
448 base::ThreadTaskRunnerHandle::Get()->PostTask( 402 base::ThreadTaskRunnerHandle::Get()->PostTask(
449 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, 403 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels,
450 weak_factory_.GetWeakPtr())); 404 weak_factory_.GetWeakPtr()));
451 405
452 PendingChannel c = pending_channels_.front(); 406 PendingChannel c = pending_channels_.front();
453 pending_channels_.erase(pending_channels_.begin()); 407 pending_channels_.erase(pending_channels_.begin());
454 scoped_ptr<net::StreamSocket> socket; 408 scoped_ptr<P2PStreamSocket> socket;
455 if (base_channel_.get()) 409 if (base_channel_.get())
456 socket = GetOrCreateChannel(c.name)->CreateSocket(); 410 socket = GetOrCreateChannel(c.name)->CreateSocket();
457 c.callback.Run(socket.Pass()); 411 c.callback.Run(socket.Pass());
458 } 412 }
459 413
460 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( 414 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel(
461 const std::string& name) { 415 const std::string& name) {
462 // Check if we already have a channel with the requested name. 416 // Check if we already have a channel with the requested name.
463 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); 417 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name);
464 if (it != channels_.end()) 418 if (it != channels_.end())
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
520 channel->OnIncomingPacket(packet.Pass(), done_task); 474 channel->OnIncomingPacket(packet.Pass(), done_task);
521 } 475 }
522 476
523 bool ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, 477 bool ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet,
524 const base::Closure& done_task) { 478 const base::Closure& done_task) {
525 return writer_.Write(SerializeAndFrameMessage(*packet), done_task); 479 return writer_.Write(SerializeAndFrameMessage(*packet), done_task);
526 } 480 }
527 481
528 } // namespace protocol 482 } // namespace protocol
529 } // namespace remoting 483 } // namespace remoting
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698