Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3039)

Side by Side Diff: remoting/protocol/channel_multiplexer.cc

Issue 1864213002: Convert //remoting to use std::unique_ptr (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Mac IWYU Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/channel_multiplexer.h" 5 #include "remoting/protocol/channel_multiplexer.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <string.h> 8 #include <string.h>
9 9
10 #include <utility> 10 #include <utility>
(...skipping 12 matching lines...) Expand all
23 23
24 namespace remoting { 24 namespace remoting {
25 namespace protocol { 25 namespace protocol {
26 26
27 namespace { 27 namespace {
28 const int kChannelIdUnknown = -1; 28 const int kChannelIdUnknown = -1;
29 const int kMaxPacketSize = 1024; 29 const int kMaxPacketSize = 1024;
30 30
31 class PendingPacket { 31 class PendingPacket {
32 public: 32 public:
33 PendingPacket(scoped_ptr<MultiplexPacket> packet) 33 PendingPacket(std::unique_ptr<MultiplexPacket> packet)
34 : packet(std::move(packet)) {} 34 : packet(std::move(packet)) {}
35 ~PendingPacket() {} 35 ~PendingPacket() {}
36 36
37 bool is_empty() { return pos >= packet->data().size(); } 37 bool is_empty() { return pos >= packet->data().size(); }
38 38
39 int Read(char* buffer, size_t size) { 39 int Read(char* buffer, size_t size) {
40 size = std::min(size, packet->data().size() - pos); 40 size = std::min(size, packet->data().size() - pos);
41 memcpy(buffer, packet->data().data() + pos, size); 41 memcpy(buffer, packet->data().data() + pos, size);
42 pos += size; 42 pos += size;
43 return size; 43 return size;
44 } 44 }
45 45
46 private: 46 private:
47 scoped_ptr<MultiplexPacket> packet; 47 std::unique_ptr<MultiplexPacket> packet;
48 size_t pos = 0U; 48 size_t pos = 0U;
49 49
50 DISALLOW_COPY_AND_ASSIGN(PendingPacket); 50 DISALLOW_COPY_AND_ASSIGN(PendingPacket);
51 }; 51 };
52 52
53 } // namespace 53 } // namespace
54 54
55 const char ChannelMultiplexer::kMuxChannelName[] = "mux"; 55 const char ChannelMultiplexer::kMuxChannelName[] = "mux";
56 56
57 struct ChannelMultiplexer::PendingChannel { 57 struct ChannelMultiplexer::PendingChannel {
58 PendingChannel(const std::string& name, 58 PendingChannel(const std::string& name,
59 const ChannelCreatedCallback& callback) 59 const ChannelCreatedCallback& callback)
60 : name(name), callback(callback) { 60 : name(name), callback(callback) {
61 } 61 }
62 std::string name; 62 std::string name;
63 ChannelCreatedCallback callback; 63 ChannelCreatedCallback callback;
64 }; 64 };
65 65
66 class ChannelMultiplexer::MuxChannel { 66 class ChannelMultiplexer::MuxChannel {
67 public: 67 public:
68 MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name, 68 MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name,
69 int send_id); 69 int send_id);
70 ~MuxChannel(); 70 ~MuxChannel();
71 71
72 const std::string& name() { return name_; } 72 const std::string& name() { return name_; }
73 int receive_id() { return receive_id_; } 73 int receive_id() { return receive_id_; }
74 void set_receive_id(int id) { receive_id_ = id; } 74 void set_receive_id(int id) { receive_id_ = id; }
75 75
76 // Called by ChannelMultiplexer. 76 // Called by ChannelMultiplexer.
77 scoped_ptr<P2PStreamSocket> CreateSocket(); 77 std::unique_ptr<P2PStreamSocket> CreateSocket();
78 void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet); 78 void OnIncomingPacket(std::unique_ptr<MultiplexPacket> packet);
79 void OnBaseChannelError(int error); 79 void OnBaseChannelError(int error);
80 80
81 // Called by MuxSocket. 81 // Called by MuxSocket.
82 void OnSocketDestroyed(); 82 void OnSocketDestroyed();
83 void DoWrite(scoped_ptr<MultiplexPacket> packet, 83 void DoWrite(std::unique_ptr<MultiplexPacket> packet,
84 const base::Closure& done_task); 84 const base::Closure& done_task);
85 int DoRead(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len); 85 int DoRead(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len);
86 86
87 private: 87 private:
88 ChannelMultiplexer* multiplexer_; 88 ChannelMultiplexer* multiplexer_;
89 std::string name_; 89 std::string name_;
90 int send_id_; 90 int send_id_;
91 bool id_sent_; 91 bool id_sent_;
92 int receive_id_; 92 int receive_id_;
93 MuxSocket* socket_; 93 MuxSocket* socket_;
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
141 receive_id_(kChannelIdUnknown), 141 receive_id_(kChannelIdUnknown),
142 socket_(nullptr) { 142 socket_(nullptr) {
143 } 143 }
144 144
145 ChannelMultiplexer::MuxChannel::~MuxChannel() { 145 ChannelMultiplexer::MuxChannel::~MuxChannel() {
146 // Socket must be destroyed before the channel. 146 // Socket must be destroyed before the channel.
147 DCHECK(!socket_); 147 DCHECK(!socket_);
148 STLDeleteElements(&pending_packets_); 148 STLDeleteElements(&pending_packets_);
149 } 149 }
150 150
151 scoped_ptr<P2PStreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() { 151 std::unique_ptr<P2PStreamSocket>
152 ChannelMultiplexer::MuxChannel::CreateSocket() {
152 DCHECK(!socket_); // Can't create more than one socket per channel. 153 DCHECK(!socket_); // Can't create more than one socket per channel.
153 scoped_ptr<MuxSocket> result(new MuxSocket(this)); 154 std::unique_ptr<MuxSocket> result(new MuxSocket(this));
154 socket_ = result.get(); 155 socket_ = result.get();
155 return std::move(result); 156 return std::move(result);
156 } 157 }
157 158
158 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( 159 void ChannelMultiplexer::MuxChannel::OnIncomingPacket(
159 scoped_ptr<MultiplexPacket> packet) { 160 std::unique_ptr<MultiplexPacket> packet) {
160 DCHECK_EQ(packet->channel_id(), receive_id_); 161 DCHECK_EQ(packet->channel_id(), receive_id_);
161 if (packet->data().size() > 0) { 162 if (packet->data().size() > 0) {
162 pending_packets_.push_back(new PendingPacket(std::move(packet))); 163 pending_packets_.push_back(new PendingPacket(std::move(packet)));
163 if (socket_) { 164 if (socket_) {
164 // Notify the socket that we have more data. 165 // Notify the socket that we have more data.
165 socket_->OnPacketReceived(); 166 socket_->OnPacketReceived();
166 } 167 }
167 } 168 }
168 } 169 }
169 170
170 void ChannelMultiplexer::MuxChannel::OnBaseChannelError(int error) { 171 void ChannelMultiplexer::MuxChannel::OnBaseChannelError(int error) {
171 if (socket_) 172 if (socket_)
172 socket_->OnBaseChannelError(error); 173 socket_->OnBaseChannelError(error);
173 } 174 }
174 175
175 void ChannelMultiplexer::MuxChannel::OnSocketDestroyed() { 176 void ChannelMultiplexer::MuxChannel::OnSocketDestroyed() {
176 DCHECK(socket_); 177 DCHECK(socket_);
177 socket_ = nullptr; 178 socket_ = nullptr;
178 } 179 }
179 180
180 void ChannelMultiplexer::MuxChannel::DoWrite( 181 void ChannelMultiplexer::MuxChannel::DoWrite(
181 scoped_ptr<MultiplexPacket> packet, 182 std::unique_ptr<MultiplexPacket> packet,
182 const base::Closure& done_task) { 183 const base::Closure& done_task) {
183 packet->set_channel_id(send_id_); 184 packet->set_channel_id(send_id_);
184 if (!id_sent_) { 185 if (!id_sent_) {
185 packet->set_channel_name(name_); 186 packet->set_channel_name(name_);
186 id_sent_ = true; 187 id_sent_ = true;
187 } 188 }
188 multiplexer_->DoWrite(std::move(packet), done_task); 189 multiplexer_->DoWrite(std::move(packet), done_task);
189 } 190 }
190 191
191 int ChannelMultiplexer::MuxChannel::DoRead( 192 int ChannelMultiplexer::MuxChannel::DoRead(
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
239 240
240 int ChannelMultiplexer::MuxSocket::Write( 241 int ChannelMultiplexer::MuxSocket::Write(
241 const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, 242 const scoped_refptr<net::IOBuffer>& buffer, int buffer_len,
242 const net::CompletionCallback& callback) { 243 const net::CompletionCallback& callback) {
243 DCHECK(CalledOnValidThread()); 244 DCHECK(CalledOnValidThread());
244 DCHECK(write_callback_.is_null()); 245 DCHECK(write_callback_.is_null());
245 246
246 if (base_channel_error_ != net::OK) 247 if (base_channel_error_ != net::OK)
247 return base_channel_error_; 248 return base_channel_error_;
248 249
249 scoped_ptr<MultiplexPacket> packet(new MultiplexPacket()); 250 std::unique_ptr<MultiplexPacket> packet(new MultiplexPacket());
250 size_t size = std::min(kMaxPacketSize, buffer_len); 251 size_t size = std::min(kMaxPacketSize, buffer_len);
251 packet->mutable_data()->assign(buffer->data(), size); 252 packet->mutable_data()->assign(buffer->data(), size);
252 253
253 write_pending_ = true; 254 write_pending_ = true;
254 channel_->DoWrite(std::move(packet), base::Bind( 255 channel_->DoWrite(std::move(packet), base::Bind(
255 &ChannelMultiplexer::MuxSocket::OnWriteComplete, AsWeakPtr())); 256 &ChannelMultiplexer::MuxSocket::OnWriteComplete, AsWeakPtr()));
256 257
257 // OnWriteComplete() might be called above synchronously. 258 // OnWriteComplete() might be called above synchronously.
258 if (write_pending_) { 259 if (write_pending_) {
259 DCHECK(write_callback_.is_null()); 260 DCHECK(write_callback_.is_null());
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
343 for (std::list<PendingChannel>::iterator it = pending_channels_.begin(); 344 for (std::list<PendingChannel>::iterator it = pending_channels_.begin();
344 it != pending_channels_.end(); ++it) { 345 it != pending_channels_.end(); ++it) {
345 if (it->name == name) { 346 if (it->name == name) {
346 pending_channels_.erase(it); 347 pending_channels_.erase(it);
347 return; 348 return;
348 } 349 }
349 } 350 }
350 } 351 }
351 352
352 void ChannelMultiplexer::OnBaseChannelReady( 353 void ChannelMultiplexer::OnBaseChannelReady(
353 scoped_ptr<P2PStreamSocket> socket) { 354 std::unique_ptr<P2PStreamSocket> socket) {
354 base_channel_factory_ = nullptr; 355 base_channel_factory_ = nullptr;
355 base_channel_ = std::move(socket); 356 base_channel_ = std::move(socket);
356 357
357 if (base_channel_.get()) { 358 if (base_channel_.get()) {
358 // Initialize reader and writer. 359 // Initialize reader and writer.
359 reader_.StartReading(base_channel_.get(), 360 reader_.StartReading(base_channel_.get(),
360 base::Bind(&ChannelMultiplexer::OnIncomingPacket, 361 base::Bind(&ChannelMultiplexer::OnIncomingPacket,
361 base::Unretained(this)), 362 base::Unretained(this)),
362 base::Bind(&ChannelMultiplexer::OnBaseChannelError, 363 base::Bind(&ChannelMultiplexer::OnBaseChannelError,
363 base::Unretained(this))); 364 base::Unretained(this)));
(...skipping 13 matching lines...) Expand all
377 // Every time this function is called it connects a single channel and posts a 378 // Every time this function is called it connects a single channel and posts a
378 // separate task to connect other channels. This is necessary because the 379 // separate task to connect other channels. This is necessary because the
379 // callback may destroy the multiplexer or somehow else modify 380 // callback may destroy the multiplexer or somehow else modify
380 // |pending_channels_| list (e.g. call CancelChannelCreation()). 381 // |pending_channels_| list (e.g. call CancelChannelCreation()).
381 base::ThreadTaskRunnerHandle::Get()->PostTask( 382 base::ThreadTaskRunnerHandle::Get()->PostTask(
382 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, 383 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels,
383 weak_factory_.GetWeakPtr())); 384 weak_factory_.GetWeakPtr()));
384 385
385 PendingChannel c = pending_channels_.front(); 386 PendingChannel c = pending_channels_.front();
386 pending_channels_.erase(pending_channels_.begin()); 387 pending_channels_.erase(pending_channels_.begin());
387 scoped_ptr<P2PStreamSocket> socket; 388 std::unique_ptr<P2PStreamSocket> socket;
388 if (base_channel_.get()) 389 if (base_channel_.get())
389 socket = GetOrCreateChannel(c.name)->CreateSocket(); 390 socket = GetOrCreateChannel(c.name)->CreateSocket();
390 c.callback.Run(std::move(socket)); 391 c.callback.Run(std::move(socket));
391 } 392 }
392 393
393 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( 394 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel(
394 const std::string& name) { 395 const std::string& name) {
395 // Check if we already have a channel with the requested name. 396 // Check if we already have a channel with the requested name.
396 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); 397 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name);
397 if (it != channels_.end()) 398 if (it != channels_.end())
(...skipping 17 matching lines...) Expand all
415 } 416 }
416 } 417 }
417 418
418 void ChannelMultiplexer::NotifyBaseChannelError(const std::string& name, 419 void ChannelMultiplexer::NotifyBaseChannelError(const std::string& name,
419 int error) { 420 int error) {
420 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); 421 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name);
421 if (it != channels_.end()) 422 if (it != channels_.end())
422 it->second->OnBaseChannelError(error); 423 it->second->OnBaseChannelError(error);
423 } 424 }
424 425
425 void ChannelMultiplexer::OnIncomingPacket(scoped_ptr<CompoundBuffer> buffer) { 426 void ChannelMultiplexer::OnIncomingPacket(
426 scoped_ptr<MultiplexPacket> packet = 427 std::unique_ptr<CompoundBuffer> buffer) {
428 std::unique_ptr<MultiplexPacket> packet =
427 ParseMessage<MultiplexPacket>(buffer.get()); 429 ParseMessage<MultiplexPacket>(buffer.get());
428 if (!packet) 430 if (!packet)
429 return; 431 return;
430 432
431 DCHECK(packet->has_channel_id()); 433 DCHECK(packet->has_channel_id());
432 if (!packet->has_channel_id()) { 434 if (!packet->has_channel_id()) {
433 LOG(ERROR) << "Received packet without channel_id."; 435 LOG(ERROR) << "Received packet without channel_id.";
434 return; 436 return;
435 } 437 }
436 438
(...skipping 11 matching lines...) Expand all
448 return; 450 return;
449 } 451 }
450 channel = GetOrCreateChannel(packet->channel_name()); 452 channel = GetOrCreateChannel(packet->channel_name());
451 channel->set_receive_id(receive_id); 453 channel->set_receive_id(receive_id);
452 channels_by_receive_id_[receive_id] = channel; 454 channels_by_receive_id_[receive_id] = channel;
453 } 455 }
454 456
455 channel->OnIncomingPacket(std::move(packet)); 457 channel->OnIncomingPacket(std::move(packet));
456 } 458 }
457 459
458 void ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, 460 void ChannelMultiplexer::DoWrite(std::unique_ptr<MultiplexPacket> packet,
459 const base::Closure& done_task) { 461 const base::Closure& done_task) {
460 writer_.Write(SerializeAndFrameMessage(*packet), done_task); 462 writer_.Write(SerializeAndFrameMessage(*packet), done_task);
461 } 463 }
462 464
463 } // namespace protocol 465 } // namespace protocol
464 } // namespace remoting 466 } // namespace remoting
OLDNEW
« no previous file with comments | « remoting/protocol/channel_multiplexer.h ('k') | remoting/protocol/channel_multiplexer_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698