Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(578)

Unified Diff: remoting/protocol/ordered_message_queue.h

Issue 2417913002: Process incoming IQs in the same order that they were sent. (Closed)
Patch Set: Reviewer's feedback Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: remoting/protocol/ordered_message_queue.h
diff --git a/remoting/protocol/ordered_message_queue.h b/remoting/protocol/ordered_message_queue.h
new file mode 100644
index 0000000000000000000000000000000000000000..df1cf1aee4920352bc86898436a9b98cedc2a021
--- /dev/null
+++ b/remoting/protocol/ordered_message_queue.h
@@ -0,0 +1,132 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_
Sergey Ulanov 2016/10/19 21:23:39 remove this file?
kelvinp 2016/10/21 00:26:03 Done.
+#define REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_
+
+#include <stdint.h>
+
+#include <limits>
+#include <list>
+#include <map>
+#include <memory>
+#include <string>
+
+#include "base/macros.h"
+#include "base/rand_util.h"
+#include "base/strings/string_number_conversions.h"
+#include "base/strings/string_split.h"
+#include "remoting/protocol/jingle_messages.h"
+
+namespace {
+const int kInvalid = -1;
+const int kAny = -1;
+
+// Extracts a sequential id from the id attribute of the IQ stanza.
+int GetSequentialId(std::string id) {
+ int result = kInvalid;
+ std::vector<std::string> tokens =
+ SplitString(id, "_", base::TRIM_WHITESPACE, base::SPLIT_WANT_NONEMPTY);
+ // Legacy endpoints does not encode the IQ ordering in the ID attribute
+ if (tokens.size() != 2) {
+ return kInvalid;
+ }
+
+ if (!base::StringToInt(tokens[1].c_str(), &result)) {
+ return kInvalid;
+ }
+ return result;
+};
+}
+
+namespace remoting {
+
+// A Queue that sorts incoming messages and returns them in the ascending order
+// of |id|. |id| represents the value of the ID attribute of an IQ stanza,
+// which have the following format <opaque_string>_<sequence_id>.
+//
+// OrderedMessageQueue.OnIncomingMessage() will always return messages in
+// ascending order of <sequence_id>.
+//
+// Background:
+// The chromoting signaling channel does not guarantee that the incoming IQs are
+// delivered in the order that it is sent.
+//
+// This behavior leads to transient session setup failures. For instance,
+// a <transport-info> that is sent after a <session-info> message is sometimes
+// delivered to the client out of order, causing the client to close the
+// session due to an unexpected request.
+template <typename T>
+class OrderedMessageQueue {
+ public:
+ OrderedMessageQueue()
+ : next_incoming_(kAny),
+ next_outgoing_(0),
+ outgoing_prefix_(base::Uint64ToString(
+ base::RandGenerator(std::numeric_limits<uint64_t>::max()))){};
+
+ ~OrderedMessageQueue() {}
+
+ // Returns the list of messages ordered by their sequential IDs.
+ std::list<T> OnIncomingMessage(std::string id, T message) {
+ std::list<T> result;
+ int current = GetSequentialId(id);
+ // If there is no sequencing order encoded in the id, just return the
+ // message.
+ if (current == kInvalid) {
+ result.push_back(std::move(message));
+ return result;
+ }
+
+ if (next_incoming_ == kAny) {
+ next_incoming_ = current;
+ }
+
+ DCHECK(current >= next_incoming_)
+ << "Duplicate sequence id: current= " << current
+ << " expected= " << next_incoming_;
+ DCHECK(queue_.find(current) == queue_.end())
+ << "Duplicate sequence id: current = " << current;
+
+ queue_.insert(std::pair<int, T>(current, std::move(message)));
+
+ auto it = queue_.begin();
+ while (it != queue_.end() && it->first == next_incoming_) {
+ result.push_back(std::move(it->second));
+ it = queue_.erase(it);
+ next_incoming_++;
+ }
+
+ if (current - next_incoming_ >= 3) {
+ LOG(WARNING) << "Multiple messages are missing: expected= "
+ << next_incoming_ << " current= " << current;
+ }
+ return result;
+ };
+
+ // Returns the value of the id attribute of the next outgoing XMPP IQ stanza.
+ // The other endpoint uses this value to ensure the incoming IQ's are
+ // processed in order.
+ std::string GetNextOutgoingId() {
+ return outgoing_prefix_ + "_" + base::IntToString(++next_outgoing_);
+ }
+
+ private:
+ // Implements an ordered list by using map with the |sequence_id| as the key,
+ // so that |queue_| is always sorted by |sequence_id|.
+ std::map<int, T> queue_;
+
+ int next_incoming_;
+ int next_outgoing_;
+
+ // This prefix is necessary to disambiguate between the ID's sent from the
+ // client and the ID's sent from the host.
+ std::string outgoing_prefix_;
+
+ DISALLOW_COPY_AND_ASSIGN(OrderedMessageQueue);
+};
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_

Powered by Google App Engine
This is Rietveld 408576698