Chromium Code Reviews| Index: remoting/protocol/jingle_session.cc |
| diff --git a/remoting/protocol/jingle_session.cc b/remoting/protocol/jingle_session.cc |
| index 49b1f13deeaa98a3ed24c3ae1e95c1972fdc313d..82f3096f497f4d51d9465306dfbf725346849bbf 100644 |
| --- a/remoting/protocol/jingle_session.cc |
| +++ b/remoting/protocol/jingle_session.cc |
| @@ -10,10 +10,9 @@ |
| #include <utility> |
| #include "base/bind.h" |
| -#include "base/rand_util.h" |
| #include "base/single_thread_task_runner.h" |
| #include "base/stl_util.h" |
| -#include "base/strings/string_number_conversions.h" |
| +#include "base/strings/string_split.h" |
| #include "base/threading/thread_task_runner_handle.h" |
| #include "base/time/time.h" |
| #include "remoting/base/constants.h" |
| @@ -25,6 +24,7 @@ |
| #include "remoting/protocol/transport.h" |
| #include "remoting/signaling/iq_sender.h" |
| #include "third_party/webrtc/libjingle/xmllite/xmlelement.h" |
| +#include "third_party/webrtc/libjingle/xmpp/constants.h" |
| #include "third_party/webrtc/p2p/base/candidate.h" |
| using buzz::XmlElement; |
| @@ -47,6 +47,13 @@ const int kSessionInitiateAndAcceptTimeout = kDefaultMessageTimeout * 3; |
| // Timeout for the transport-info messages. |
| const int kTransportInfoTimeout = 10 * 60; |
| +// Special value for an invalid sequential ID for an incoming IQ. |
| +const int kInvalid = -1; |
| + |
| +// Special value indicating that any sequential ID is valid for the next |
| +// incoming IQ. |
| +const int kAny = -1; |
| + |
| ErrorCode AuthRejectionReasonToErrorCode( |
| Authenticator::RejectionReason reason) { |
| switch (reason) { |
| @@ -63,15 +70,107 @@ ErrorCode AuthRejectionReasonToErrorCode( |
| return UNKNOWN_ERROR; |
| } |
| +// Extracts a sequential id from the id attribute of the IQ stanza. |
| +int GetSequentialId(const std::string& id) { |
| + std::vector<std::string> tokens = |
| + SplitString(id, "_", base::TRIM_WHITESPACE, base::SPLIT_WANT_NONEMPTY); |
| + // Legacy endpoints does not encode the IQ ordering in the ID attribute |
| + if (tokens.size() != 2) { |
| + return kInvalid; |
| + } |
| + |
| + int result = kInvalid; |
| + if (!base::StringToInt(tokens[1].c_str(), &result)) { |
| + return kInvalid; |
| + } |
| + return result; |
| +} |
| + |
| } // namespace |
| +// A Queue that sorts incoming messages and returns them in the ascending order |
| +// of sequence ids. The sequence id can be extracted from the ID attribute of |
| +// an IQ stanza, which have the following format <opaque_string>_<sequence_id>. |
| +// |
| +// Background: |
| +// The chromoting signaling channel does not guarantee that the incoming IQs are |
| +// delivered in the order that it is sent. |
| +// |
| +// This behavior leads to transient session setup failures. For instance, |
| +// a <transport-info> that is sent after a <session-info> message is sometimes |
| +// delivered to the client out of order, causing the client to close the |
| +// session due to an unexpected request. |
| +class JingleSession::OrderedMessageQueue { |
| + public: |
| + OrderedMessageQueue() {} |
| + ~OrderedMessageQueue() {} |
| + |
| + // Returns the list of messages ordered by their sequential IDs. |
| + std::vector<std::unique_ptr<PendingMessage>> OnIncomingMessage( |
| + const std::string& id, |
| + std::unique_ptr<PendingMessage>); |
| + |
| + private: |
| + // Implements an ordered list by using map with the |sequence_id| as the key, |
| + // so that |queue_| is always sorted by |sequence_id|. |
| + std::map<int, std::unique_ptr<PendingMessage>> queue_; |
| + |
| + int next_incoming_ = kAny; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(OrderedMessageQueue); |
| +}; |
| + |
| +std::vector<std::unique_ptr<JingleSession::PendingMessage>> |
| +JingleSession::OrderedMessageQueue::OnIncomingMessage( |
| + const std::string& id, |
| + std::unique_ptr<JingleSession::PendingMessage> message) { |
| + std::vector<std::unique_ptr<JingleSession::PendingMessage>> result; |
| + int current = GetSequentialId(id); |
| + // If there is no sequencing order encoded in the id, just return the |
| + // message. |
| + if (current == kInvalid) { |
| + result.push_back(std::move(message)); |
| + return result; |
| + } |
| + |
| + if (next_incoming_ == kAny) { |
| + next_incoming_ = current; |
| + } |
| + |
| + // Ensure there are no duplicate sequence ids. |
| + DCHECK(current >= next_incoming_); |
|
Sergey Ulanov
2016/10/21 19:19:39
DCHECK_GE
kelvinp
2016/10/21 22:41:27
Done.
|
| + DCHECK(queue_.find(current) == queue_.end()); |
| + |
| + queue_.insert(std::make_pair(current, std::move(message))); |
| + |
| + auto it = queue_.begin(); |
| + while (it != queue_.end() && it->first == next_incoming_) { |
| + result.push_back(std::move(it->second)); |
| + it = queue_.erase(it); |
| + next_incoming_++; |
| + } |
| + |
| + if (current - next_incoming_ >= 3) { |
| + LOG(WARNING) << "Multiple messages are missing: expected= " |
| + << next_incoming_ << " current= " << current; |
| + } |
| + return result; |
| +}; |
| + |
| +JingleSession::PendingMessage::PendingMessage( |
| + std::unique_ptr<JingleMessage> message, |
| + const ReplyCallback& reply_callback) |
| + : message(std::move(message)), reply_callback(reply_callback) {} |
| + |
| +JingleSession::PendingMessage::~PendingMessage() {} |
| + |
| JingleSession::JingleSession(JingleSessionManager* session_manager) |
| : session_manager_(session_manager), |
| event_handler_(nullptr), |
| state_(INITIALIZING), |
| error_(OK), |
| - weak_factory_(this) { |
| -} |
| + message_queue_(new OrderedMessageQueue), |
| + weak_factory_(this) {} |
| JingleSession::~JingleSession() { |
| session_manager_->SessionDestroyed(this); |
| @@ -220,9 +319,12 @@ void JingleSession::SendTransportInfo( |
| peer_address_, JingleMessage::TRANSPORT_INFO, session_id_)); |
| message->transport_info = std::move(transport_info); |
| + std::unique_ptr<buzz::XmlElement> stanza = message->ToXml(); |
| + stanza->AddAttr(buzz::QN_ID, GetNextOutgoingId()); |
| + |
| auto request = session_manager_->iq_sender()->SendIq( |
| - message->ToXml(), base::Bind(&JingleSession::OnTransportInfoResponse, |
| - base::Unretained(this))); |
| + std::move(stanza), base::Bind(&JingleSession::OnTransportInfoResponse, |
| + base::Unretained(this))); |
| if (request) { |
| request->SetTimeout(base::TimeDelta::FromSeconds(kTransportInfoTimeout)); |
| transport_info_requests_.push_back(std::move(request)); |
| @@ -283,9 +385,12 @@ void JingleSession::Close(protocol::ErrorCode error) { |
| void JingleSession::SendMessage(std::unique_ptr<JingleMessage> message) { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| + std::unique_ptr<buzz::XmlElement> stanza = message->ToXml(); |
| + stanza->AddAttr(buzz::QN_ID, GetNextOutgoingId()); |
| + |
| auto request = session_manager_->iq_sender()->SendIq( |
| - message->ToXml(), base::Bind(&JingleSession::OnMessageResponse, |
| - base::Unretained(this), message->action)); |
| + std::move(stanza), base::Bind(&JingleSession::OnMessageResponse, |
| + base::Unretained(this), message->action)); |
| int timeout = kDefaultMessageTimeout; |
| if (message->action == JingleMessage::SESSION_INITIATE || |
| @@ -369,8 +474,22 @@ void JingleSession::OnTransportInfoResponse(IqRequest* request, |
| } |
| } |
| -void JingleSession::OnIncomingMessage(std::unique_ptr<JingleMessage> message, |
| +void JingleSession::OnIncomingMessage(const std::string& id, |
| + std::unique_ptr<JingleMessage> message, |
| const ReplyCallback& reply_callback) { |
| + std::unique_ptr<PendingMessage> item( |
| + new PendingMessage(std::move(message), reply_callback)); |
| + std::vector<std::unique_ptr<PendingMessage>> ordered = |
| + message_queue_->OnIncomingMessage(id, std::move(item)); |
| + for (auto& message : ordered) { |
| + ProcessIncomingMessage(std::move(message->message), |
| + message->reply_callback); |
| + } |
| +} |
| + |
| +void JingleSession::ProcessIncomingMessage( |
| + std::unique_ptr<JingleMessage> message, |
| + const ReplyCallback& reply_callback) { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| if (peer_address_ != message->from) { |
| @@ -396,8 +515,7 @@ void JingleSession::OnIncomingMessage(std::unique_ptr<JingleMessage> message, |
| } |
| if (!message->transport_info || |
| - !transport_->ProcessTransportInfo( |
| - message->transport_info.get())) { |
| + !transport_->ProcessTransportInfo(message->transport_info.get())) { |
| reply_callback.Run(JingleMessageReply::BAD_REQUEST); |
| return; |
| } |
| @@ -609,5 +727,9 @@ bool JingleSession::is_session_active() { |
| state_ == AUTHENTICATING || state_ == AUTHENTICATED; |
| } |
| +std::string JingleSession::GetNextOutgoingId() { |
| + return outgoing_id_prefix_ + "_" + base::IntToString(++next_outgoing_id_); |
| +} |
| + |
| } // namespace protocol |
| } // namespace remoting |