Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #ifndef REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_ | |
| 6 #define REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_ | |
| 7 | |
| 8 #include <stdint.h> | |
| 9 | |
| 10 #include <limits> | |
| 11 #include <list> | |
| 12 #include <map> | |
| 13 #include <memory> | |
| 14 #include <string> | |
| 15 | |
| 16 #include "base/macros.h" | |
| 17 #include "base/rand_util.h" | |
| 18 #include "base/strings/string_number_conversions.h" | |
| 19 #include "base/strings/string_split.h" | |
| 20 #include "remoting/protocol/jingle_messages.h" | |
| 21 | |
| 22 namespace { | |
| 23 const int kInvalid = -1; | |
| 24 const int kAny = -1; | |
| 25 | |
| 26 // Extracts a sequential id from the id attribute of the IQ stanza. | |
| 27 int GetSequentialId(std::string id) { | |
| 28 int result = kInvalid; | |
| 29 std::vector<std::string> tokens = | |
| 30 SplitString(id, "_", base::TRIM_WHITESPACE, base::SPLIT_WANT_NONEMPTY); | |
| 31 // Legacy endpoints does not encode the IQ ordering in the ID attribute | |
| 32 if (tokens.size() != 2) { | |
| 33 return kInvalid; | |
| 34 } | |
| 35 | |
| 36 if (!base::StringToInt(tokens[1].c_str(), &result)) { | |
| 37 return kInvalid; | |
| 38 } | |
| 39 return result; | |
| 40 }; | |
| 41 } | |
| 42 | |
| 43 namespace remoting { | |
| 44 | |
| 45 // A Queue that sorts incoming messages and returns them in the ascending order | |
| 46 // of |id|. |id| represents the value of the ID attribute of an IQ stanza, | |
| 47 // which have the following format <opaque_string>_<sequence_id>. | |
| 48 // | |
| 49 // OrderedMessageQueue.OnIncomingMessage() will always return messages in | |
| 50 // ascending order of <sequence_id>. | |
| 51 // | |
| 52 // Background: | |
| 53 // The chromoting signaling channel does not guarantee that the incoming IQs are | |
| 54 // delivered in the order that it is sent. | |
| 55 // | |
| 56 // This behavior leads to transient session setup failures. For instance, | |
| 57 // a <transport-info> that is sent after a <session-info> message is sometimes | |
| 58 // delivered to the client out of order, causing the client to close the | |
| 59 // session due to an unexpected request. | |
| 60 template <typename T> | |
|
Sergey Ulanov
2016/10/14 18:36:54
I'd like to avoid making this a template, given th
| |
| 61 class OrderedMessageQueue { | |
| 62 public: | |
| 63 OrderedMessageQueue() | |
| 64 : next_incoming_(kAny), | |
| 65 next_outgoing_(0), | |
| 66 outgoing_prefix_(base::Uint64ToString( | |
| 67 base::RandGenerator(std::numeric_limits<uint64_t>::max()))){}; | |
| 68 | |
| 69 ~OrderedMessageQueue() {} | |
| 70 | |
| 71 // Returns the list of messages ordered by their sequential IDs. | |
| 72 std::list<T> OnIncomingMessage(std::string id, T message) { | |
| 73 std::list<T> result; | |
| 74 int current = GetSequentialId(id); | |
| 75 // If there is no sequencing order encoded in the id, just return the | |
| 76 // message. | |
| 77 if (current == kInvalid) { | |
| 78 result.push_back(std::move(message)); | |
| 79 return result; | |
| 80 } | |
| 81 | |
| 82 if (next_incoming_ == kAny) { | |
| 83 next_incoming_ = current; | |
| 84 } | |
| 85 | |
| 86 DCHECK(current >= next_incoming_) | |
| 87 << "Duplicate sequence id: current= " << current | |
| 88 << " expected= " << next_incoming_; | |
| 89 DCHECK(queue_.find(current) == queue_.end()) | |
| 90 << "Duplicate sequence id: current = " << current; | |
| 91 | |
| 92 queue_.insert(std::pair<int, T>(current, std::move(message))); | |
| 93 | |
| 94 auto it = queue_.begin(); | |
| 95 while (it != queue_.end() && it->first == next_incoming_) { | |
| 96 result.push_back(std::move(it->second)); | |
| 97 it = queue_.erase(it); | |
| 98 next_incoming_++; | |
| 99 } | |
| 100 | |
| 101 if (current - next_incoming_ >= 3) { | |
| 102 LOG(WARNING) << "Multiple messages are missing: expected= " | |
| 103 << next_incoming_ << " current= " << current; | |
| 104 } | |
| 105 return result; | |
| 106 }; | |
| 107 | |
| 108 // Returns the value of the id attribute of the next outgoing XMPP IQ stanza. | |
| 109 // The other endpoint uses this value to ensure the incoming IQ's are | |
| 110 // processed in order. | |
| 111 std::string GetNextOutgoingId() { | |
| 112 return outgoing_prefix_ + "_" + base::IntToString(++next_outgoing_); | |
| 113 } | |
| 114 | |
| 115 private: | |
| 116 // Implements an ordered list by using map with the |sequence_id| as the key, | |
| 117 // so that |queue_| is always sorted by |sequence_id|. | |
| 118 std::map<int, T> queue_; | |
| 119 | |
| 120 int next_incoming_; | |
| 121 int next_outgoing_; | |
| 122 | |
| 123 // This prefix is necessary to disambiguate between the ID's sent from the | |
| 124 // client and the ID's sent from the host. | |
| 125 std::string outgoing_prefix_; | |
| 126 | |
| 127 DISALLOW_COPY_AND_ASSIGN(OrderedMessageQueue); | |
| 128 }; | |
| 129 | |
| 130 } // namespace remoting | |
| 131 | |
| 132 #endif // REMOTING_PROTOCOL_ORDERED_MESSAGE_QUEUE_H_ | |
| OLD | NEW |