Chromium Code Reviews| Index: remoting/protocol/ordered_message_queue.h |
| diff --git a/remoting/protocol/ordered_message_queue.h b/remoting/protocol/ordered_message_queue.h |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..df1cf1aee4920352bc86898436a9b98cedc2a021 |
| --- /dev/null |
| +++ b/remoting/protocol/ordered_message_queue.h |
| @@ -0,0 +1,132 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#ifndef REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_ |
|
Sergey Ulanov
2016/10/19 21:23:39
remove this file?
kelvinp
2016/10/21 00:26:03
Done.
|
| +#define REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_ |
| + |
| +#include <stdint.h> |
| + |
| +#include <limits> |
| +#include <list> |
| +#include <map> |
| +#include <memory> |
| +#include <string> |
| + |
| +#include "base/macros.h" |
| +#include "base/rand_util.h" |
| +#include "base/strings/string_number_conversions.h" |
| +#include "base/strings/string_split.h" |
| +#include "remoting/protocol/jingle_messages.h" |
| + |
| +namespace { |
| +const int kInvalid = -1; |
| +const int kAny = -1; |
| + |
| +// Extracts a sequential id from the id attribute of the IQ stanza. |
| +int GetSequentialId(std::string id) { |
| + int result = kInvalid; |
| + std::vector<std::string> tokens = |
| + SplitString(id, "_", base::TRIM_WHITESPACE, base::SPLIT_WANT_NONEMPTY); |
| + // Legacy endpoints does not encode the IQ ordering in the ID attribute |
| + if (tokens.size() != 2) { |
| + return kInvalid; |
| + } |
| + |
| + if (!base::StringToInt(tokens[1].c_str(), &result)) { |
| + return kInvalid; |
| + } |
| + return result; |
| +}; |
| +} |
| + |
| +namespace remoting { |
| + |
| +// A Queue that sorts incoming messages and returns them in the ascending order |
| +// of |id|. |id| represents the value of the ID attribute of an IQ stanza, |
| +// which have the following format <opaque_string>_<sequence_id>. |
| +// |
| +// OrderedMessageQueue.OnIncomingMessage() will always return messages in |
| +// ascending order of <sequence_id>. |
| +// |
| +// Background: |
| +// The chromoting signaling channel does not guarantee that the incoming IQs are |
| +// delivered in the order that it is sent. |
| +// |
| +// This behavior leads to transient session setup failures. For instance, |
| +// a <transport-info> that is sent after a <session-info> message is sometimes |
| +// delivered to the client out of order, causing the client to close the |
| +// session due to an unexpected request. |
| +template <typename T> |
| +class OrderedMessageQueue { |
| + public: |
| + OrderedMessageQueue() |
| + : next_incoming_(kAny), |
| + next_outgoing_(0), |
| + outgoing_prefix_(base::Uint64ToString( |
| + base::RandGenerator(std::numeric_limits<uint64_t>::max()))){}; |
| + |
| + ~OrderedMessageQueue() {} |
| + |
| + // Returns the list of messages ordered by their sequential IDs. |
| + std::list<T> OnIncomingMessage(std::string id, T message) { |
| + std::list<T> result; |
| + int current = GetSequentialId(id); |
| + // If there is no sequencing order encoded in the id, just return the |
| + // message. |
| + if (current == kInvalid) { |
| + result.push_back(std::move(message)); |
| + return result; |
| + } |
| + |
| + if (next_incoming_ == kAny) { |
| + next_incoming_ = current; |
| + } |
| + |
| + DCHECK(current >= next_incoming_) |
| + << "Duplicate sequence id: current= " << current |
| + << " expected= " << next_incoming_; |
| + DCHECK(queue_.find(current) == queue_.end()) |
| + << "Duplicate sequence id: current = " << current; |
| + |
| + queue_.insert(std::pair<int, T>(current, std::move(message))); |
| + |
| + auto it = queue_.begin(); |
| + while (it != queue_.end() && it->first == next_incoming_) { |
| + result.push_back(std::move(it->second)); |
| + it = queue_.erase(it); |
| + next_incoming_++; |
| + } |
| + |
| + if (current - next_incoming_ >= 3) { |
| + LOG(WARNING) << "Multiple messages are missing: expected= " |
| + << next_incoming_ << " current= " << current; |
| + } |
| + return result; |
| + }; |
| + |
| + // Returns the value of the id attribute of the next outgoing XMPP IQ stanza. |
| + // The other endpoint uses this value to ensure the incoming IQ's are |
| + // processed in order. |
| + std::string GetNextOutgoingId() { |
| + return outgoing_prefix_ + "_" + base::IntToString(++next_outgoing_); |
| + } |
| + |
| + private: |
| + // Implements an ordered list by using map with the |sequence_id| as the key, |
| + // so that |queue_| is always sorted by |sequence_id|. |
| + std::map<int, T> queue_; |
| + |
| + int next_incoming_; |
| + int next_outgoing_; |
| + |
| + // This prefix is necessary to disambiguate between the ID's sent from the |
| + // client and the ID's sent from the host. |
| + std::string outgoing_prefix_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(OrderedMessageQueue); |
| +}; |
| + |
| +} // namespace remoting |
| + |
| +#endif // REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_ |