Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(856)

Side by Side Diff: remoting/protocol/jingle_session.cc

Issue 2417913002: Process incoming IQs in the same order that they were sent. (Closed)
Patch Set: Reviewer's feedback Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/jingle_session.h" 5 #include "remoting/protocol/jingle_session.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <limits> 9 #include <limits>
10 #include <utility> 10 #include <utility>
11 11
12 #include "base/bind.h" 12 #include "base/bind.h"
13 #include "base/rand_util.h" 13 #include "base/rand_util.h"
14 #include "base/single_thread_task_runner.h" 14 #include "base/single_thread_task_runner.h"
15 #include "base/stl_util.h" 15 #include "base/stl_util.h"
16 #include "base/strings/string_number_conversions.h" 16 #include "base/strings/string_number_conversions.h"
17 #include "base/threading/thread_task_runner_handle.h" 17 #include "base/threading/thread_task_runner_handle.h"
18 #include "base/time/time.h" 18 #include "base/time/time.h"
19 #include "remoting/base/constants.h" 19 #include "remoting/base/constants.h"
20 #include "remoting/protocol/authenticator.h" 20 #include "remoting/protocol/authenticator.h"
21 #include "remoting/protocol/content_description.h" 21 #include "remoting/protocol/content_description.h"
22 #include "remoting/protocol/jingle_messages.h" 22 #include "remoting/protocol/jingle_messages.h"
23 #include "remoting/protocol/jingle_session_manager.h" 23 #include "remoting/protocol/jingle_session_manager.h"
24 #include "remoting/protocol/session_config.h" 24 #include "remoting/protocol/session_config.h"
25 #include "remoting/protocol/transport.h" 25 #include "remoting/protocol/transport.h"
26 #include "remoting/signaling/iq_sender.h" 26 #include "remoting/signaling/iq_sender.h"
27 #include "third_party/webrtc/libjingle/xmllite/xmlelement.h" 27 #include "third_party/webrtc/libjingle/xmllite/xmlelement.h"
28 #include "third_party/webrtc/libjingle/xmpp/constants.h"
28 #include "third_party/webrtc/p2p/base/candidate.h" 29 #include "third_party/webrtc/p2p/base/candidate.h"
29 30
30 using buzz::XmlElement; 31 using buzz::XmlElement;
31 32
32 namespace remoting { 33 namespace remoting {
33 namespace protocol { 34 namespace protocol {
34 35
35 namespace { 36 namespace {
36 37
37 // How long we should wait for a response from the other end. This value is used 38 // How long we should wait for a response from the other end. This value is used
38 // for all requests except |transport-info|. 39 // for all requests except |transport-info|.
39 const int kDefaultMessageTimeout = 10; 40 const int kDefaultMessageTimeout = 10;
40 41
41 // During a reconnection, it usually takes longer for the peer to respond due to 42 // During a reconnection, it usually takes longer for the peer to respond due to
42 // pending messages in the channel from the previous session. From experiment, 43 // pending messages in the channel from the previous session. From experiment,
43 // it can take up to 20s for the session to reconnect. To make it safe, setting 44 // it can take up to 20s for the session to reconnect. To make it safe, setting
44 // the timeout to 30s. 45 // the timeout to 30s.
45 const int kSessionInitiateAndAcceptTimeout = kDefaultMessageTimeout * 3; 46 const int kSessionInitiateAndAcceptTimeout = kDefaultMessageTimeout * 3;
46 47
47 // Timeout for the transport-info messages. 48 // Timeout for the transport-info messages.
48 const int kTransportInfoTimeout = 10 * 60; 49 const int kTransportInfoTimeout = 10 * 60;
49 50
51 // Special value for an invalid sequential ID for an incoming IQ.
52 const int kInvalid = -1;
53
54 // Special value indicating that any sequential ID is valid for the next
55 // incoming IQ.
56 const int kAny = -1;
57
50 ErrorCode AuthRejectionReasonToErrorCode( 58 ErrorCode AuthRejectionReasonToErrorCode(
51 Authenticator::RejectionReason reason) { 59 Authenticator::RejectionReason reason) {
52 switch (reason) { 60 switch (reason) {
53 case Authenticator::INVALID_CREDENTIALS: 61 case Authenticator::INVALID_CREDENTIALS:
54 return AUTHENTICATION_FAILED; 62 return AUTHENTICATION_FAILED;
55 case Authenticator::PROTOCOL_ERROR: 63 case Authenticator::PROTOCOL_ERROR:
56 return INCOMPATIBLE_PROTOCOL; 64 return INCOMPATIBLE_PROTOCOL;
57 case Authenticator::INVALID_ACCOUNT: 65 case Authenticator::INVALID_ACCOUNT:
58 return INVALID_ACCOUNT; 66 return INVALID_ACCOUNT;
59 case Authenticator::REJECTED_BY_USER: 67 case Authenticator::REJECTED_BY_USER:
60 return SESSION_REJECTED; 68 return SESSION_REJECTED;
61 } 69 }
62 NOTREACHED(); 70 NOTREACHED();
63 return UNKNOWN_ERROR; 71 return UNKNOWN_ERROR;
64 } 72 }
65 73
74 // Extracts a sequential id from the id attribute of the IQ stanza.
75 int GetSequentialId(std::string id) {
Sergey Ulanov 2016/10/19 21:23:38 please use const reference for string parameters
kelvinp 2016/10/21 00:26:02 Done.
76 int result = kInvalid;
Sergey Ulanov 2016/10/19 21:23:38 move this below, just before StringToInt() call. S
kelvinp 2016/10/21 00:26:02 Done.
77 std::vector<std::string> tokens =
78 SplitString(id, "_", base::TRIM_WHITESPACE, base::SPLIT_WANT_NONEMPTY);
79 // Legacy endpoints does not encode the IQ ordering in the ID attribute
80 if (tokens.size() != 2) {
81 return kInvalid;
82 }
83
84 if (!base::StringToInt(tokens[1].c_str(), &result)) {
85 return kInvalid;
86 }
87 return result;
88 };
Sergey Ulanov 2016/10/19 21:23:38 nit: don't need ;
kelvinp 2016/10/21 00:26:02 Done.
89
66 } // namespace 90 } // namespace
67 91
92 // A Queue that sorts incoming messages and returns them in the ascending order
93 // of sequence ids. The sequence id can be extracted from the ID attribute of
94 // an IQ stanza, which have the following format <opaque_string>_<sequence_id>.
95 //
96 // Background:
97 // The chromoting signaling channel does not guarantee that the incoming IQs are
98 // delivered in the order that it is sent.
99 //
100 // This behavior leads to transient session setup failures. For instance,
101 // a <transport-info> that is sent after a <session-info> message is sometimes
102 // delivered to the client out of order, causing the client to close the
103 // session due to an unexpected request.
104 class JingleSession::OrderedMessageQueue {
105 public:
106 OrderedMessageQueue()
107 : next_incoming_(kAny),
108 next_outgoing_(0),
Sergey Ulanov 2016/10/19 21:23:38 Move these initializers to where they are defined.
kelvinp 2016/10/21 00:26:03 Done.
109 outgoing_prefix_(base::Uint64ToString(
110 base::RandGenerator(std::numeric_limits<uint64_t>::max()))){};
Sergey Ulanov 2016/10/19 21:23:38 base::RandUint64()
Sergey Ulanov 2016/10/19 21:23:38 don't need semicolon
kelvinp 2016/10/21 00:26:02 Done.
kelvinp 2016/10/21 00:26:03 Done.
111
112 ~OrderedMessageQueue() {}
113
114 // Returns the list of messages ordered by their sequential IDs.
115 std::list<std::unique_ptr<PendingMessage>> OnIncomingMessage(
Sergey Ulanov 2016/10/19 21:23:38 Better to use std::vector<> instead of std::list<>
kelvinp 2016/10/21 00:26:03 Done.
116 std::string id,
Sergey Ulanov 2016/10/19 21:23:38 const reference
kelvinp 2016/10/21 00:26:02 Done.
117 std::unique_ptr<PendingMessage>);
118 std::string GetNextOutgoingId();
119
120 private:
121 // Implements an ordered list by using map with the |sequence_id| as the key,
Sergey Ulanov 2016/10/19 21:23:39 I don't think you really need a map. A heap stored
kelvinp 2016/10/21 00:26:02 Looking at std::push_heap() this would require us
122 // so that |queue_| is always sorted by |sequence_id|.
123 std::map<int, std::unique_ptr<PendingMessage>> queue_;
124
125 int next_incoming_;
Sergey Ulanov 2016/10/19 21:23:38 = kAny
kelvinp 2016/10/21 00:26:02 Done.
126 int next_outgoing_;
Sergey Ulanov 2016/10/19 21:23:38 = 0
kelvinp 2016/10/21 00:26:02 Done.
127
128 // This prefix is necessary to disambiguate between the ID's sent from the
129 // client and the ID's sent from the host.
130 std::string outgoing_prefix_;
131
132 DISALLOW_COPY_AND_ASSIGN(OrderedMessageQueue);
133 };
134
135 std::list<std::unique_ptr<JingleSession::PendingMessage>>
136 JingleSession::OrderedMessageQueue::OnIncomingMessage(
137 std::string id,
138 std::unique_ptr<JingleSession::PendingMessage> message) {
139 std::list<std::unique_ptr<JingleSession::PendingMessage>> result;
140 int current = GetSequentialId(id);
141 // If there is no sequencing order encoded in the id, just return the
142 // message.
143 if (current == kInvalid) {
144 result.push_back(std::move(message));
145 return result;
146 }
147
148 if (next_incoming_ == kAny) {
149 next_incoming_ = current;
150 }
151
152 DCHECK(current >= next_incoming_)
153 << "Duplicate sequence id: current= " << current
Sergey Ulanov 2016/10/19 21:23:38 Messages in DCHECK() are very rarely useful, but t
kelvinp 2016/10/21 00:26:02 Done.
154 << " expected= " << next_incoming_;
155 DCHECK(queue_.find(current) == queue_.end())
156 << "Duplicate sequence id: current = " << current;
157
158 queue_.insert(std::pair<int, std::unique_ptr<JingleSession::PendingMessage>>(
Sergey Ulanov 2016/10/19 21:23:38 std::make_pair?
kelvinp 2016/10/21 00:26:03 Done.
159 current, std::move(message)));
160
161 auto it = queue_.begin();
162 while (it != queue_.end() && it->first == next_incoming_) {
163 result.push_back(std::move(it->second));
164 it = queue_.erase(it);
165 next_incoming_++;
166 }
167
168 if (current - next_incoming_ >= 3) {
169 LOG(WARNING) << "Multiple messages are missing: expected= "
170 << next_incoming_ << " current= " << current;
171 }
172 return result;
173 };
174
175 std::string JingleSession::OrderedMessageQueue::GetNextOutgoingId() {
Sergey Ulanov 2016/10/19 21:23:38 This doesn't look related to queuing incoming mess
kelvinp 2016/10/21 00:26:02 Done.
176 return outgoing_prefix_ + "_" + base::IntToString(++next_outgoing_);
177 }
178
68 JingleSession::JingleSession(JingleSessionManager* session_manager) 179 JingleSession::JingleSession(JingleSessionManager* session_manager)
69 : session_manager_(session_manager), 180 : session_manager_(session_manager),
70 event_handler_(nullptr), 181 event_handler_(nullptr),
71 state_(INITIALIZING), 182 state_(INITIALIZING),
72 error_(OK), 183 error_(OK),
73 weak_factory_(this) { 184 message_queue_(new OrderedMessageQueue),
74 } 185 weak_factory_(this) {}
75 186
76 JingleSession::~JingleSession() { 187 JingleSession::~JingleSession() {
77 session_manager_->SessionDestroyed(this); 188 session_manager_->SessionDestroyed(this);
78 } 189 }
79 190
80 void JingleSession::SetEventHandler(Session::EventHandler* event_handler) { 191 void JingleSession::SetEventHandler(Session::EventHandler* event_handler) {
81 DCHECK(thread_checker_.CalledOnValidThread()); 192 DCHECK(thread_checker_.CalledOnValidThread());
82 DCHECK(event_handler); 193 DCHECK(event_handler);
83 event_handler_ = event_handler; 194 event_handler_ = event_handler;
84 } 195 }
(...skipping 128 matching lines...) Expand 10 before | Expand all | Expand 10 after
213 324
214 void JingleSession::SendTransportInfo( 325 void JingleSession::SendTransportInfo(
215 std::unique_ptr<buzz::XmlElement> transport_info) { 326 std::unique_ptr<buzz::XmlElement> transport_info) {
216 DCHECK(thread_checker_.CalledOnValidThread()); 327 DCHECK(thread_checker_.CalledOnValidThread());
217 DCHECK_EQ(state_, AUTHENTICATED); 328 DCHECK_EQ(state_, AUTHENTICATED);
218 329
219 std::unique_ptr<JingleMessage> message(new JingleMessage( 330 std::unique_ptr<JingleMessage> message(new JingleMessage(
220 peer_address_, JingleMessage::TRANSPORT_INFO, session_id_)); 331 peer_address_, JingleMessage::TRANSPORT_INFO, session_id_));
221 message->transport_info = std::move(transport_info); 332 message->transport_info = std::move(transport_info);
222 333
334 std::unique_ptr<buzz::XmlElement> stanza = message->ToXml();
335 stanza->AddAttr(buzz::QN_ID, message_queue_->GetNextOutgoingId());
336
223 auto request = session_manager_->iq_sender()->SendIq( 337 auto request = session_manager_->iq_sender()->SendIq(
224 message->ToXml(), base::Bind(&JingleSession::OnTransportInfoResponse, 338 std::move(stanza), base::Bind(&JingleSession::OnTransportInfoResponse,
225 base::Unretained(this))); 339 base::Unretained(this)));
226 if (request) { 340 if (request) {
227 request->SetTimeout(base::TimeDelta::FromSeconds(kTransportInfoTimeout)); 341 request->SetTimeout(base::TimeDelta::FromSeconds(kTransportInfoTimeout));
228 transport_info_requests_.push_back(std::move(request)); 342 transport_info_requests_.push_back(std::move(request));
229 } else { 343 } else {
230 LOG(ERROR) << "Failed to send a transport-info message"; 344 LOG(ERROR) << "Failed to send a transport-info message";
231 } 345 }
232 } 346 }
233 347
234 void JingleSession::Close(protocol::ErrorCode error) { 348 void JingleSession::Close(protocol::ErrorCode error) {
235 DCHECK(thread_checker_.CalledOnValidThread()); 349 DCHECK(thread_checker_.CalledOnValidThread());
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
276 SetState(FAILED); 390 SetState(FAILED);
277 } else { 391 } else {
278 SetState(CLOSED); 392 SetState(CLOSED);
279 } 393 }
280 } 394 }
281 } 395 }
282 396
283 void JingleSession::SendMessage(std::unique_ptr<JingleMessage> message) { 397 void JingleSession::SendMessage(std::unique_ptr<JingleMessage> message) {
284 DCHECK(thread_checker_.CalledOnValidThread()); 398 DCHECK(thread_checker_.CalledOnValidThread());
285 399
400 std::unique_ptr<buzz::XmlElement> stanza = message->ToXml();
401 stanza->AddAttr(buzz::QN_ID, message_queue_->GetNextOutgoingId());
402
286 auto request = session_manager_->iq_sender()->SendIq( 403 auto request = session_manager_->iq_sender()->SendIq(
287 message->ToXml(), base::Bind(&JingleSession::OnMessageResponse, 404 std::move(stanza), base::Bind(&JingleSession::OnMessageResponse,
288 base::Unretained(this), message->action)); 405 base::Unretained(this), message->action));
289 406
290 int timeout = kDefaultMessageTimeout; 407 int timeout = kDefaultMessageTimeout;
291 if (message->action == JingleMessage::SESSION_INITIATE || 408 if (message->action == JingleMessage::SESSION_INITIATE ||
292 message->action == JingleMessage::SESSION_ACCEPT) { 409 message->action == JingleMessage::SESSION_ACCEPT) {
293 timeout = kSessionInitiateAndAcceptTimeout; 410 timeout = kSessionInitiateAndAcceptTimeout;
294 } 411 }
295 if (request) { 412 if (request) {
296 request->SetTimeout(base::TimeDelta::FromSeconds(timeout)); 413 request->SetTimeout(base::TimeDelta::FromSeconds(timeout));
297 pending_requests_.insert(std::move(request)); 414 pending_requests_.insert(std::move(request));
298 } else { 415 } else {
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
362 } 479 }
363 480
364 const std::string& type = response->Attr(buzz::QName(std::string(), "type")); 481 const std::string& type = response->Attr(buzz::QName(std::string(), "type"));
365 if (type != "result") { 482 if (type != "result") {
366 LOG(ERROR) << "Received error in response to transport-info message: \"" 483 LOG(ERROR) << "Received error in response to transport-info message: \""
367 << response->Str() << "\". Terminating the session."; 484 << response->Str() << "\". Terminating the session.";
368 Close(PEER_IS_OFFLINE); 485 Close(PEER_IS_OFFLINE);
369 } 486 }
370 } 487 }
371 488
372 void JingleSession::OnIncomingMessage(std::unique_ptr<JingleMessage> message, 489 void JingleSession::OnIncomingMessage(std::string id,
490 std::unique_ptr<JingleMessage> message,
373 const ReplyCallback& reply_callback) { 491 const ReplyCallback& reply_callback) {
492 std::unique_ptr<PendingMessage> item(
493 new PendingMessage(std::move(message), reply_callback));
494 std::list<std::unique_ptr<PendingMessage>> ordered =
495 message_queue_->OnIncomingMessage(id, std::move(item));
496 for (auto it = ordered.begin(); it != ordered.end(); it++) {
Sergey Ulanov 2016/10/19 21:23:38 for (auto& message : ordered) Also for iterators
kelvinp 2016/10/21 00:26:02 Done.
497 OnIncomingMessageInOrder(std::move(it->get()->message),
498 it->get()->reply_callback);
499 }
500 }
501
502 void JingleSession::OnIncomingMessageInOrder(
503 std::unique_ptr<JingleMessage> message,
504 const ReplyCallback& reply_callback) {
374 DCHECK(thread_checker_.CalledOnValidThread()); 505 DCHECK(thread_checker_.CalledOnValidThread());
375 506
376 if (peer_address_ != message->from) { 507 if (peer_address_ != message->from) {
377 // Ignore messages received from a different Jid. 508 // Ignore messages received from a different Jid.
378 reply_callback.Run(JingleMessageReply::INVALID_SID); 509 reply_callback.Run(JingleMessageReply::INVALID_SID);
379 return; 510 return;
380 } 511 }
381 512
382 switch (message->action) { 513 switch (message->action) {
383 case JingleMessage::SESSION_ACCEPT: 514 case JingleMessage::SESSION_ACCEPT:
384 OnAccept(std::move(message), reply_callback); 515 OnAccept(std::move(message), reply_callback);
385 break; 516 break;
386 517
387 case JingleMessage::SESSION_INFO: 518 case JingleMessage::SESSION_INFO:
388 OnSessionInfo(std::move(message), reply_callback); 519 OnSessionInfo(std::move(message), reply_callback);
389 break; 520 break;
390 521
391 case JingleMessage::TRANSPORT_INFO: 522 case JingleMessage::TRANSPORT_INFO:
392 if (!transport_) { 523 if (!transport_) {
393 LOG(ERROR) << "Received unexpected transport-info message->"; 524 LOG(ERROR) << "Received unexpected transport-info message->";
394 reply_callback.Run(JingleMessageReply::NONE); 525 reply_callback.Run(JingleMessageReply::NONE);
395 return; 526 return;
396 } 527 }
397 528
398 if (!message->transport_info || 529 if (!message->transport_info ||
399 !transport_->ProcessTransportInfo( 530 !transport_->ProcessTransportInfo(message->transport_info.get())) {
400 message->transport_info.get())) {
401 reply_callback.Run(JingleMessageReply::BAD_REQUEST); 531 reply_callback.Run(JingleMessageReply::BAD_REQUEST);
402 return; 532 return;
403 } 533 }
404 534
405 reply_callback.Run(JingleMessageReply::NONE); 535 reply_callback.Run(JingleMessageReply::NONE);
406 break; 536 break;
407 537
408 case JingleMessage::SESSION_TERMINATE: 538 case JingleMessage::SESSION_TERMINATE:
409 OnTerminate(std::move(message), reply_callback); 539 OnTerminate(std::move(message), reply_callback);
410 break; 540 break;
(...skipping 191 matching lines...) Expand 10 before | Expand all | Expand 10 after
602 if (event_handler_) 732 if (event_handler_)
603 event_handler_->OnSessionStateChange(new_state); 733 event_handler_->OnSessionStateChange(new_state);
604 } 734 }
605 } 735 }
606 736
607 bool JingleSession::is_session_active() { 737 bool JingleSession::is_session_active() {
608 return state_ == CONNECTING || state_ == ACCEPTING || state_ == ACCEPTED || 738 return state_ == CONNECTING || state_ == ACCEPTING || state_ == ACCEPTED ||
609 state_ == AUTHENTICATING || state_ == AUTHENTICATED; 739 state_ == AUTHENTICATING || state_ == AUTHENTICATED;
610 } 740 }
611 741
742 JingleSession::PendingMessage::PendingMessage(
Sergey Ulanov 2016/10/19 21:23:38 nit: put this above JingleSession::JingleSession()
kelvinp 2016/10/21 00:26:02 Done.
743 std::unique_ptr<JingleMessage> message,
744 const ReplyCallback& reply_callback)
745 : message(std::move(message)), reply_callback(reply_callback) {}
746
747 JingleSession::PendingMessage::~PendingMessage() {}
748
612 } // namespace protocol 749 } // namespace protocol
613 } // namespace remoting 750 } // namespace remoting
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698