OLD | NEW |
---|---|
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
6 | 6 |
7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
9 #include "base/strings/string_number_conversions.h" | 9 #include "base/strings/string_number_conversions.h" |
10 #include "base/time/clock.h" | |
11 #include "base/time/time.h" | |
10 #include "google_apis/gcm/base/mcs_util.h" | 12 #include "google_apis/gcm/base/mcs_util.h" |
11 #include "google_apis/gcm/base/socket_stream.h" | 13 #include "google_apis/gcm/base/socket_stream.h" |
12 #include "google_apis/gcm/engine/connection_factory.h" | 14 #include "google_apis/gcm/engine/connection_factory.h" |
13 #include "google_apis/gcm/engine/rmq_store.h" | 15 #include "google_apis/gcm/engine/rmq_store.h" |
14 | 16 |
15 using namespace google::protobuf::io; | 17 using namespace google::protobuf::io; |
16 | 18 |
17 namespace gcm { | 19 namespace gcm { |
18 | 20 |
19 namespace { | 21 namespace { |
(...skipping 17 matching lines...) Expand all Loading... | |
37 // Applies to both incoming and outgoing messages. | 39 // Applies to both incoming and outgoing messages. |
38 // TODO(zea): make this server configurable. | 40 // TODO(zea): make this server configurable. |
39 const int kUnackedMessageBeforeStreamAck = 10; | 41 const int kUnackedMessageBeforeStreamAck = 10; |
40 | 42 |
41 // The global maximum number of pending messages to have in the send queue. | 43 // The global maximum number of pending messages to have in the send queue. |
42 const size_t kMaxSendQueueSize = 10 * 1024; | 44 const size_t kMaxSendQueueSize = 10 * 1024; |
43 | 45 |
44 // The maximum message size that can be sent to the server. | 46 // The maximum message size that can be sent to the server. |
45 const int kMaxMessageBytes = 4 * 1024; // 4KB, like the server. | 47 const int kMaxMessageBytes = 4 * 1024; // 4KB, like the server. |
46 | 48 |
49 // Maximum amount of time to save an unsent outgoing message for. | |
50 const int kMaxTTLSeconds = 4 * 7 * 24 * 60 * 60; // 4 weeks. | |
51 | |
47 // Helper for converting a proto persistent id list to a vector of strings. | 52 // Helper for converting a proto persistent id list to a vector of strings. |
48 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes, | 53 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes, |
49 std::vector<std::string>* id_list) { | 54 std::vector<std::string>* id_list) { |
50 mcs_proto::SelectiveAck selective_ack; | 55 mcs_proto::SelectiveAck selective_ack; |
51 if (!selective_ack.ParseFromString(bytes)) | 56 if (!selective_ack.ParseFromString(bytes)) |
52 return false; | 57 return false; |
53 std::vector<std::string> new_list; | 58 std::vector<std::string> new_list; |
54 for (int i = 0; i < selective_ack.id_size(); ++i) { | 59 for (int i = 0; i < selective_ack.id_size(); ++i) { |
55 DCHECK(!selective_ack.id(i).empty()); | 60 DCHECK(!selective_ack.id(i).empty()); |
56 new_list.push_back(selective_ack.id(i)); | 61 new_list.push_back(selective_ack.id(i)); |
(...skipping 19 matching lines...) Expand all Loading... | |
76 | 81 |
77 // The protobuf of the message itself. | 82 // The protobuf of the message itself. |
78 MCSProto protobuf; | 83 MCSProto protobuf; |
79 }; | 84 }; |
80 | 85 |
81 ReliablePacketInfo::ReliablePacketInfo() | 86 ReliablePacketInfo::ReliablePacketInfo() |
82 : stream_id(0), tag(0) { | 87 : stream_id(0), tag(0) { |
83 } | 88 } |
84 ReliablePacketInfo::~ReliablePacketInfo() {} | 89 ReliablePacketInfo::~ReliablePacketInfo() {} |
85 | 90 |
86 MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store) | 91 MCSClient::MCSClient(base::Clock* clock, |
87 : state_(UNINITIALIZED), | 92 ConnectionFactory* connection_factory, |
93 RMQStore* rmq_store) | |
94 : clock_(clock), | |
95 state_(UNINITIALIZED), | |
88 android_id_(0), | 96 android_id_(0), |
89 security_token_(0), | 97 security_token_(0), |
90 connection_factory_(connection_factory), | 98 connection_factory_(connection_factory), |
91 connection_handler_(NULL), | 99 connection_handler_(NULL), |
92 last_device_to_server_stream_id_received_(0), | 100 last_device_to_server_stream_id_received_(0), |
93 last_server_to_device_stream_id_received_(0), | 101 last_server_to_device_stream_id_received_(0), |
94 stream_id_out_(0), | 102 stream_id_out_(0), |
95 stream_id_in_(0), | 103 stream_id_in_(0), |
96 rmq_store_(rmq_store), | 104 rmq_store_(rmq_store), |
97 weak_ptr_factory_(this) { | 105 weak_ptr_factory_(this) { |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
142 | 150 |
143 DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size() | 151 DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size() |
144 << " incoming acks pending and " | 152 << " incoming acks pending and " |
145 << load_result.outgoing_messages.size() | 153 << load_result.outgoing_messages.size() |
146 << " outgoing messages pending."; | 154 << " outgoing messages pending."; |
147 | 155 |
148 restored_unackeds_server_ids_ = load_result.incoming_messages; | 156 restored_unackeds_server_ids_ = load_result.incoming_messages; |
149 | 157 |
150 // First go through and order the outgoing messages by recency. | 158 // First go through and order the outgoing messages by recency. |
151 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; | 159 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; |
160 std::vector<PersistentId> dropped_ids; | |
152 for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator | 161 for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator |
153 iter = load_result.outgoing_messages.begin(); | 162 iter = load_result.outgoing_messages.begin(); |
154 iter != load_result.outgoing_messages.end(); ++iter) { | 163 iter != load_result.outgoing_messages.end(); ++iter) { |
155 uint64 timestamp = 0; | 164 uint64 timestamp = 0; |
156 if (!base::StringToUint64(iter->first, ×tamp)) { | 165 if (!base::StringToUint64(iter->first, ×tamp)) { |
157 LOG(ERROR) << "Invalid restored message."; | 166 LOG(ERROR) << "Invalid restored message."; |
158 return; | 167 return; |
159 } | 168 } |
169 | |
170 // Check if the TTL has expired for this message. | |
171 if (HasTTLExpired(*iter->second, clock_)) { | |
172 dropped_ids.push_back(iter->first); | |
173 message_sent_callback_.Run("TTL expired for " + iter->first); | |
174 delete iter->second; | |
175 continue; | |
176 } | |
177 | |
160 ordered_messages[timestamp] = iter->second; | 178 ordered_messages[timestamp] = iter->second; |
161 } | 179 } |
162 | 180 |
181 if (!dropped_ids.empty()) { | |
182 rmq_store_->RemoveOutgoingMessages( | |
183 dropped_ids, | |
184 base::Bind(&MCSClient::OnRMQUpdateFinished, | |
185 weak_ptr_factory_.GetWeakPtr())); | |
186 } | |
187 | |
163 // Now go through and add the outgoing messages to the send queue in their | 188 // Now go through and add the outgoing messages to the send queue in their |
164 // appropriate order (oldest at front, most recent at back). | 189 // appropriate order (oldest at front, most recent at back). |
165 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator | 190 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator |
166 iter = ordered_messages.begin(); | 191 iter = ordered_messages.begin(); |
167 iter != ordered_messages.end(); ++iter) { | 192 iter != ordered_messages.end(); ++iter) { |
168 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 193 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
169 packet_info->protobuf.reset(iter->second); | 194 packet_info->protobuf.reset(iter->second); |
170 packet_info->persistent_id = base::Uint64ToString(iter->first); | 195 packet_info->persistent_id = base::Uint64ToString(iter->first); |
171 to_send_.push_back(make_linked_ptr(packet_info)); | 196 to_send_.push_back(make_linked_ptr(packet_info)); |
172 } | 197 } |
(...skipping 14 matching lines...) Expand all Loading... | |
187 android_id_, | 212 android_id_, |
188 security_token_, | 213 security_token_, |
189 base::Bind(&MCSClient::OnRMQUpdateFinished, | 214 base::Bind(&MCSClient::OnRMQUpdateFinished, |
190 weak_ptr_factory_.GetWeakPtr())); | 215 weak_ptr_factory_.GetWeakPtr())); |
191 } | 216 } |
192 | 217 |
193 state_ = CONNECTING; | 218 state_ = CONNECTING; |
194 connection_factory_->Connect(); | 219 connection_factory_->Connect(); |
195 } | 220 } |
196 | 221 |
197 void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { | 222 void MCSClient::SendMessage(const MCSMessage& message) { |
198 DCHECK_EQ(state_, CONNECTED); | 223 int ttl = GetTTL(message.GetProtobuf()); |
224 DCHECK_GE(ttl, 0); | |
225 DCHECK_LE(ttl, kMaxTTLSeconds); | |
199 if (to_send_.size() > kMaxSendQueueSize) { | 226 if (to_send_.size() > kMaxSendQueueSize) { |
200 base::MessageLoop::current()->PostTask( | 227 message_sent_callback_.Run("Message queue full."); |
201 FROM_HERE, | |
202 base::Bind(message_sent_callback_, "Message queue full.")); | |
203 return; | 228 return; |
204 } | 229 } |
205 if (message.size() > kMaxMessageBytes) { | 230 if (message.size() > kMaxMessageBytes) { |
206 base::MessageLoop::current()->PostTask( | 231 message_sent_callback_.Run("Message too large."); |
207 FROM_HERE, | |
208 base::Bind(message_sent_callback_, "Message too large.")); | |
209 return; | 232 return; |
210 } | 233 } |
211 | 234 |
212 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 235 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
213 packet_info->protobuf = message.CloneProtobuf(); | 236 packet_info->protobuf = message.CloneProtobuf(); |
214 | 237 |
215 if (use_rmq) { | 238 if (ttl > 0) { |
216 PersistentId persistent_id = GetNextPersistentId(); | 239 PersistentId persistent_id = GetNextPersistentId(); |
217 DVLOG(1) << "Setting persistent id to " << persistent_id; | 240 DVLOG(1) << "Setting persistent id to " << persistent_id; |
218 packet_info->persistent_id = persistent_id; | 241 packet_info->persistent_id = persistent_id; |
219 SetPersistentId(persistent_id, | 242 SetPersistentId(persistent_id, |
220 packet_info->protobuf.get()); | 243 packet_info->protobuf.get()); |
221 rmq_store_->AddOutgoingMessage(persistent_id, | 244 rmq_store_->AddOutgoingMessage(persistent_id, |
222 MCSMessage(message.tag(), | 245 MCSMessage(message.tag(), |
223 *(packet_info->protobuf)), | 246 *(packet_info->protobuf)), |
224 base::Bind(&MCSClient::OnRMQUpdateFinished, | 247 base::Bind(&MCSClient::OnRMQUpdateFinished, |
225 weak_ptr_factory_.GetWeakPtr())); | 248 weak_ptr_factory_.GetWeakPtr())); |
226 } else { | 249 } else if (!connection_factory_->IsEndpointReachable()) { |
227 // Check that there is an active connection to the endpoint. | 250 DVLOG(1) << "No active connection, dropping message."; |
228 if (!connection_handler_->CanSendMessage()) { | 251 message_sent_callback_.Run("TTL expired"); |
229 base::MessageLoop::current()->PostTask( | 252 return; |
230 FROM_HERE, | |
231 base::Bind(message_sent_callback_, "Unable to reach endpoint")); | |
232 return; | |
233 } | |
234 } | 253 } |
235 to_send_.push_back(make_linked_ptr(packet_info)); | 254 to_send_.push_back(make_linked_ptr(packet_info)); |
236 MaybeSendMessage(); | 255 MaybeSendMessage(); |
237 } | 256 } |
238 | 257 |
239 void MCSClient::Destroy() { | 258 void MCSClient::Destroy() { |
240 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, | 259 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, |
241 weak_ptr_factory_.GetWeakPtr())); | 260 weak_ptr_factory_.GetWeakPtr())); |
242 } | 261 } |
243 | 262 |
244 void MCSClient::ResetStateAndBuildLoginRequest( | 263 void MCSClient::ResetStateAndBuildLoginRequest( |
245 mcs_proto::LoginRequest* request) { | 264 mcs_proto::LoginRequest* request) { |
246 DCHECK(android_id_); | 265 DCHECK(android_id_); |
247 DCHECK(security_token_); | 266 DCHECK(security_token_); |
248 stream_id_in_ = 0; | 267 stream_id_in_ = 0; |
249 stream_id_out_ = 1; | 268 stream_id_out_ = 1; |
250 last_device_to_server_stream_id_received_ = 0; | 269 last_device_to_server_stream_id_received_ = 0; |
251 last_server_to_device_stream_id_received_ = 0; | 270 last_server_to_device_stream_id_received_ = 0; |
252 | 271 |
253 heartbeat_manager_.Stop(); | 272 heartbeat_manager_.Stop(); |
254 | 273 |
255 // TODO(zea): expire all messages older than their TTL. | |
256 | |
257 // Add any pending acknowledgments to the list of ids. | 274 // Add any pending acknowledgments to the list of ids. |
258 for (StreamIdToPersistentIdMap::const_iterator iter = | 275 for (StreamIdToPersistentIdMap::const_iterator iter = |
259 unacked_server_ids_.begin(); | 276 unacked_server_ids_.begin(); |
260 iter != unacked_server_ids_.end(); ++iter) { | 277 iter != unacked_server_ids_.end(); ++iter) { |
261 restored_unackeds_server_ids_.push_back(iter->second); | 278 restored_unackeds_server_ids_.push_back(iter->second); |
262 } | 279 } |
263 unacked_server_ids_.clear(); | 280 unacked_server_ids_.clear(); |
264 | 281 |
265 // Any acknowledged server ids which have not been confirmed by the server | 282 // Any acknowledged server ids which have not been confirmed by the server |
266 // are treated like unacknowledged ids. | 283 // are treated like unacknowledged ids. |
(...skipping 16 matching lines...) Expand all Loading... | |
283 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_; | 300 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_; |
284 restored_unackeds_server_ids_.clear(); | 301 restored_unackeds_server_ids_.clear(); |
285 | 302 |
286 // Push all unacknowledged messages to front of send queue. No need to save | 303 // Push all unacknowledged messages to front of send queue. No need to save |
287 // to RMQ, as all messages that reach this point should already have been | 304 // to RMQ, as all messages that reach this point should already have been |
288 // saved as necessary. | 305 // saved as necessary. |
289 while (!to_resend_.empty()) { | 306 while (!to_resend_.empty()) { |
290 to_send_.push_front(to_resend_.back()); | 307 to_send_.push_front(to_resend_.back()); |
291 to_resend_.pop_back(); | 308 to_resend_.pop_back(); |
292 } | 309 } |
310 | |
311 // Drop all TTL == 0 or expired TTL messages from the queue. | |
312 std::deque<MCSPacketInternal> new_to_send; | |
313 std::vector<PersistentId> dropped_ids; | |
314 while (!to_send_.empty()) { | |
315 MCSPacketInternal packet = to_send_.front(); | |
316 to_send_.pop_front(); | |
317 if (GetTTL(*packet->protobuf) > 0 && | |
318 !HasTTLExpired(*packet->protobuf, clock_)) { | |
319 new_to_send.push_back(packet); | |
320 } else { | |
321 if (!packet->persistent_id.empty()) | |
322 dropped_ids.push_back(packet->persistent_id); | |
jianli
2014/01/02 18:52:58
If TTL is 0, we do not give a persisten ID as in S
Nicolas Zea
2014/01/02 21:39:08
Right, but those ids weren't stored in the RMQStor
jianli
2014/01/02 21:51:31
But the comment at line 311 says: Drop all TTL ==
Nicolas Zea
2014/01/02 21:56:13
We are handling the case by removing those message
| |
323 message_sent_callback_.Run("TTL expired"); | |
324 } | |
325 } | |
326 | |
327 if (!dropped_ids.empty()) { | |
328 DVLOG(1) << "Connection reset, " << dropped_ids.size() | |
329 << " messages expired."; | |
330 rmq_store_->RemoveOutgoingMessages( | |
331 dropped_ids, | |
332 base::Bind(&MCSClient::OnRMQUpdateFinished, | |
333 weak_ptr_factory_.GetWeakPtr())); | |
334 } | |
335 | |
336 to_send_.swap(new_to_send); | |
337 | |
293 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() | 338 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() |
294 << " incoming acks pending, and " << to_send_.size() | 339 << " incoming acks pending, and " << to_send_.size() |
295 << " pending outgoing messages."; | 340 << " pending outgoing messages."; |
296 | 341 |
297 state_ = CONNECTING; | 342 state_ = CONNECTING; |
298 } | 343 } |
299 | 344 |
300 void MCSClient::SendHeartbeat() { | 345 void MCSClient::SendHeartbeat() { |
301 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), | 346 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing())); |
302 false); | |
303 } | 347 } |
304 | 348 |
305 void MCSClient::OnRMQUpdateFinished(bool success) { | 349 void MCSClient::OnRMQUpdateFinished(bool success) { |
306 LOG_IF(ERROR, !success) << "RMQ Update failed!"; | 350 LOG_IF(ERROR, !success) << "RMQ Update failed!"; |
307 // TODO(zea): Rebuild the store from scratch in case of persistence failure? | 351 // TODO(zea): Rebuild the store from scratch in case of persistence failure? |
308 } | 352 } |
309 | 353 |
310 void MCSClient::MaybeSendMessage() { | 354 void MCSClient::MaybeSendMessage() { |
311 if (to_send_.empty()) | 355 if (to_send_.empty()) |
312 return; | 356 return; |
313 | 357 |
314 if (!connection_handler_->CanSendMessage()) | 358 // If the connection has been reset, do nothing. On reconnection |
359 // MaybeSendMessage will be automatically invoked again. | |
360 // TODO(zea): consider doing TTL expiration at connection reset time, rather | |
361 // than reconnect time. | |
362 if (!connection_factory_->IsEndpointReachable()) | |
315 return; | 363 return; |
316 | 364 |
317 // TODO(zea): drop messages older than their TTL. | |
318 | |
319 DVLOG(1) << "Pending output message found, sending."; | |
320 MCSPacketInternal packet = to_send_.front(); | 365 MCSPacketInternal packet = to_send_.front(); |
321 to_send_.pop_front(); | 366 to_send_.pop_front(); |
367 if (HasTTLExpired(*packet->protobuf, clock_)) { | |
368 DCHECK(packet->persistent_id.empty()); | |
369 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; | |
370 message_sent_callback_.Run("TTL expired for " + packet->persistent_id); | |
371 rmq_store_->RemoveOutgoingMessage( | |
372 packet->persistent_id, | |
373 base::Bind(&MCSClient::OnRMQUpdateFinished, | |
374 weak_ptr_factory_.GetWeakPtr())); | |
375 base::MessageLoop::current()->PostTask( | |
376 FROM_HERE, | |
377 base::Bind(&MCSClient::MaybeSendMessage, | |
378 weak_ptr_factory_.GetWeakPtr())); | |
379 return; | |
380 } | |
381 DVLOG(1) << "Pending output message found, sending."; | |
322 if (!packet->persistent_id.empty()) | 382 if (!packet->persistent_id.empty()) |
323 to_resend_.push_back(packet); | 383 to_resend_.push_back(packet); |
324 SendPacketToWire(packet.get()); | 384 SendPacketToWire(packet.get()); |
325 } | 385 } |
326 | 386 |
327 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { | 387 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { |
328 packet_info->stream_id = ++stream_id_out_; | 388 packet_info->stream_id = ++stream_id_out_; |
329 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); | 389 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); |
330 | 390 |
331 // Set the proper last received stream id to acknowledge received server | 391 // Set the proper last received stream id to acknowledge received server |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
372 data.set_key(kIdleNotification); | 432 data.set_key(kIdleNotification); |
373 data.set_value("false"); | 433 data.set_value("false"); |
374 response->add_app_data()->CopyFrom(data); | 434 response->add_app_data()->CopyFrom(data); |
375 response->set_category(kMCSCategory); | 435 response->set_category(kMCSCategory); |
376 } | 436 } |
377 } | 437 } |
378 | 438 |
379 if (send) { | 439 if (send) { |
380 SendMessage( | 440 SendMessage( |
381 MCSMessage(kDataMessageStanzaTag, | 441 MCSMessage(kDataMessageStanzaTag, |
382 response.PassAs<const google::protobuf::MessageLite>()), | 442 response.PassAs<const google::protobuf::MessageLite>())); |
383 false); | |
384 } | 443 } |
385 } | 444 } |
386 | 445 |
387 void MCSClient::HandlePacketFromWire( | 446 void MCSClient::HandlePacketFromWire( |
388 scoped_ptr<google::protobuf::MessageLite> protobuf) { | 447 scoped_ptr<google::protobuf::MessageLite> protobuf) { |
389 if (!protobuf.get()) | 448 if (!protobuf.get()) |
390 return; | 449 return; |
391 uint8 tag = GetMCSProtoTag(*protobuf); | 450 uint8 tag = GetMCSProtoTag(*protobuf); |
392 PersistentId persistent_id = GetPersistentId(*protobuf); | 451 PersistentId persistent_id = GetPersistentId(*protobuf); |
393 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); | 452 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
427 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() | 486 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() |
428 << " with persistent id " | 487 << " with persistent id " |
429 << (persistent_id.empty() ? "NULL" : persistent_id) | 488 << (persistent_id.empty() ? "NULL" : persistent_id) |
430 << ", stream id " << stream_id_in_ << " and last stream id received " | 489 << ", stream id " << stream_id_in_ << " and last stream id received " |
431 << last_stream_id_received; | 490 << last_stream_id_received; |
432 | 491 |
433 if (unacked_server_ids_.size() > 0 && | 492 if (unacked_server_ids_.size() > 0 && |
434 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { | 493 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { |
435 SendMessage(MCSMessage(kIqStanzaTag, | 494 SendMessage(MCSMessage(kIqStanzaTag, |
436 BuildStreamAck(). | 495 BuildStreamAck(). |
437 PassAs<const google::protobuf::MessageLite>()), | 496 PassAs<const google::protobuf::MessageLite>())); |
438 false); | |
439 } | 497 } |
440 | 498 |
441 // The connection is alive, treat this message as a heartbeat ack. | 499 // The connection is alive, treat this message as a heartbeat ack. |
442 heartbeat_manager_.OnHeartbeatAcked(); | 500 heartbeat_manager_.OnHeartbeatAcked(); |
443 | 501 |
444 switch (tag) { | 502 switch (tag) { |
445 case kLoginResponseTag: { | 503 case kLoginResponseTag: { |
446 DCHECK_EQ(CONNECTING, state_); | 504 DCHECK_EQ(CONNECTING, state_); |
447 mcs_proto::LoginResponse* login_response = | 505 mcs_proto::LoginResponse* login_response = |
448 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); | 506 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
486 base::Bind(&MCSClient::SendHeartbeat, | 544 base::Bind(&MCSClient::SendHeartbeat, |
487 weak_ptr_factory_.GetWeakPtr()), | 545 weak_ptr_factory_.GetWeakPtr()), |
488 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, | 546 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, |
489 weak_ptr_factory_.GetWeakPtr())); | 547 weak_ptr_factory_.GetWeakPtr())); |
490 return; | 548 return; |
491 } | 549 } |
492 case kHeartbeatPingTag: | 550 case kHeartbeatPingTag: |
493 DCHECK_GE(stream_id_in_, 1U); | 551 DCHECK_GE(stream_id_in_, 1U); |
494 DVLOG(1) << "Received heartbeat ping, sending ack."; | 552 DVLOG(1) << "Received heartbeat ping, sending ack."; |
495 SendMessage( | 553 SendMessage( |
496 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); | 554 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck())); |
497 return; | 555 return; |
498 case kHeartbeatAckTag: | 556 case kHeartbeatAckTag: |
499 DCHECK_GE(stream_id_in_, 1U); | 557 DCHECK_GE(stream_id_in_, 1U); |
500 DVLOG(1) << "Received heartbeat ack."; | 558 DVLOG(1) << "Received heartbeat ack."; |
501 // Do nothing else, all messages act as heartbeat acks. | 559 // Do nothing else, all messages act as heartbeat acks. |
502 return; | 560 return; |
503 case kCloseTag: | 561 case kCloseTag: |
504 LOG(ERROR) << "Received close command, resetting connection."; | 562 LOG(ERROR) << "Received close command, resetting connection."; |
505 state_ = LOADED; | 563 state_ = LOADED; |
506 connection_factory_->SignalConnectionReset(); | 564 connection_factory_->SignalConnectionReset(); |
(...skipping 147 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
654 | 712 |
655 MCSClient::PersistentId MCSClient::GetNextPersistentId() { | 713 MCSClient::PersistentId MCSClient::GetNextPersistentId() { |
656 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); | 714 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); |
657 } | 715 } |
658 | 716 |
659 void MCSClient::OnConnectionResetByHeartbeat() { | 717 void MCSClient::OnConnectionResetByHeartbeat() { |
660 connection_factory_->SignalConnectionReset(); | 718 connection_factory_->SignalConnectionReset(); |
661 } | 719 } |
662 | 720 |
663 } // namespace gcm | 721 } // namespace gcm |
OLD | NEW |