OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
6 | 6 |
7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
9 #include "base/metrics/histogram.h" | 9 #include "base/metrics/histogram.h" |
10 #include "base/strings/string_number_conversions.h" | 10 #include "base/strings/string_number_conversions.h" |
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
102 weak_ptr_factory_(this) { | 102 weak_ptr_factory_(this) { |
103 } | 103 } |
104 | 104 |
105 MCSClient::~MCSClient() { | 105 MCSClient::~MCSClient() { |
106 } | 106 } |
107 | 107 |
108 void MCSClient::Initialize( | 108 void MCSClient::Initialize( |
109 const ErrorCallback& error_callback, | 109 const ErrorCallback& error_callback, |
110 const OnMessageReceivedCallback& message_received_callback, | 110 const OnMessageReceivedCallback& message_received_callback, |
111 const OnMessageSentCallback& message_sent_callback, | 111 const OnMessageSentCallback& message_sent_callback, |
112 const GCMStore::LoadResult& load_result) { | 112 scoped_ptr<GCMStore::LoadResult> load_result) { |
113 DCHECK_EQ(state_, UNINITIALIZED); | 113 DCHECK_EQ(state_, UNINITIALIZED); |
114 | 114 |
115 state_ = LOADED; | 115 state_ = LOADED; |
116 mcs_error_callback_ = error_callback; | 116 mcs_error_callback_ = error_callback; |
117 message_received_callback_ = message_received_callback; | 117 message_received_callback_ = message_received_callback; |
118 message_sent_callback_ = message_sent_callback; | 118 message_sent_callback_ = message_sent_callback; |
119 | 119 |
120 connection_factory_->Initialize( | 120 connection_factory_->Initialize( |
121 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, | 121 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, |
122 weak_ptr_factory_.GetWeakPtr()), | 122 weak_ptr_factory_.GetWeakPtr()), |
123 base::Bind(&MCSClient::HandlePacketFromWire, | 123 base::Bind(&MCSClient::HandlePacketFromWire, |
124 weak_ptr_factory_.GetWeakPtr()), | 124 weak_ptr_factory_.GetWeakPtr()), |
125 base::Bind(&MCSClient::MaybeSendMessage, | 125 base::Bind(&MCSClient::MaybeSendMessage, |
126 weak_ptr_factory_.GetWeakPtr())); | 126 weak_ptr_factory_.GetWeakPtr())); |
127 connection_handler_ = connection_factory_->GetConnectionHandler(); | 127 connection_handler_ = connection_factory_->GetConnectionHandler(); |
128 | 128 |
129 stream_id_out_ = 1; // Login request is hardcoded to id 1. | 129 stream_id_out_ = 1; // Login request is hardcoded to id 1. |
130 | 130 |
131 android_id_ = load_result.device_android_id; | 131 android_id_ = load_result->device_android_id; |
132 security_token_ = load_result.device_security_token; | 132 security_token_ = load_result->device_security_token; |
133 | 133 |
134 if (android_id_ == 0) { | 134 if (android_id_ == 0) { |
135 DVLOG(1) << "No device credentials found, assuming new client."; | 135 DVLOG(1) << "No device credentials found, assuming new client."; |
136 // No need to try and load RMQ data in that case. | 136 // No need to try and load RMQ data in that case. |
137 return; | 137 return; |
138 } | 138 } |
139 | 139 |
140 // |android_id_| is non-zero, so should |security_token_|. | 140 // |android_id_| is non-zero, so should |security_token_|. |
141 DCHECK_NE(0u, security_token_) << "Security token invalid, while android id" | 141 DCHECK_NE(0u, security_token_) << "Security token invalid, while android id" |
142 << " is non-zero."; | 142 << " is non-zero."; |
143 | 143 |
144 DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size() | 144 DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size() |
145 << " incoming acks pending and " | 145 << " incoming acks pending and " |
146 << load_result.outgoing_messages.size() | 146 << load_result->outgoing_messages.size() |
147 << " outgoing messages pending."; | 147 << " outgoing messages pending."; |
148 | 148 |
149 restored_unackeds_server_ids_ = load_result.incoming_messages; | 149 restored_unackeds_server_ids_ = load_result->incoming_messages; |
150 | 150 |
151 // First go through and order the outgoing messages by recency. | 151 // First go through and order the outgoing messages by recency. |
152 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; | 152 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; |
153 std::vector<PersistentId> expired_ttl_ids; | 153 std::vector<PersistentId> expired_ttl_ids; |
154 for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator | 154 for (GCMStore::OutgoingMessageMap::iterator iter = |
155 iter = load_result.outgoing_messages.begin(); | 155 load_result->outgoing_messages.begin(); |
156 iter != load_result.outgoing_messages.end(); ++iter) { | 156 iter != load_result->outgoing_messages.end(); ++iter) { |
157 uint64 timestamp = 0; | 157 uint64 timestamp = 0; |
158 if (!base::StringToUint64(iter->first, ×tamp)) { | 158 if (!base::StringToUint64(iter->first, ×tamp)) { |
159 LOG(ERROR) << "Invalid restored message."; | 159 LOG(ERROR) << "Invalid restored message."; |
160 // TODO(fgorski): Error: data unreadable | 160 // TODO(fgorski): Error: data unreadable |
161 mcs_error_callback_.Run(); | 161 mcs_error_callback_.Run(); |
162 return; | 162 return; |
163 } | 163 } |
164 | 164 |
165 // Check if the TTL has expired for this message. | 165 // Check if the TTL has expired for this message. |
166 if (HasTTLExpired(*iter->second, clock_)) { | 166 if (HasTTLExpired(*iter->second, clock_)) { |
167 expired_ttl_ids.push_back(iter->first); | 167 expired_ttl_ids.push_back(iter->first); |
168 NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED); | 168 NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED); |
169 delete iter->second; | |
170 continue; | 169 continue; |
171 } | 170 } |
172 | 171 |
173 ordered_messages[timestamp] = iter->second; | 172 ordered_messages[timestamp] = iter->second.release(); |
174 } | 173 } |
175 | 174 |
176 if (!expired_ttl_ids.empty()) { | 175 if (!expired_ttl_ids.empty()) { |
177 gcm_store_->RemoveOutgoingMessages( | 176 gcm_store_->RemoveOutgoingMessages( |
178 expired_ttl_ids, | 177 expired_ttl_ids, |
179 base::Bind(&MCSClient::OnGCMUpdateFinished, | 178 base::Bind(&MCSClient::OnGCMUpdateFinished, |
180 weak_ptr_factory_.GetWeakPtr())); | 179 weak_ptr_factory_.GetWeakPtr())); |
181 } | 180 } |
182 | 181 |
183 // Now go through and add the outgoing messages to the send queue in their | 182 // Now go through and add the outgoing messages to the send queue in their |
184 // appropriate order (oldest at front, most recent at back). | 183 // appropriate order (oldest at front, most recent at back). |
185 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator | 184 for (std::map<uint64, google::protobuf::MessageLite*>::iterator |
186 iter = ordered_messages.begin(); | 185 iter = ordered_messages.begin(); |
187 iter != ordered_messages.end(); ++iter) { | 186 iter != ordered_messages.end(); ++iter) { |
188 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 187 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
189 packet_info->protobuf.reset(iter->second); | 188 packet_info->protobuf.reset(iter->second); |
190 packet_info->tag = GetMCSProtoTag(*iter->second); | 189 packet_info->tag = GetMCSProtoTag(*iter->second); |
191 packet_info->persistent_id = base::Uint64ToString(iter->first); | 190 packet_info->persistent_id = base::Uint64ToString(iter->first); |
192 to_send_.push_back(make_linked_ptr(packet_info)); | 191 to_send_.push_back(make_linked_ptr(packet_info)); |
193 } | 192 } |
194 } | 193 } |
195 | 194 |
(...skipping 19 matching lines...) Expand all Loading... |
215 DCHECK_GE(ttl, 0); | 214 DCHECK_GE(ttl, 0); |
216 if (to_send_.size() > kMaxSendQueueSize) { | 215 if (to_send_.size() > kMaxSendQueueSize) { |
217 NotifyMessageSendStatus(message.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED); | 216 NotifyMessageSendStatus(message.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED); |
218 return; | 217 return; |
219 } | 218 } |
220 if (message.size() > kMaxMessageBytes) { | 219 if (message.size() > kMaxMessageBytes) { |
221 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); | 220 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); |
222 return; | 221 return; |
223 } | 222 } |
224 | 223 |
225 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 224 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); |
226 packet_info->tag = message.tag(); | 225 packet_info->tag = message.tag(); |
227 packet_info->protobuf = message.CloneProtobuf(); | 226 packet_info->protobuf = message.CloneProtobuf(); |
228 | 227 |
229 if (ttl > 0) { | 228 if (ttl > 0) { |
230 PersistentId persistent_id = GetNextPersistentId(); | 229 PersistentId persistent_id = GetNextPersistentId(); |
231 DVLOG(1) << "Setting persistent id to " << persistent_id; | 230 DVLOG(1) << "Setting persistent id to " << persistent_id; |
232 packet_info->persistent_id = persistent_id; | 231 packet_info->persistent_id = persistent_id; |
233 SetPersistentId(persistent_id, | 232 SetPersistentId(persistent_id, |
234 packet_info->protobuf.get()); | 233 packet_info->protobuf.get()); |
235 if (!gcm_store_->AddOutgoingMessage( | 234 if (!gcm_store_->AddOutgoingMessage( |
236 persistent_id, | 235 persistent_id, |
237 MCSMessage(message.tag(), | 236 MCSMessage(message.tag(), |
238 *(packet_info->protobuf)), | 237 *(packet_info->protobuf)), |
239 base::Bind(&MCSClient::OnGCMUpdateFinished, | 238 base::Bind(&MCSClient::OnGCMUpdateFinished, |
240 weak_ptr_factory_.GetWeakPtr()))) { | 239 weak_ptr_factory_.GetWeakPtr()))) { |
241 NotifyMessageSendStatus(message.GetProtobuf(), | 240 NotifyMessageSendStatus(message.GetProtobuf(), |
242 APP_QUEUE_SIZE_LIMIT_REACHED); | 241 APP_QUEUE_SIZE_LIMIT_REACHED); |
243 return; | 242 return; |
244 } | 243 } |
245 } else if (!connection_factory_->IsEndpointReachable()) { | 244 } else if (!connection_factory_->IsEndpointReachable()) { |
246 DVLOG(1) << "No active connection, dropping message."; | 245 DVLOG(1) << "No active connection, dropping message."; |
247 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); | 246 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); |
248 return; | 247 return; |
249 } | 248 } |
250 to_send_.push_back(make_linked_ptr(packet_info)); | 249 to_send_.push_back(make_linked_ptr(packet_info.release())); |
251 MaybeSendMessage(); | 250 MaybeSendMessage(); |
252 } | 251 } |
253 | 252 |
254 void MCSClient::Destroy() { | 253 void MCSClient::Destroy() { |
255 gcm_store_->Destroy(base::Bind(&MCSClient::OnGCMUpdateFinished, | 254 gcm_store_->Destroy(base::Bind(&MCSClient::OnGCMUpdateFinished, |
256 weak_ptr_factory_.GetWeakPtr())); | 255 weak_ptr_factory_.GetWeakPtr())); |
257 } | 256 } |
258 | 257 |
259 void MCSClient::ResetStateAndBuildLoginRequest( | 258 void MCSClient::ResetStateAndBuildLoginRequest( |
260 mcs_proto::LoginRequest* request) { | 259 mcs_proto::LoginRequest* request) { |
(...skipping 478 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
739 const mcs_proto::DataMessageStanza* data_message_stanza = | 738 const mcs_proto::DataMessageStanza* data_message_stanza = |
740 reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf); | 739 reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf); |
741 message_sent_callback_.Run( | 740 message_sent_callback_.Run( |
742 data_message_stanza->device_user_id(), | 741 data_message_stanza->device_user_id(), |
743 data_message_stanza->category(), | 742 data_message_stanza->category(), |
744 data_message_stanza->id(), | 743 data_message_stanza->id(), |
745 status); | 744 status); |
746 } | 745 } |
747 | 746 |
748 } // namespace gcm | 747 } // namespace gcm |
OLD | NEW |