OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #ifndef REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_ | |
6 #define REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_ | |
7 | |
8 #include <stdint.h> | |
9 | |
10 #include <limits> | |
11 #include <list> | |
12 #include <map> | |
13 #include <memory> | |
14 #include <string> | |
15 | |
16 #include "base/macros.h" | |
17 #include "base/rand_util.h" | |
18 #include "base/strings/string_number_conversions.h" | |
19 #include "base/strings/string_split.h" | |
20 #include "remoting/protocol/jingle_messages.h" | |
21 | |
22 namespace { | |
23 const int kInvalid = -1; | |
24 const int kAny = -1; | |
25 | |
26 // Extracts a sequential id from the id attribute of the IQ stanza. | |
27 int GetSequentialId(std::string id) { | |
28 int result = kInvalid; | |
29 std::vector<std::string> tokens = | |
30 SplitString(id, "_", base::TRIM_WHITESPACE, base::SPLIT_WANT_NONEMPTY); | |
31 // Legacy endpoints does not encode the IQ ordering in the ID attribute | |
32 if (tokens.size() != 2) { | |
33 return kInvalid; | |
34 } | |
35 | |
36 if (!base::StringToInt(tokens[1].c_str(), &result)) { | |
37 return kInvalid; | |
38 } | |
39 return result; | |
40 }; | |
41 } | |
42 | |
43 namespace remoting { | |
44 | |
45 // A Queue that sorts incoming messages and returns them in the ascending order | |
46 // of |id|. |id| represents the value of the ID attribute of an IQ stanza, | |
47 // which have the following format <opaque_string>_<sequence_id>. | |
48 // | |
49 // OrderedMessageQueue.OnIncomingMessage() will always return messages in | |
50 // ascending order of <sequence_id>. | |
51 // | |
52 // Background: | |
53 // The chromoting signaling channel does not guarantee that the incoming IQs are | |
54 // delivered in the order that it is sent. | |
55 // | |
56 // This behavior leads to transient session setup failures. For instance, | |
57 // a <transport-info> that is sent after a <session-info> message is sometimes | |
58 // delivered to the client out of order, causing the client to close the | |
59 // session due to an unexpected request. | |
60 template <typename T> | |
Sergey Ulanov
2016/10/14 18:36:54
I'd like to avoid making this a template, given th
| |
61 class OrderedMessageQueue { | |
62 public: | |
63 OrderedMessageQueue() | |
64 : next_incoming_(kAny), | |
65 next_outgoing_(0), | |
66 outgoing_prefix_(base::Uint64ToString( | |
67 base::RandGenerator(std::numeric_limits<uint64_t>::max()))){}; | |
68 | |
69 ~OrderedMessageQueue() {} | |
70 | |
71 // Returns the list of messages ordered by their sequential IDs. | |
72 std::list<T> OnIncomingMessage(std::string id, T message) { | |
73 std::list<T> result; | |
74 int current = GetSequentialId(id); | |
75 // If there is no sequencing order encoded in the id, just return the | |
76 // message. | |
77 if (current == kInvalid) { | |
78 result.push_back(std::move(message)); | |
79 return result; | |
80 } | |
81 | |
82 if (next_incoming_ == kAny) { | |
83 next_incoming_ = current; | |
84 } | |
85 | |
86 DCHECK(current >= next_incoming_) | |
87 << "Duplicate sequence id: current= " << current | |
88 << " expected= " << next_incoming_; | |
89 DCHECK(queue_.find(current) == queue_.end()) | |
90 << "Duplicate sequence id: current = " << current; | |
91 | |
92 queue_.insert(std::pair<int, T>(current, std::move(message))); | |
93 | |
94 auto it = queue_.begin(); | |
95 while (it != queue_.end() && it->first == next_incoming_) { | |
96 result.push_back(std::move(it->second)); | |
97 it = queue_.erase(it); | |
98 next_incoming_++; | |
99 } | |
100 | |
101 if (current - next_incoming_ >= 3) { | |
102 LOG(WARNING) << "Multiple messages are missing: expected= " | |
103 << next_incoming_ << " current= " << current; | |
104 } | |
105 return result; | |
106 }; | |
107 | |
108 // Returns the value of the id attribute of the next outgoing XMPP IQ stanza. | |
109 // The other endpoint uses this value to ensure the incoming IQ's are | |
110 // processed in order. | |
111 std::string GetNextOutgoingId() { | |
112 return outgoing_prefix_ + "_" + base::IntToString(++next_outgoing_); | |
113 } | |
114 | |
115 private: | |
116 // Implements an ordered list by using map with the |sequence_id| as the key, | |
117 // so that |queue_| is always sorted by |sequence_id|. | |
118 std::map<int, T> queue_; | |
119 | |
120 int next_incoming_; | |
121 int next_outgoing_; | |
122 | |
123 // This prefix is necessary to disambiguate between the ID's sent from the | |
124 // client and the ID's sent from the host. | |
125 std::string outgoing_prefix_; | |
126 | |
127 DISALLOW_COPY_AND_ASSIGN(OrderedMessageQueue); | |
128 }; | |
129 | |
130 } // namespace remoting | |
131 | |
132 #endif // REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_ | |
OLD | NEW |