OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "remoting/protocol/channel_multiplexer.h" | 5 #include "remoting/protocol/channel_multiplexer.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <string.h> | 8 #include <string.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
(...skipping 12 matching lines...) Expand all Loading... |
23 | 23 |
24 namespace remoting { | 24 namespace remoting { |
25 namespace protocol { | 25 namespace protocol { |
26 | 26 |
27 namespace { | 27 namespace { |
28 const int kChannelIdUnknown = -1; | 28 const int kChannelIdUnknown = -1; |
29 const int kMaxPacketSize = 1024; | 29 const int kMaxPacketSize = 1024; |
30 | 30 |
31 class PendingPacket { | 31 class PendingPacket { |
32 public: | 32 public: |
33 PendingPacket(scoped_ptr<MultiplexPacket> packet) | 33 PendingPacket(std::unique_ptr<MultiplexPacket> packet) |
34 : packet(std::move(packet)) {} | 34 : packet(std::move(packet)) {} |
35 ~PendingPacket() {} | 35 ~PendingPacket() {} |
36 | 36 |
37 bool is_empty() { return pos >= packet->data().size(); } | 37 bool is_empty() { return pos >= packet->data().size(); } |
38 | 38 |
39 int Read(char* buffer, size_t size) { | 39 int Read(char* buffer, size_t size) { |
40 size = std::min(size, packet->data().size() - pos); | 40 size = std::min(size, packet->data().size() - pos); |
41 memcpy(buffer, packet->data().data() + pos, size); | 41 memcpy(buffer, packet->data().data() + pos, size); |
42 pos += size; | 42 pos += size; |
43 return size; | 43 return size; |
44 } | 44 } |
45 | 45 |
46 private: | 46 private: |
47 scoped_ptr<MultiplexPacket> packet; | 47 std::unique_ptr<MultiplexPacket> packet; |
48 size_t pos = 0U; | 48 size_t pos = 0U; |
49 | 49 |
50 DISALLOW_COPY_AND_ASSIGN(PendingPacket); | 50 DISALLOW_COPY_AND_ASSIGN(PendingPacket); |
51 }; | 51 }; |
52 | 52 |
53 } // namespace | 53 } // namespace |
54 | 54 |
55 const char ChannelMultiplexer::kMuxChannelName[] = "mux"; | 55 const char ChannelMultiplexer::kMuxChannelName[] = "mux"; |
56 | 56 |
57 struct ChannelMultiplexer::PendingChannel { | 57 struct ChannelMultiplexer::PendingChannel { |
58 PendingChannel(const std::string& name, | 58 PendingChannel(const std::string& name, |
59 const ChannelCreatedCallback& callback) | 59 const ChannelCreatedCallback& callback) |
60 : name(name), callback(callback) { | 60 : name(name), callback(callback) { |
61 } | 61 } |
62 std::string name; | 62 std::string name; |
63 ChannelCreatedCallback callback; | 63 ChannelCreatedCallback callback; |
64 }; | 64 }; |
65 | 65 |
66 class ChannelMultiplexer::MuxChannel { | 66 class ChannelMultiplexer::MuxChannel { |
67 public: | 67 public: |
68 MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name, | 68 MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name, |
69 int send_id); | 69 int send_id); |
70 ~MuxChannel(); | 70 ~MuxChannel(); |
71 | 71 |
72 const std::string& name() { return name_; } | 72 const std::string& name() { return name_; } |
73 int receive_id() { return receive_id_; } | 73 int receive_id() { return receive_id_; } |
74 void set_receive_id(int id) { receive_id_ = id; } | 74 void set_receive_id(int id) { receive_id_ = id; } |
75 | 75 |
76 // Called by ChannelMultiplexer. | 76 // Called by ChannelMultiplexer. |
77 scoped_ptr<P2PStreamSocket> CreateSocket(); | 77 std::unique_ptr<P2PStreamSocket> CreateSocket(); |
78 void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet); | 78 void OnIncomingPacket(std::unique_ptr<MultiplexPacket> packet); |
79 void OnBaseChannelError(int error); | 79 void OnBaseChannelError(int error); |
80 | 80 |
81 // Called by MuxSocket. | 81 // Called by MuxSocket. |
82 void OnSocketDestroyed(); | 82 void OnSocketDestroyed(); |
83 void DoWrite(scoped_ptr<MultiplexPacket> packet, | 83 void DoWrite(std::unique_ptr<MultiplexPacket> packet, |
84 const base::Closure& done_task); | 84 const base::Closure& done_task); |
85 int DoRead(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len); | 85 int DoRead(const scoped_refptr<net::IOBuffer>& buffer, int buffer_len); |
86 | 86 |
87 private: | 87 private: |
88 ChannelMultiplexer* multiplexer_; | 88 ChannelMultiplexer* multiplexer_; |
89 std::string name_; | 89 std::string name_; |
90 int send_id_; | 90 int send_id_; |
91 bool id_sent_; | 91 bool id_sent_; |
92 int receive_id_; | 92 int receive_id_; |
93 MuxSocket* socket_; | 93 MuxSocket* socket_; |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
141 receive_id_(kChannelIdUnknown), | 141 receive_id_(kChannelIdUnknown), |
142 socket_(nullptr) { | 142 socket_(nullptr) { |
143 } | 143 } |
144 | 144 |
145 ChannelMultiplexer::MuxChannel::~MuxChannel() { | 145 ChannelMultiplexer::MuxChannel::~MuxChannel() { |
146 // Socket must be destroyed before the channel. | 146 // Socket must be destroyed before the channel. |
147 DCHECK(!socket_); | 147 DCHECK(!socket_); |
148 STLDeleteElements(&pending_packets_); | 148 STLDeleteElements(&pending_packets_); |
149 } | 149 } |
150 | 150 |
151 scoped_ptr<P2PStreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() { | 151 std::unique_ptr<P2PStreamSocket> |
| 152 ChannelMultiplexer::MuxChannel::CreateSocket() { |
152 DCHECK(!socket_); // Can't create more than one socket per channel. | 153 DCHECK(!socket_); // Can't create more than one socket per channel. |
153 scoped_ptr<MuxSocket> result(new MuxSocket(this)); | 154 std::unique_ptr<MuxSocket> result(new MuxSocket(this)); |
154 socket_ = result.get(); | 155 socket_ = result.get(); |
155 return std::move(result); | 156 return std::move(result); |
156 } | 157 } |
157 | 158 |
158 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( | 159 void ChannelMultiplexer::MuxChannel::OnIncomingPacket( |
159 scoped_ptr<MultiplexPacket> packet) { | 160 std::unique_ptr<MultiplexPacket> packet) { |
160 DCHECK_EQ(packet->channel_id(), receive_id_); | 161 DCHECK_EQ(packet->channel_id(), receive_id_); |
161 if (packet->data().size() > 0) { | 162 if (packet->data().size() > 0) { |
162 pending_packets_.push_back(new PendingPacket(std::move(packet))); | 163 pending_packets_.push_back(new PendingPacket(std::move(packet))); |
163 if (socket_) { | 164 if (socket_) { |
164 // Notify the socket that we have more data. | 165 // Notify the socket that we have more data. |
165 socket_->OnPacketReceived(); | 166 socket_->OnPacketReceived(); |
166 } | 167 } |
167 } | 168 } |
168 } | 169 } |
169 | 170 |
170 void ChannelMultiplexer::MuxChannel::OnBaseChannelError(int error) { | 171 void ChannelMultiplexer::MuxChannel::OnBaseChannelError(int error) { |
171 if (socket_) | 172 if (socket_) |
172 socket_->OnBaseChannelError(error); | 173 socket_->OnBaseChannelError(error); |
173 } | 174 } |
174 | 175 |
175 void ChannelMultiplexer::MuxChannel::OnSocketDestroyed() { | 176 void ChannelMultiplexer::MuxChannel::OnSocketDestroyed() { |
176 DCHECK(socket_); | 177 DCHECK(socket_); |
177 socket_ = nullptr; | 178 socket_ = nullptr; |
178 } | 179 } |
179 | 180 |
180 void ChannelMultiplexer::MuxChannel::DoWrite( | 181 void ChannelMultiplexer::MuxChannel::DoWrite( |
181 scoped_ptr<MultiplexPacket> packet, | 182 std::unique_ptr<MultiplexPacket> packet, |
182 const base::Closure& done_task) { | 183 const base::Closure& done_task) { |
183 packet->set_channel_id(send_id_); | 184 packet->set_channel_id(send_id_); |
184 if (!id_sent_) { | 185 if (!id_sent_) { |
185 packet->set_channel_name(name_); | 186 packet->set_channel_name(name_); |
186 id_sent_ = true; | 187 id_sent_ = true; |
187 } | 188 } |
188 multiplexer_->DoWrite(std::move(packet), done_task); | 189 multiplexer_->DoWrite(std::move(packet), done_task); |
189 } | 190 } |
190 | 191 |
191 int ChannelMultiplexer::MuxChannel::DoRead( | 192 int ChannelMultiplexer::MuxChannel::DoRead( |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
239 | 240 |
240 int ChannelMultiplexer::MuxSocket::Write( | 241 int ChannelMultiplexer::MuxSocket::Write( |
241 const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, | 242 const scoped_refptr<net::IOBuffer>& buffer, int buffer_len, |
242 const net::CompletionCallback& callback) { | 243 const net::CompletionCallback& callback) { |
243 DCHECK(CalledOnValidThread()); | 244 DCHECK(CalledOnValidThread()); |
244 DCHECK(write_callback_.is_null()); | 245 DCHECK(write_callback_.is_null()); |
245 | 246 |
246 if (base_channel_error_ != net::OK) | 247 if (base_channel_error_ != net::OK) |
247 return base_channel_error_; | 248 return base_channel_error_; |
248 | 249 |
249 scoped_ptr<MultiplexPacket> packet(new MultiplexPacket()); | 250 std::unique_ptr<MultiplexPacket> packet(new MultiplexPacket()); |
250 size_t size = std::min(kMaxPacketSize, buffer_len); | 251 size_t size = std::min(kMaxPacketSize, buffer_len); |
251 packet->mutable_data()->assign(buffer->data(), size); | 252 packet->mutable_data()->assign(buffer->data(), size); |
252 | 253 |
253 write_pending_ = true; | 254 write_pending_ = true; |
254 channel_->DoWrite(std::move(packet), base::Bind( | 255 channel_->DoWrite(std::move(packet), base::Bind( |
255 &ChannelMultiplexer::MuxSocket::OnWriteComplete, AsWeakPtr())); | 256 &ChannelMultiplexer::MuxSocket::OnWriteComplete, AsWeakPtr())); |
256 | 257 |
257 // OnWriteComplete() might be called above synchronously. | 258 // OnWriteComplete() might be called above synchronously. |
258 if (write_pending_) { | 259 if (write_pending_) { |
259 DCHECK(write_callback_.is_null()); | 260 DCHECK(write_callback_.is_null()); |
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
343 for (std::list<PendingChannel>::iterator it = pending_channels_.begin(); | 344 for (std::list<PendingChannel>::iterator it = pending_channels_.begin(); |
344 it != pending_channels_.end(); ++it) { | 345 it != pending_channels_.end(); ++it) { |
345 if (it->name == name) { | 346 if (it->name == name) { |
346 pending_channels_.erase(it); | 347 pending_channels_.erase(it); |
347 return; | 348 return; |
348 } | 349 } |
349 } | 350 } |
350 } | 351 } |
351 | 352 |
352 void ChannelMultiplexer::OnBaseChannelReady( | 353 void ChannelMultiplexer::OnBaseChannelReady( |
353 scoped_ptr<P2PStreamSocket> socket) { | 354 std::unique_ptr<P2PStreamSocket> socket) { |
354 base_channel_factory_ = nullptr; | 355 base_channel_factory_ = nullptr; |
355 base_channel_ = std::move(socket); | 356 base_channel_ = std::move(socket); |
356 | 357 |
357 if (base_channel_.get()) { | 358 if (base_channel_.get()) { |
358 // Initialize reader and writer. | 359 // Initialize reader and writer. |
359 reader_.StartReading(base_channel_.get(), | 360 reader_.StartReading(base_channel_.get(), |
360 base::Bind(&ChannelMultiplexer::OnIncomingPacket, | 361 base::Bind(&ChannelMultiplexer::OnIncomingPacket, |
361 base::Unretained(this)), | 362 base::Unretained(this)), |
362 base::Bind(&ChannelMultiplexer::OnBaseChannelError, | 363 base::Bind(&ChannelMultiplexer::OnBaseChannelError, |
363 base::Unretained(this))); | 364 base::Unretained(this))); |
(...skipping 13 matching lines...) Expand all Loading... |
377 // Every time this function is called it connects a single channel and posts a | 378 // Every time this function is called it connects a single channel and posts a |
378 // separate task to connect other channels. This is necessary because the | 379 // separate task to connect other channels. This is necessary because the |
379 // callback may destroy the multiplexer or somehow else modify | 380 // callback may destroy the multiplexer or somehow else modify |
380 // |pending_channels_| list (e.g. call CancelChannelCreation()). | 381 // |pending_channels_| list (e.g. call CancelChannelCreation()). |
381 base::ThreadTaskRunnerHandle::Get()->PostTask( | 382 base::ThreadTaskRunnerHandle::Get()->PostTask( |
382 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, | 383 FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels, |
383 weak_factory_.GetWeakPtr())); | 384 weak_factory_.GetWeakPtr())); |
384 | 385 |
385 PendingChannel c = pending_channels_.front(); | 386 PendingChannel c = pending_channels_.front(); |
386 pending_channels_.erase(pending_channels_.begin()); | 387 pending_channels_.erase(pending_channels_.begin()); |
387 scoped_ptr<P2PStreamSocket> socket; | 388 std::unique_ptr<P2PStreamSocket> socket; |
388 if (base_channel_.get()) | 389 if (base_channel_.get()) |
389 socket = GetOrCreateChannel(c.name)->CreateSocket(); | 390 socket = GetOrCreateChannel(c.name)->CreateSocket(); |
390 c.callback.Run(std::move(socket)); | 391 c.callback.Run(std::move(socket)); |
391 } | 392 } |
392 | 393 |
393 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( | 394 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel( |
394 const std::string& name) { | 395 const std::string& name) { |
395 // Check if we already have a channel with the requested name. | 396 // Check if we already have a channel with the requested name. |
396 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); | 397 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); |
397 if (it != channels_.end()) | 398 if (it != channels_.end()) |
(...skipping 17 matching lines...) Expand all Loading... |
415 } | 416 } |
416 } | 417 } |
417 | 418 |
418 void ChannelMultiplexer::NotifyBaseChannelError(const std::string& name, | 419 void ChannelMultiplexer::NotifyBaseChannelError(const std::string& name, |
419 int error) { | 420 int error) { |
420 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); | 421 std::map<std::string, MuxChannel*>::iterator it = channels_.find(name); |
421 if (it != channels_.end()) | 422 if (it != channels_.end()) |
422 it->second->OnBaseChannelError(error); | 423 it->second->OnBaseChannelError(error); |
423 } | 424 } |
424 | 425 |
425 void ChannelMultiplexer::OnIncomingPacket(scoped_ptr<CompoundBuffer> buffer) { | 426 void ChannelMultiplexer::OnIncomingPacket( |
426 scoped_ptr<MultiplexPacket> packet = | 427 std::unique_ptr<CompoundBuffer> buffer) { |
| 428 std::unique_ptr<MultiplexPacket> packet = |
427 ParseMessage<MultiplexPacket>(buffer.get()); | 429 ParseMessage<MultiplexPacket>(buffer.get()); |
428 if (!packet) | 430 if (!packet) |
429 return; | 431 return; |
430 | 432 |
431 DCHECK(packet->has_channel_id()); | 433 DCHECK(packet->has_channel_id()); |
432 if (!packet->has_channel_id()) { | 434 if (!packet->has_channel_id()) { |
433 LOG(ERROR) << "Received packet without channel_id."; | 435 LOG(ERROR) << "Received packet without channel_id."; |
434 return; | 436 return; |
435 } | 437 } |
436 | 438 |
(...skipping 11 matching lines...) Expand all Loading... |
448 return; | 450 return; |
449 } | 451 } |
450 channel = GetOrCreateChannel(packet->channel_name()); | 452 channel = GetOrCreateChannel(packet->channel_name()); |
451 channel->set_receive_id(receive_id); | 453 channel->set_receive_id(receive_id); |
452 channels_by_receive_id_[receive_id] = channel; | 454 channels_by_receive_id_[receive_id] = channel; |
453 } | 455 } |
454 | 456 |
455 channel->OnIncomingPacket(std::move(packet)); | 457 channel->OnIncomingPacket(std::move(packet)); |
456 } | 458 } |
457 | 459 |
458 void ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet, | 460 void ChannelMultiplexer::DoWrite(std::unique_ptr<MultiplexPacket> packet, |
459 const base::Closure& done_task) { | 461 const base::Closure& done_task) { |
460 writer_.Write(SerializeAndFrameMessage(*packet), done_task); | 462 writer_.Write(SerializeAndFrameMessage(*packet), done_task); |
461 } | 463 } |
462 | 464 |
463 } // namespace protocol | 465 } // namespace protocol |
464 } // namespace remoting | 466 } // namespace remoting |
OLD | NEW |