Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(301)

Side by Side Diff: remoting/protocol/ordered_message_queue.h

Issue 2417913002: Process incoming IQs in the same order that they were sent. (Closed)
Patch Set: Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifndef REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_
6 #define REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_
7
8 #include <stdint.h>
9
10 #include <limits>
11 #include <list>
12 #include <map>
13 #include <memory>
14 #include <string>
15
16 #include "base/macros.h"
17 #include "base/rand_util.h"
18 #include "base/strings/string_number_conversions.h"
19 #include "base/strings/string_split.h"
20 #include "remoting/protocol/jingle_messages.h"
21
22 namespace {
23 const int kInvalid = -1;
24 const int kAny = -1;
25
26 // Extracts a sequential id from the id attribute of the IQ stanza.
27 int GetSequentialId(std::string id) {
28 int result = kInvalid;
29 std::vector<std::string> tokens =
30 SplitString(id, "_", base::TRIM_WHITESPACE, base::SPLIT_WANT_NONEMPTY);
31 // Legacy endpoints does not encode the IQ ordering in the ID attribute
32 if (tokens.size() != 2) {
33 return kInvalid;
34 }
35
36 if (!base::StringToInt(tokens[1].c_str(), &result)) {
37 return kInvalid;
38 }
39 return result;
40 };
41 }
42
43 namespace remoting {
44
45 // A Queue that sorts incoming messages and returns them in the ascending order
46 // of |id|. |id| represents the value of the ID attribute of an IQ stanza,
47 // which have the following format <opaque_string>_<sequence_id>.
48 //
49 // OrderedMessageQueue.OnIncomingMessage() will always return messages in
50 // ascending order of <sequence_id>.
51 //
52 // Background:
53 // The chromoting signaling channel does not guarantee that the incoming IQs are
54 // delivered in the order that it is sent.
55 //
56 // This behavior leads to transient session setup failures. For instance,
57 // a <transport-info> that is sent after a <session-info> message is sometimes
58 // delivered to the client out of order, causing the client to close the
59 // session due to an unexpected request.
60 template <typename T>
Sergey Ulanov 2016/10/14 18:36:54 I'd like to avoid making this a template, given th
61 class OrderedMessageQueue {
62 public:
63 OrderedMessageQueue()
64 : next_incoming_(kAny),
65 next_outgoing_(0),
66 outgoing_prefix_(base::Uint64ToString(
67 base::RandGenerator(std::numeric_limits<uint64_t>::max()))){};
68
69 ~OrderedMessageQueue() {}
70
71 // Returns the list of messages ordered by their sequential IDs.
72 std::list<T> OnIncomingMessage(std::string id, T message) {
73 std::list<T> result;
74 int current = GetSequentialId(id);
75 // If there is no sequencing order encoded in the id, just return the
76 // message.
77 if (current == kInvalid) {
78 result.push_back(std::move(message));
79 return result;
80 }
81
82 if (next_incoming_ == kAny) {
83 next_incoming_ = current;
84 }
85
86 DCHECK(current >= next_incoming_)
87 << "Duplicate sequence id: current= " << current
88 << " expected= " << next_incoming_;
89 DCHECK(queue_.find(current) == queue_.end())
90 << "Duplicate sequence id: current = " << current;
91
92 queue_.insert(std::pair<int, T>(current, std::move(message)));
93
94 auto it = queue_.begin();
95 while (it != queue_.end() && it->first == next_incoming_) {
96 result.push_back(std::move(it->second));
97 it = queue_.erase(it);
98 next_incoming_++;
99 }
100
101 if (current - next_incoming_ >= 3) {
102 LOG(WARNING) << "Multiple messages are missing: expected= "
103 << next_incoming_ << " current= " << current;
104 }
105 return result;
106 };
107
108 // Returns the value of the id attribute of the next outgoing XMPP IQ stanza.
109 // The other endpoint uses this value to ensure the incoming IQ's are
110 // processed in order.
111 std::string GetNextOutgoingId() {
112 return outgoing_prefix_ + "_" + base::IntToString(++next_outgoing_);
113 }
114
115 private:
116 // Implements an ordered list by using map with the |sequence_id| as the key,
117 // so that |queue_| is always sorted by |sequence_id|.
118 std::map<int, T> queue_;
119
120 int next_incoming_;
121 int next_outgoing_;
122
123 // This prefix is necessary to disambiguate between the ID's sent from the
124 // client and the ID's sent from the host.
125 std::string outgoing_prefix_;
126
127 DISALLOW_COPY_AND_ASSIGN(OrderedMessageQueue);
128 };
129
130 } // namespace remoting
131
132 #endif // REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698