Chromium Code Reviews| Index: remoting/protocol/jingle_session.cc |
| diff --git a/remoting/protocol/jingle_session.cc b/remoting/protocol/jingle_session.cc |
| index 49b1f13deeaa98a3ed24c3ae1e95c1972fdc313d..2c283e1971c849fc1cb5aa8b3207c5a9d9180e0d 100644 |
| --- a/remoting/protocol/jingle_session.cc |
| +++ b/remoting/protocol/jingle_session.cc |
| @@ -25,6 +25,7 @@ |
| #include "remoting/protocol/transport.h" |
| #include "remoting/signaling/iq_sender.h" |
| #include "third_party/webrtc/libjingle/xmllite/xmlelement.h" |
| +#include "third_party/webrtc/libjingle/xmpp/constants.h" |
| #include "third_party/webrtc/p2p/base/candidate.h" |
| using buzz::XmlElement; |
| @@ -47,6 +48,13 @@ const int kSessionInitiateAndAcceptTimeout = kDefaultMessageTimeout * 3; |
| // Timeout for the transport-info messages. |
| const int kTransportInfoTimeout = 10 * 60; |
| +// Special value for an invalid sequential ID for an incoming IQ. |
| +const int kInvalid = -1; |
| + |
| +// Special value indicating that any sequential ID is valid for the next |
| +// incoming IQ. |
| +const int kAny = -1; |
| + |
| ErrorCode AuthRejectionReasonToErrorCode( |
| Authenticator::RejectionReason reason) { |
| switch (reason) { |
| @@ -63,15 +71,118 @@ ErrorCode AuthRejectionReasonToErrorCode( |
| return UNKNOWN_ERROR; |
| } |
| +// Extracts a sequential id from the id attribute of the IQ stanza. |
| +int GetSequentialId(std::string id) { |
|
Sergey Ulanov
2016/10/19 21:23:38
please use const reference for string parameters
kelvinp
2016/10/21 00:26:02
Done.
|
| + int result = kInvalid; |
|
Sergey Ulanov
2016/10/19 21:23:38
move this below, just before StringToInt() call. S
kelvinp
2016/10/21 00:26:02
Done.
|
| + std::vector<std::string> tokens = |
| + SplitString(id, "_", base::TRIM_WHITESPACE, base::SPLIT_WANT_NONEMPTY); |
| + // Legacy endpoints does not encode the IQ ordering in the ID attribute |
| + if (tokens.size() != 2) { |
| + return kInvalid; |
| + } |
| + |
| + if (!base::StringToInt(tokens[1].c_str(), &result)) { |
| + return kInvalid; |
| + } |
| + return result; |
| +}; |
|
Sergey Ulanov
2016/10/19 21:23:38
nit: don't need ;
kelvinp
2016/10/21 00:26:02
Done.
|
| + |
| } // namespace |
| +// A Queue that sorts incoming messages and returns them in the ascending order |
| +// of sequence ids. The sequence id can be extracted from the ID attribute of |
| +// an IQ stanza, which have the following format <opaque_string>_<sequence_id>. |
| +// |
| +// Background: |
| +// The chromoting signaling channel does not guarantee that the incoming IQs are |
| +// delivered in the order that it is sent. |
| +// |
| +// This behavior leads to transient session setup failures. For instance, |
| +// a <transport-info> that is sent after a <session-info> message is sometimes |
| +// delivered to the client out of order, causing the client to close the |
| +// session due to an unexpected request. |
| +class JingleSession::OrderedMessageQueue { |
| + public: |
| + OrderedMessageQueue() |
| + : next_incoming_(kAny), |
| + next_outgoing_(0), |
|
Sergey Ulanov
2016/10/19 21:23:38
Move these initializers to where they are defined.
kelvinp
2016/10/21 00:26:03
Done.
|
| + outgoing_prefix_(base::Uint64ToString( |
| + base::RandGenerator(std::numeric_limits<uint64_t>::max()))){}; |
|
Sergey Ulanov
2016/10/19 21:23:38
base::RandUint64()
Sergey Ulanov
2016/10/19 21:23:38
don't need semicolon
kelvinp
2016/10/21 00:26:02
Done.
kelvinp
2016/10/21 00:26:03
Done.
|
| + |
| + ~OrderedMessageQueue() {} |
| + |
| + // Returns the list of messages ordered by their sequential IDs. |
| + std::list<std::unique_ptr<PendingMessage>> OnIncomingMessage( |
|
Sergey Ulanov
2016/10/19 21:23:38
Better to use std::vector<> instead of std::list<>
kelvinp
2016/10/21 00:26:03
Done.
|
| + std::string id, |
|
Sergey Ulanov
2016/10/19 21:23:38
const reference
kelvinp
2016/10/21 00:26:02
Done.
|
| + std::unique_ptr<PendingMessage>); |
| + std::string GetNextOutgoingId(); |
| + |
| + private: |
| + // Implements an ordered list by using map with the |sequence_id| as the key, |
|
Sergey Ulanov
2016/10/19 21:23:39
I don't think you really need a map. A heap stored
kelvinp
2016/10/21 00:26:02
Looking at std::push_heap() this would require us
|
| + // so that |queue_| is always sorted by |sequence_id|. |
| + std::map<int, std::unique_ptr<PendingMessage>> queue_; |
| + |
| + int next_incoming_; |
|
Sergey Ulanov
2016/10/19 21:23:38
= kAny
kelvinp
2016/10/21 00:26:02
Done.
|
| + int next_outgoing_; |
|
Sergey Ulanov
2016/10/19 21:23:38
= 0
kelvinp
2016/10/21 00:26:02
Done.
|
| + |
| + // This prefix is necessary to disambiguate between the ID's sent from the |
| + // client and the ID's sent from the host. |
| + std::string outgoing_prefix_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(OrderedMessageQueue); |
| +}; |
| + |
| +std::list<std::unique_ptr<JingleSession::PendingMessage>> |
| +JingleSession::OrderedMessageQueue::OnIncomingMessage( |
| + std::string id, |
| + std::unique_ptr<JingleSession::PendingMessage> message) { |
| + std::list<std::unique_ptr<JingleSession::PendingMessage>> result; |
| + int current = GetSequentialId(id); |
| + // If there is no sequencing order encoded in the id, just return the |
| + // message. |
| + if (current == kInvalid) { |
| + result.push_back(std::move(message)); |
| + return result; |
| + } |
| + |
| + if (next_incoming_ == kAny) { |
| + next_incoming_ = current; |
| + } |
| + |
| + DCHECK(current >= next_incoming_) |
| + << "Duplicate sequence id: current= " << current |
|
Sergey Ulanov
2016/10/19 21:23:38
Messages in DCHECK() are very rarely useful, but t
kelvinp
2016/10/21 00:26:02
Done.
|
| + << " expected= " << next_incoming_; |
| + DCHECK(queue_.find(current) == queue_.end()) |
| + << "Duplicate sequence id: current = " << current; |
| + |
| + queue_.insert(std::pair<int, std::unique_ptr<JingleSession::PendingMessage>>( |
|
Sergey Ulanov
2016/10/19 21:23:38
std::make_pair?
kelvinp
2016/10/21 00:26:03
Done.
|
| + current, std::move(message))); |
| + |
| + auto it = queue_.begin(); |
| + while (it != queue_.end() && it->first == next_incoming_) { |
| + result.push_back(std::move(it->second)); |
| + it = queue_.erase(it); |
| + next_incoming_++; |
| + } |
| + |
| + if (current - next_incoming_ >= 3) { |
| + LOG(WARNING) << "Multiple messages are missing: expected= " |
| + << next_incoming_ << " current= " << current; |
| + } |
| + return result; |
| +}; |
| + |
| +std::string JingleSession::OrderedMessageQueue::GetNextOutgoingId() { |
|
Sergey Ulanov
2016/10/19 21:23:38
This doesn't look related to queuing incoming mess
kelvinp
2016/10/21 00:26:02
Done.
|
| + return outgoing_prefix_ + "_" + base::IntToString(++next_outgoing_); |
| +} |
| + |
| JingleSession::JingleSession(JingleSessionManager* session_manager) |
| : session_manager_(session_manager), |
| event_handler_(nullptr), |
| state_(INITIALIZING), |
| error_(OK), |
| - weak_factory_(this) { |
| -} |
| + message_queue_(new OrderedMessageQueue), |
| + weak_factory_(this) {} |
| JingleSession::~JingleSession() { |
| session_manager_->SessionDestroyed(this); |
| @@ -220,9 +331,12 @@ void JingleSession::SendTransportInfo( |
| peer_address_, JingleMessage::TRANSPORT_INFO, session_id_)); |
| message->transport_info = std::move(transport_info); |
| + std::unique_ptr<buzz::XmlElement> stanza = message->ToXml(); |
| + stanza->AddAttr(buzz::QN_ID, message_queue_->GetNextOutgoingId()); |
| + |
| auto request = session_manager_->iq_sender()->SendIq( |
| - message->ToXml(), base::Bind(&JingleSession::OnTransportInfoResponse, |
| - base::Unretained(this))); |
| + std::move(stanza), base::Bind(&JingleSession::OnTransportInfoResponse, |
| + base::Unretained(this))); |
| if (request) { |
| request->SetTimeout(base::TimeDelta::FromSeconds(kTransportInfoTimeout)); |
| transport_info_requests_.push_back(std::move(request)); |
| @@ -283,9 +397,12 @@ void JingleSession::Close(protocol::ErrorCode error) { |
| void JingleSession::SendMessage(std::unique_ptr<JingleMessage> message) { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| + std::unique_ptr<buzz::XmlElement> stanza = message->ToXml(); |
| + stanza->AddAttr(buzz::QN_ID, message_queue_->GetNextOutgoingId()); |
| + |
| auto request = session_manager_->iq_sender()->SendIq( |
| - message->ToXml(), base::Bind(&JingleSession::OnMessageResponse, |
| - base::Unretained(this), message->action)); |
| + std::move(stanza), base::Bind(&JingleSession::OnMessageResponse, |
| + base::Unretained(this), message->action)); |
| int timeout = kDefaultMessageTimeout; |
| if (message->action == JingleMessage::SESSION_INITIATE || |
| @@ -369,8 +486,22 @@ void JingleSession::OnTransportInfoResponse(IqRequest* request, |
| } |
| } |
| -void JingleSession::OnIncomingMessage(std::unique_ptr<JingleMessage> message, |
| +void JingleSession::OnIncomingMessage(std::string id, |
| + std::unique_ptr<JingleMessage> message, |
| const ReplyCallback& reply_callback) { |
| + std::unique_ptr<PendingMessage> item( |
| + new PendingMessage(std::move(message), reply_callback)); |
| + std::list<std::unique_ptr<PendingMessage>> ordered = |
| + message_queue_->OnIncomingMessage(id, std::move(item)); |
| + for (auto it = ordered.begin(); it != ordered.end(); it++) { |
|
Sergey Ulanov
2016/10/19 21:23:38
for (auto& message : ordered)
Also for iterators
kelvinp
2016/10/21 00:26:02
Done.
|
| + OnIncomingMessageInOrder(std::move(it->get()->message), |
| + it->get()->reply_callback); |
| + } |
| +} |
| + |
| +void JingleSession::OnIncomingMessageInOrder( |
| + std::unique_ptr<JingleMessage> message, |
| + const ReplyCallback& reply_callback) { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| if (peer_address_ != message->from) { |
| @@ -396,8 +527,7 @@ void JingleSession::OnIncomingMessage(std::unique_ptr<JingleMessage> message, |
| } |
| if (!message->transport_info || |
| - !transport_->ProcessTransportInfo( |
| - message->transport_info.get())) { |
| + !transport_->ProcessTransportInfo(message->transport_info.get())) { |
| reply_callback.Run(JingleMessageReply::BAD_REQUEST); |
| return; |
| } |
| @@ -609,5 +739,12 @@ bool JingleSession::is_session_active() { |
| state_ == AUTHENTICATING || state_ == AUTHENTICATED; |
| } |
| +JingleSession::PendingMessage::PendingMessage( |
|
Sergey Ulanov
2016/10/19 21:23:38
nit: put this above JingleSession::JingleSession()
kelvinp
2016/10/21 00:26:02
Done.
|
| + std::unique_ptr<JingleMessage> message, |
| + const ReplyCallback& reply_callback) |
| + : message(std::move(message)), reply_callback(reply_callback) {} |
| + |
| +JingleSession::PendingMessage::~PendingMessage() {} |
| + |
| } // namespace protocol |
| } // namespace remoting |