Index: remoting/protocol/ordered_message_queue.h |
diff --git a/remoting/protocol/ordered_message_queue.h b/remoting/protocol/ordered_message_queue.h |
new file mode 100644 |
index 0000000000000000000000000000000000000000..df1cf1aee4920352bc86898436a9b98cedc2a021 |
--- /dev/null |
+++ b/remoting/protocol/ordered_message_queue.h |
@@ -0,0 +1,132 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#ifndef REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_ |
+#define REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_ |
+ |
+#include <stdint.h> |
+ |
+#include <limits> |
+#include <list> |
+#include <map> |
+#include <memory> |
+#include <string> |
+ |
+#include "base/macros.h" |
+#include "base/rand_util.h" |
+#include "base/strings/string_number_conversions.h" |
+#include "base/strings/string_split.h" |
+#include "remoting/protocol/jingle_messages.h" |
+ |
+namespace { |
+const int kInvalid = -1; |
+const int kAny = -1; |
+ |
+// Extracts a sequential id from the id attribute of the IQ stanza. |
+int GetSequentialId(std::string id) { |
+ int result = kInvalid; |
+ std::vector<std::string> tokens = |
+ SplitString(id, "_", base::TRIM_WHITESPACE, base::SPLIT_WANT_NONEMPTY); |
+ // Legacy endpoints does not encode the IQ ordering in the ID attribute |
+ if (tokens.size() != 2) { |
+ return kInvalid; |
+ } |
+ |
+ if (!base::StringToInt(tokens[1].c_str(), &result)) { |
+ return kInvalid; |
+ } |
+ return result; |
+}; |
+} |
+ |
+namespace remoting { |
+ |
+// A Queue that sorts incoming messages and returns them in the ascending order |
+// of |id|. |id| represents the value of the ID attribute of an IQ stanza, |
+// which have the following format <opaque_string>_<sequence_id>. |
+// |
+// OrderedMessageQueue.OnIncomingMessage() will always return messages in |
+// ascending order of <sequence_id>. |
+// |
+// Background: |
+// The chromoting signaling channel does not guarantee that the incoming IQs are |
+// delivered in the order that it is sent. |
+// |
+// This behavior leads to transient session setup failures. For instance, |
+// a <transport-info> that is sent after a <session-info> message is sometimes |
+// delivered to the client out of order, causing the client to close the |
+// session due to an unexpected request. |
+template <typename T> |
Sergey Ulanov
2016/10/14 18:36:54
I'd like to avoid making this a template, given th
|
+class OrderedMessageQueue { |
+ public: |
+ OrderedMessageQueue() |
+ : next_incoming_(kAny), |
+ next_outgoing_(0), |
+ outgoing_prefix_(base::Uint64ToString( |
+ base::RandGenerator(std::numeric_limits<uint64_t>::max()))){}; |
+ |
+ ~OrderedMessageQueue() {} |
+ |
+ // Returns the list of messages ordered by their sequential IDs. |
+ std::list<T> OnIncomingMessage(std::string id, T message) { |
+ std::list<T> result; |
+ int current = GetSequentialId(id); |
+ // If there is no sequencing order encoded in the id, just return the |
+ // message. |
+ if (current == kInvalid) { |
+ result.push_back(std::move(message)); |
+ return result; |
+ } |
+ |
+ if (next_incoming_ == kAny) { |
+ next_incoming_ = current; |
+ } |
+ |
+ DCHECK(current >= next_incoming_) |
+ << "Duplicate sequence id: current= " << current |
+ << " expected= " << next_incoming_; |
+ DCHECK(queue_.find(current) == queue_.end()) |
+ << "Duplicate sequence id: current = " << current; |
+ |
+ queue_.insert(std::pair<int, T>(current, std::move(message))); |
+ |
+ auto it = queue_.begin(); |
+ while (it != queue_.end() && it->first == next_incoming_) { |
+ result.push_back(std::move(it->second)); |
+ it = queue_.erase(it); |
+ next_incoming_++; |
+ } |
+ |
+ if (current - next_incoming_ >= 3) { |
+ LOG(WARNING) << "Multiple messages are missing: expected= " |
+ << next_incoming_ << " current= " << current; |
+ } |
+ return result; |
+ }; |
+ |
+ // Returns the value of the id attribute of the next outgoing XMPP IQ stanza. |
+ // The other endpoint uses this value to ensure the incoming IQ's are |
+ // processed in order. |
+ std::string GetNextOutgoingId() { |
+ return outgoing_prefix_ + "_" + base::IntToString(++next_outgoing_); |
+ } |
+ |
+ private: |
+ // Implements an ordered list by using map with the |sequence_id| as the key, |
+ // so that |queue_| is always sorted by |sequence_id|. |
+ std::map<int, T> queue_; |
+ |
+ int next_incoming_; |
+ int next_outgoing_; |
+ |
+ // This prefix is necessary to disambiguate between the ID's sent from the |
+ // client and the ID's sent from the host. |
+ std::string outgoing_prefix_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(OrderedMessageQueue); |
+}; |
+ |
+} // namespace remoting |
+ |
+#endif // REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_ |