| OLD | NEW |
| (Empty) | |
| 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" |
| 6 |
| 7 #include "base/base64.h" |
| 8 #include "base/callback.h" |
| 9 #include "base/logging.h" |
| 10 #include "chrome/common/net/notifier/listener/xml_element_util.h" |
| 11 #include "google/cacheinvalidation/invalidation-client.h" |
| 12 #include "talk/xmpp/constants.h" |
| 13 #include "talk/xmpp/jid.h" |
| 14 #include "talk/xmpp/xmppclient.h" |
| 15 #include "talk/xmpp/xmpptask.h" |
| 16 |
| 17 namespace sync_notifier { |
| 18 |
| 19 namespace { |
| 20 |
| 21 const char kBotJid[] = "tango@bot.talk.google.com"; |
| 22 |
| 23 // TODO(akalin): Eliminate use of 'tango' name. |
| 24 |
| 25 // TODO(akalin): Hash out details of tango IQ protocol. |
| 26 |
| 27 static const buzz::QName kQnTangoIqPacket("google:tango", "packet"); |
| 28 static const buzz::QName kQnTangoIqPacketContent( |
| 29 "google:tango", "content"); |
| 30 |
| 31 // TODO(akalin): Move these task classes out so that they can be |
| 32 // unit-tested. This'll probably be done easier once we consolidate |
| 33 // all the packet sending/receiving classes. |
| 34 |
| 35 // A task that listens for ClientInvalidation messages and calls the |
| 36 // given callback on them. |
| 37 class CacheInvalidationListenTask : public buzz::XmppTask { |
| 38 public: |
| 39 // Takes ownership of callback. |
| 40 CacheInvalidationListenTask(Task* parent, |
| 41 Callback1<const std::string&>::Type* callback) |
| 42 : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {} |
| 43 virtual ~CacheInvalidationListenTask() {} |
| 44 |
| 45 virtual int ProcessStart() { |
| 46 LOG(INFO) << "CacheInvalidationListenTask started"; |
| 47 return STATE_RESPONSE; |
| 48 } |
| 49 |
| 50 virtual int ProcessResponse() { |
| 51 const buzz::XmlElement* stanza = NextStanza(); |
| 52 if (stanza == NULL) { |
| 53 LOG(INFO) << "CacheInvalidationListenTask blocked"; |
| 54 return STATE_BLOCKED; |
| 55 } |
| 56 LOG(INFO) << "CacheInvalidationListenTask response received"; |
| 57 std::string data; |
| 58 if (GetTangoIqPacketData(stanza, &data)) { |
| 59 callback_->Run(data); |
| 60 } else { |
| 61 LOG(ERROR) << "Could not get packet data"; |
| 62 } |
| 63 // Acknowledge receipt of the iq to the buzz server. |
| 64 // TODO(akalin): Send an error response for malformed packets. |
| 65 scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza)); |
| 66 SendStanza(response_stanza.get()); |
| 67 return STATE_RESPONSE; |
| 68 } |
| 69 |
| 70 virtual bool HandleStanza(const buzz::XmlElement* stanza) { |
| 71 LOG(INFO) << "Stanza received: " |
| 72 << notifier::XmlElementToString(*stanza); |
| 73 if (IsValidTangoIqPacket(stanza)) { |
| 74 LOG(INFO) << "Queueing stanza"; |
| 75 QueueStanza(stanza); |
| 76 return true; |
| 77 } |
| 78 LOG(INFO) << "Stanza skipped"; |
| 79 return false; |
| 80 } |
| 81 |
| 82 private: |
| 83 bool IsValidTangoIqPacket(const buzz::XmlElement* stanza) { |
| 84 return |
| 85 (MatchRequestIq(stanza, buzz::STR_SET, kQnTangoIqPacket) && |
| 86 (stanza->Attr(buzz::QN_TO) == GetClient()->jid().Str())); |
| 87 } |
| 88 |
| 89 bool GetTangoIqPacketData(const buzz::XmlElement* stanza, |
| 90 std::string* data) { |
| 91 DCHECK(IsValidTangoIqPacket(stanza)); |
| 92 const buzz::XmlElement* tango_iq_packet = |
| 93 stanza->FirstNamed(kQnTangoIqPacket); |
| 94 if (!tango_iq_packet) { |
| 95 LOG(ERROR) << "Could not find tango IQ packet element"; |
| 96 return false; |
| 97 } |
| 98 const buzz::XmlElement* tango_iq_packet_content = |
| 99 tango_iq_packet->FirstNamed(kQnTangoIqPacketContent); |
| 100 if (!tango_iq_packet_content) { |
| 101 LOG(ERROR) << "Could not find tango IQ packet content element"; |
| 102 return false; |
| 103 } |
| 104 *data = tango_iq_packet_content->BodyText(); |
| 105 return true; |
| 106 } |
| 107 |
| 108 scoped_ptr<Callback1<const std::string&>::Type> callback_; |
| 109 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask); |
| 110 }; |
| 111 |
| 112 // A task that sends a single outbound ClientInvalidation message. |
| 113 class CacheInvalidationSendMessageTask : public buzz::XmppTask { |
| 114 public: |
| 115 CacheInvalidationSendMessageTask(Task* parent, |
| 116 const buzz::Jid& to_jid, |
| 117 const std::string& msg) |
| 118 : XmppTask(parent, buzz::XmppEngine::HL_SINGLE), |
| 119 to_jid_(to_jid), msg_(msg) {} |
| 120 virtual ~CacheInvalidationSendMessageTask() {} |
| 121 |
| 122 virtual int ProcessStart() { |
| 123 scoped_ptr<buzz::XmlElement> stanza( |
| 124 MakeTangoIqPacket(to_jid_, task_id(), msg_)); |
| 125 LOG(INFO) << "Sending message: " |
| 126 << notifier::XmlElementToString(*stanza.get()); |
| 127 if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) { |
| 128 LOG(INFO) << "Error when sending message"; |
| 129 return STATE_ERROR; |
| 130 } |
| 131 return STATE_RESPONSE; |
| 132 } |
| 133 |
| 134 virtual int ProcessResponse() { |
| 135 const buzz::XmlElement* stanza = NextStanza(); |
| 136 if (stanza == NULL) { |
| 137 LOG(INFO) << "CacheInvalidationSendMessageTask blocked..."; |
| 138 return STATE_BLOCKED; |
| 139 } |
| 140 LOG(INFO) << "CacheInvalidationSendMessageTask response received: " |
| 141 << notifier::XmlElementToString(*stanza); |
| 142 // TODO(akalin): Handle errors here. |
| 143 return STATE_DONE; |
| 144 } |
| 145 |
| 146 virtual bool HandleStanza(const buzz::XmlElement* stanza) { |
| 147 LOG(INFO) << "Stanza received: " |
| 148 << notifier::XmlElementToString(*stanza); |
| 149 if (!MatchResponseIq(stanza, to_jid_, task_id())) { |
| 150 LOG(INFO) << "Stanza skipped"; |
| 151 return false; |
| 152 } |
| 153 LOG(INFO) << "Queueing stanza"; |
| 154 QueueStanza(stanza); |
| 155 return true; |
| 156 } |
| 157 |
| 158 private: |
| 159 static buzz::XmlElement* MakeTangoIqPacket( |
| 160 const buzz::Jid& to_jid, |
| 161 const std::string& task_id, |
| 162 const std::string& msg) { |
| 163 buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id); |
| 164 buzz::XmlElement* tango_iq_packet = |
| 165 new buzz::XmlElement(kQnTangoIqPacket, true); |
| 166 iq->AddElement(tango_iq_packet); |
| 167 buzz::XmlElement* tango_iq_packet_content = |
| 168 new buzz::XmlElement(kQnTangoIqPacketContent, true); |
| 169 tango_iq_packet->AddElement(tango_iq_packet_content); |
| 170 tango_iq_packet_content->SetBodyText(msg); |
| 171 return iq; |
| 172 } |
| 173 |
| 174 const buzz::Jid to_jid_; |
| 175 std::string msg_; |
| 176 |
| 177 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask); |
| 178 }; |
| 179 |
| 180 } // namespace |
| 181 |
| 182 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( |
| 183 buzz::XmppClient* xmpp_client, |
| 184 invalidation::InvalidationClient* invalidation_client) |
| 185 : xmpp_client_(xmpp_client), |
| 186 invalidation_client_(invalidation_client) { |
| 187 CHECK(xmpp_client_); |
| 188 CHECK(invalidation_client_); |
| 189 invalidation::NetworkEndpoint* network_endpoint = |
| 190 invalidation_client_->network_endpoint(); |
| 191 CHECK(network_endpoint); |
| 192 network_endpoint->RegisterOutboundListener( |
| 193 invalidation::NewPermanentCallback( |
| 194 this, |
| 195 &CacheInvalidationPacketHandler::HandleOutboundPacket)); |
| 196 // Owned by xmpp_client. |
| 197 CacheInvalidationListenTask* listen_task = |
| 198 new CacheInvalidationListenTask( |
| 199 xmpp_client, NewCallback( |
| 200 this, &CacheInvalidationPacketHandler::HandleInboundPacket)); |
| 201 listen_task->Start(); |
| 202 } |
| 203 |
| 204 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() { |
| 205 invalidation::NetworkEndpoint* network_endpoint = |
| 206 invalidation_client_->network_endpoint(); |
| 207 CHECK(network_endpoint); |
| 208 network_endpoint->RegisterOutboundListener(NULL); |
| 209 } |
| 210 |
| 211 void CacheInvalidationPacketHandler::HandleOutboundPacket( |
| 212 invalidation::NetworkEndpoint* const& network_endpoint) { |
| 213 CHECK_EQ(network_endpoint, invalidation_client_->network_endpoint()); |
| 214 invalidation::string message; |
| 215 network_endpoint->TakeOutboundMessage(&message); |
| 216 std::string encoded_message; |
| 217 if (!base::Base64Encode(message, &encoded_message)) { |
| 218 LOG(ERROR) << "Could not base64-encode message to send: " |
| 219 << message; |
| 220 return; |
| 221 } |
| 222 // Owned by xmpp_client. |
| 223 CacheInvalidationSendMessageTask* send_message_task = |
| 224 new CacheInvalidationSendMessageTask(xmpp_client_, |
| 225 buzz::Jid(kBotJid), |
| 226 encoded_message); |
| 227 send_message_task->Start(); |
| 228 } |
| 229 |
| 230 void CacheInvalidationPacketHandler::HandleInboundPacket( |
| 231 const std::string& packet) { |
| 232 invalidation::NetworkEndpoint* network_endpoint = |
| 233 invalidation_client_->network_endpoint(); |
| 234 std::string decoded_message; |
| 235 if (!base::Base64Decode(packet, &decoded_message)) { |
| 236 LOG(ERROR) << "Could not base64-decode received message: " |
| 237 << packet; |
| 238 return; |
| 239 } |
| 240 network_endpoint->HandleInboundMessage(decoded_message); |
| 241 } |
| 242 |
| 243 } // namespace sync_notifier |
| OLD | NEW |