| Index: remoting/protocol/jingle_session.cc
|
| diff --git a/remoting/protocol/jingle_session.cc b/remoting/protocol/jingle_session.cc
|
| index 49b1f13deeaa98a3ed24c3ae1e95c1972fdc313d..6e6457abe14fc14294f4d79c3085ea4a04b46cb9 100644
|
| --- a/remoting/protocol/jingle_session.cc
|
| +++ b/remoting/protocol/jingle_session.cc
|
| @@ -10,10 +10,9 @@
|
| #include <utility>
|
|
|
| #include "base/bind.h"
|
| -#include "base/rand_util.h"
|
| #include "base/single_thread_task_runner.h"
|
| #include "base/stl_util.h"
|
| -#include "base/strings/string_number_conversions.h"
|
| +#include "base/strings/string_split.h"
|
| #include "base/threading/thread_task_runner_handle.h"
|
| #include "base/time/time.h"
|
| #include "remoting/base/constants.h"
|
| @@ -25,6 +24,7 @@
|
| #include "remoting/protocol/transport.h"
|
| #include "remoting/signaling/iq_sender.h"
|
| #include "third_party/webrtc/libjingle/xmllite/xmlelement.h"
|
| +#include "third_party/webrtc/libjingle/xmpp/constants.h"
|
| #include "third_party/webrtc/p2p/base/candidate.h"
|
|
|
| using buzz::XmlElement;
|
| @@ -47,6 +47,13 @@ const int kSessionInitiateAndAcceptTimeout = kDefaultMessageTimeout * 3;
|
| // Timeout for the transport-info messages.
|
| const int kTransportInfoTimeout = 10 * 60;
|
|
|
| +// Special value for an invalid sequential ID for an incoming IQ.
|
| +const int kInvalid = -1;
|
| +
|
| +// Special value indicating that any sequential ID is valid for the next
|
| +// incoming IQ.
|
| +const int kAny = -1;
|
| +
|
| ErrorCode AuthRejectionReasonToErrorCode(
|
| Authenticator::RejectionReason reason) {
|
| switch (reason) {
|
| @@ -63,15 +70,107 @@ ErrorCode AuthRejectionReasonToErrorCode(
|
| return UNKNOWN_ERROR;
|
| }
|
|
|
| +// Extracts a sequential id from the id attribute of the IQ stanza.
|
| +int GetSequentialId(const std::string& id) {
|
| + std::vector<std::string> tokens =
|
| + SplitString(id, "_", base::TRIM_WHITESPACE, base::SPLIT_WANT_NONEMPTY);
|
| + // Legacy endpoints does not encode the IQ ordering in the ID attribute
|
| + if (tokens.size() != 2) {
|
| + return kInvalid;
|
| + }
|
| +
|
| + int result = kInvalid;
|
| + if (!base::StringToInt(tokens[1].c_str(), &result)) {
|
| + return kInvalid;
|
| + }
|
| + return result;
|
| +}
|
| +
|
| } // namespace
|
|
|
| +// A Queue that sorts incoming messages and returns them in the ascending order
|
| +// of sequence ids. The sequence id can be extracted from the ID attribute of
|
| +// an IQ stanza, which have the following format <opaque_string>_<sequence_id>.
|
| +//
|
| +// Background:
|
| +// The chromoting signaling channel does not guarantee that the incoming IQs are
|
| +// delivered in the order that it is sent.
|
| +//
|
| +// This behavior leads to transient session setup failures. For instance,
|
| +// a <transport-info> that is sent after a <session-info> message is sometimes
|
| +// delivered to the client out of order, causing the client to close the
|
| +// session due to an unexpected request.
|
| +class JingleSession::OrderedMessageQueue {
|
| + public:
|
| + OrderedMessageQueue() {}
|
| + ~OrderedMessageQueue() {}
|
| +
|
| + // Returns the list of messages ordered by their sequential IDs.
|
| + std::vector<std::unique_ptr<PendingMessage>> OnIncomingMessage(
|
| + const std::string& id,
|
| + std::unique_ptr<PendingMessage>);
|
| +
|
| + private:
|
| + // Implements an ordered list by using map with the |sequence_id| as the key,
|
| + // so that |queue_| is always sorted by |sequence_id|.
|
| + std::map<int, std::unique_ptr<PendingMessage>> queue_;
|
| +
|
| + int next_incoming_ = kAny;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(OrderedMessageQueue);
|
| +};
|
| +
|
| +std::vector<std::unique_ptr<JingleSession::PendingMessage>>
|
| +JingleSession::OrderedMessageQueue::OnIncomingMessage(
|
| + const std::string& id,
|
| + std::unique_ptr<JingleSession::PendingMessage> message) {
|
| + std::vector<std::unique_ptr<JingleSession::PendingMessage>> result;
|
| + int current = GetSequentialId(id);
|
| + // If there is no sequencing order encoded in the id, just return the
|
| + // message.
|
| + if (current == kInvalid) {
|
| + result.push_back(std::move(message));
|
| + return result;
|
| + }
|
| +
|
| + if (next_incoming_ == kAny) {
|
| + next_incoming_ = current;
|
| + }
|
| +
|
| + // Ensure there are no duplicate sequence ids.
|
| + DCHECK_GE(current, next_incoming_);
|
| + DCHECK(queue_.find(current) == queue_.end());
|
| +
|
| + queue_.insert(std::make_pair(current, std::move(message)));
|
| +
|
| + auto it = queue_.begin();
|
| + while (it != queue_.end() && it->first == next_incoming_) {
|
| + result.push_back(std::move(it->second));
|
| + it = queue_.erase(it);
|
| + next_incoming_++;
|
| + }
|
| +
|
| + if (current - next_incoming_ >= 3) {
|
| + LOG(WARNING) << "Multiple messages are missing: expected= "
|
| + << next_incoming_ << " current= " << current;
|
| + }
|
| + return result;
|
| +};
|
| +
|
| +JingleSession::PendingMessage::PendingMessage(
|
| + std::unique_ptr<JingleMessage> message,
|
| + const ReplyCallback& reply_callback)
|
| + : message(std::move(message)), reply_callback(reply_callback) {}
|
| +
|
| +JingleSession::PendingMessage::~PendingMessage() {}
|
| +
|
| JingleSession::JingleSession(JingleSessionManager* session_manager)
|
| : session_manager_(session_manager),
|
| event_handler_(nullptr),
|
| state_(INITIALIZING),
|
| error_(OK),
|
| - weak_factory_(this) {
|
| -}
|
| + message_queue_(new OrderedMessageQueue),
|
| + weak_factory_(this) {}
|
|
|
| JingleSession::~JingleSession() {
|
| session_manager_->SessionDestroyed(this);
|
| @@ -220,9 +319,12 @@ void JingleSession::SendTransportInfo(
|
| peer_address_, JingleMessage::TRANSPORT_INFO, session_id_));
|
| message->transport_info = std::move(transport_info);
|
|
|
| + std::unique_ptr<buzz::XmlElement> stanza = message->ToXml();
|
| + stanza->AddAttr(buzz::QN_ID, GetNextOutgoingId());
|
| +
|
| auto request = session_manager_->iq_sender()->SendIq(
|
| - message->ToXml(), base::Bind(&JingleSession::OnTransportInfoResponse,
|
| - base::Unretained(this)));
|
| + std::move(stanza), base::Bind(&JingleSession::OnTransportInfoResponse,
|
| + base::Unretained(this)));
|
| if (request) {
|
| request->SetTimeout(base::TimeDelta::FromSeconds(kTransportInfoTimeout));
|
| transport_info_requests_.push_back(std::move(request));
|
| @@ -283,9 +385,12 @@ void JingleSession::Close(protocol::ErrorCode error) {
|
| void JingleSession::SendMessage(std::unique_ptr<JingleMessage> message) {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
|
|
| + std::unique_ptr<buzz::XmlElement> stanza = message->ToXml();
|
| + stanza->AddAttr(buzz::QN_ID, GetNextOutgoingId());
|
| +
|
| auto request = session_manager_->iq_sender()->SendIq(
|
| - message->ToXml(), base::Bind(&JingleSession::OnMessageResponse,
|
| - base::Unretained(this), message->action));
|
| + std::move(stanza), base::Bind(&JingleSession::OnMessageResponse,
|
| + base::Unretained(this), message->action));
|
|
|
| int timeout = kDefaultMessageTimeout;
|
| if (message->action == JingleMessage::SESSION_INITIATE ||
|
| @@ -369,8 +474,22 @@ void JingleSession::OnTransportInfoResponse(IqRequest* request,
|
| }
|
| }
|
|
|
| -void JingleSession::OnIncomingMessage(std::unique_ptr<JingleMessage> message,
|
| +void JingleSession::OnIncomingMessage(const std::string& id,
|
| + std::unique_ptr<JingleMessage> message,
|
| const ReplyCallback& reply_callback) {
|
| + std::unique_ptr<PendingMessage> item(
|
| + new PendingMessage(std::move(message), reply_callback));
|
| + std::vector<std::unique_ptr<PendingMessage>> ordered =
|
| + message_queue_->OnIncomingMessage(id, std::move(item));
|
| + for (auto& message : ordered) {
|
| + ProcessIncomingMessage(std::move(message->message),
|
| + message->reply_callback);
|
| + }
|
| +}
|
| +
|
| +void JingleSession::ProcessIncomingMessage(
|
| + std::unique_ptr<JingleMessage> message,
|
| + const ReplyCallback& reply_callback) {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
|
|
| if (peer_address_ != message->from) {
|
| @@ -396,8 +515,7 @@ void JingleSession::OnIncomingMessage(std::unique_ptr<JingleMessage> message,
|
| }
|
|
|
| if (!message->transport_info ||
|
| - !transport_->ProcessTransportInfo(
|
| - message->transport_info.get())) {
|
| + !transport_->ProcessTransportInfo(message->transport_info.get())) {
|
| reply_callback.Run(JingleMessageReply::BAD_REQUEST);
|
| return;
|
| }
|
| @@ -609,5 +727,9 @@ bool JingleSession::is_session_active() {
|
| state_ == AUTHENTICATING || state_ == AUTHENTICATED;
|
| }
|
|
|
| +std::string JingleSession::GetNextOutgoingId() {
|
| + return outgoing_id_prefix_ + "_" + base::IntToString(++next_outgoing_id_);
|
| +}
|
| +
|
| } // namespace protocol
|
| } // namespace remoting
|
|
|