| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 | |
| 9 #include <set> | 8 #include <set> |
| 9 #include <utility> |
| 10 | 10 |
| 11 #include "base/bind.h" | 11 #include "base/bind.h" |
| 12 #include "base/location.h" | 12 #include "base/location.h" |
| 13 #include "base/metrics/histogram.h" | 13 #include "base/metrics/histogram.h" |
| 14 #include "base/strings/string_number_conversions.h" | 14 #include "base/strings/string_number_conversions.h" |
| 15 #include "base/thread_task_runner_handle.h" | 15 #include "base/thread_task_runner_handle.h" |
| 16 #include "base/time/clock.h" | 16 #include "base/time/clock.h" |
| 17 #include "base/time/time.h" | 17 #include "base/time/time.h" |
| 18 #include "base/timer/timer.h" | 18 #include "base/timer/timer.h" |
| 19 #include "google_apis/gcm/base/mcs_util.h" | 19 #include "google_apis/gcm/base/mcs_util.h" |
| (...skipping 307 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 327 // First check if this message should replace a pending message with the | 327 // First check if this message should replace a pending message with the |
| 328 // same collapse key. | 328 // same collapse key. |
| 329 mcs_proto::DataMessageStanza* data_message = | 329 mcs_proto::DataMessageStanza* data_message = |
| 330 reinterpret_cast<mcs_proto::DataMessageStanza*>( | 330 reinterpret_cast<mcs_proto::DataMessageStanza*>( |
| 331 packet_info->protobuf.get()); | 331 packet_info->protobuf.get()); |
| 332 CollapseKey collapse_key(*data_message); | 332 CollapseKey collapse_key(*data_message); |
| 333 if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) { | 333 if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) { |
| 334 ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key]; | 334 ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key]; |
| 335 DVLOG(1) << "Found matching collapse key, Reusing persistent id of " | 335 DVLOG(1) << "Found matching collapse key, Reusing persistent id of " |
| 336 << original_packet->persistent_id; | 336 << original_packet->persistent_id; |
| 337 original_packet->protobuf = packet_info->protobuf.Pass(); | 337 original_packet->protobuf = std::move(packet_info->protobuf); |
| 338 SetPersistentId(original_packet->persistent_id, | 338 SetPersistentId(original_packet->persistent_id, |
| 339 original_packet->protobuf.get()); | 339 original_packet->protobuf.get()); |
| 340 gcm_store_->OverwriteOutgoingMessage( | 340 gcm_store_->OverwriteOutgoingMessage( |
| 341 original_packet->persistent_id, | 341 original_packet->persistent_id, |
| 342 message, | 342 message, |
| 343 base::Bind(&MCSClient::OnGCMUpdateFinished, | 343 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 344 weak_ptr_factory_.GetWeakPtr())); | 344 weak_ptr_factory_.GetWeakPtr())); |
| 345 | 345 |
| 346 // The message is already queued, return. | 346 // The message is already queued, return. |
| 347 return; | 347 return; |
| (...skipping 24 matching lines...) Expand all Loading... |
| 372 to_send_.push_back(make_linked_ptr(packet_info.release())); | 372 to_send_.push_back(make_linked_ptr(packet_info.release())); |
| 373 | 373 |
| 374 // Notify that the messages has been succsfully queued for sending. | 374 // Notify that the messages has been succsfully queued for sending. |
| 375 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds. | 375 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds. |
| 376 NotifyMessageSendStatus(message.GetProtobuf(), QUEUED); | 376 NotifyMessageSendStatus(message.GetProtobuf(), QUEUED); |
| 377 | 377 |
| 378 MaybeSendMessage(); | 378 MaybeSendMessage(); |
| 379 } | 379 } |
| 380 | 380 |
| 381 void MCSClient::UpdateHeartbeatTimer(scoped_ptr<base::Timer> timer) { | 381 void MCSClient::UpdateHeartbeatTimer(scoped_ptr<base::Timer> timer) { |
| 382 heartbeat_manager_.UpdateHeartbeatTimer(timer.Pass()); | 382 heartbeat_manager_.UpdateHeartbeatTimer(std::move(timer)); |
| 383 } | 383 } |
| 384 | 384 |
| 385 void MCSClient::AddHeartbeatInterval(const std::string& scope, | 385 void MCSClient::AddHeartbeatInterval(const std::string& scope, |
| 386 int interval_ms) { | 386 int interval_ms) { |
| 387 if (!heartbeat_manager_.IsValidClientHeartbeatInterval(interval_ms)) | 387 if (!heartbeat_manager_.IsValidClientHeartbeatInterval(interval_ms)) |
| 388 return; | 388 return; |
| 389 | 389 |
| 390 custom_heartbeat_intervals_[scope] = interval_ms; | 390 custom_heartbeat_intervals_[scope] = interval_ms; |
| 391 gcm_store_->AddHeartbeatInterval(scope, interval_ms, | 391 gcm_store_->AddHeartbeatInterval(scope, interval_ms, |
| 392 base::Bind(&MCSClient::OnGCMUpdateFinished, | 392 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| (...skipping 232 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 625 send = true; | 625 send = true; |
| 626 mcs_proto::AppData data; | 626 mcs_proto::AppData data; |
| 627 data.set_key(kIdleNotification); | 627 data.set_key(kIdleNotification); |
| 628 data.set_value("false"); | 628 data.set_value("false"); |
| 629 response->add_app_data()->CopyFrom(data); | 629 response->add_app_data()->CopyFrom(data); |
| 630 response->set_category(kMCSCategory); | 630 response->set_category(kMCSCategory); |
| 631 } | 631 } |
| 632 } | 632 } |
| 633 | 633 |
| 634 if (send) { | 634 if (send) { |
| 635 SendMessage(MCSMessage(kDataMessageStanzaTag, response.Pass())); | 635 SendMessage(MCSMessage(kDataMessageStanzaTag, std::move(response))); |
| 636 } | 636 } |
| 637 } | 637 } |
| 638 | 638 |
| 639 void MCSClient::HandlePacketFromWire( | 639 void MCSClient::HandlePacketFromWire( |
| 640 scoped_ptr<google::protobuf::MessageLite> protobuf) { | 640 scoped_ptr<google::protobuf::MessageLite> protobuf) { |
| 641 if (!protobuf.get()) | 641 if (!protobuf.get()) |
| 642 return; | 642 return; |
| 643 uint8_t tag = GetMCSProtoTag(*protobuf); | 643 uint8_t tag = GetMCSProtoTag(*protobuf); |
| 644 PersistentId persistent_id = GetPersistentId(*protobuf); | 644 PersistentId persistent_id = GetPersistentId(*protobuf); |
| 645 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); | 645 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 714 heartbeat_manager_.UpdateHeartbeatConfig( | 714 heartbeat_manager_.UpdateHeartbeatConfig( |
| 715 login_response->heartbeat_config()); | 715 login_response->heartbeat_config()); |
| 716 } | 716 } |
| 717 | 717 |
| 718 state_ = CONNECTED; | 718 state_ = CONNECTED; |
| 719 stream_id_in_ = 1; // To account for the login response. | 719 stream_id_in_ = 1; // To account for the login response. |
| 720 DCHECK_EQ(1U, stream_id_out_); | 720 DCHECK_EQ(1U, stream_id_out_); |
| 721 | 721 |
| 722 // Pass the login response on up. | 722 // Pass the login response on up. |
| 723 base::ThreadTaskRunnerHandle::Get()->PostTask( | 723 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 724 FROM_HERE, | 724 FROM_HERE, base::Bind(message_received_callback_, |
| 725 base::Bind(message_received_callback_, | 725 MCSMessage(tag, std::move(protobuf)))); |
| 726 MCSMessage(tag, protobuf.Pass()))); | |
| 727 | 726 |
| 728 // If there are pending messages, attempt to send one. | 727 // If there are pending messages, attempt to send one. |
| 729 if (!to_send_.empty()) { | 728 if (!to_send_.empty()) { |
| 730 base::ThreadTaskRunnerHandle::Get()->PostTask( | 729 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 731 FROM_HERE, | 730 FROM_HERE, |
| 732 base::Bind(&MCSClient::MaybeSendMessage, | 731 base::Bind(&MCSClient::MaybeSendMessage, |
| 733 weak_ptr_factory_.GetWeakPtr())); | 732 weak_ptr_factory_.GetWeakPtr())); |
| 734 } | 733 } |
| 735 | 734 |
| 736 heartbeat_manager_.Start( | 735 heartbeat_manager_.Start( |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 779 LOG(WARNING) << "Received invalid iq stanza extension " | 778 LOG(WARNING) << "Received invalid iq stanza extension " |
| 780 << iq_extension.id(); | 779 << iq_extension.id(); |
| 781 return; | 780 return; |
| 782 } | 781 } |
| 783 } | 782 } |
| 784 case kDataMessageStanzaTag: { | 783 case kDataMessageStanzaTag: { |
| 785 DCHECK_GE(stream_id_in_, 1U); | 784 DCHECK_GE(stream_id_in_, 1U); |
| 786 mcs_proto::DataMessageStanza* data_message = | 785 mcs_proto::DataMessageStanza* data_message = |
| 787 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); | 786 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); |
| 788 if (data_message->category() == kMCSCategory) { | 787 if (data_message->category() == kMCSCategory) { |
| 789 HandleMCSDataMesssage(protobuf.Pass()); | 788 HandleMCSDataMesssage(std::move(protobuf)); |
| 790 return; | 789 return; |
| 791 } | 790 } |
| 792 | 791 |
| 793 DCHECK(protobuf.get()); | 792 DCHECK(protobuf.get()); |
| 794 base::ThreadTaskRunnerHandle::Get()->PostTask( | 793 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 795 FROM_HERE, | 794 FROM_HERE, base::Bind(message_received_callback_, |
| 796 base::Bind(message_received_callback_, | 795 MCSMessage(tag, std::move(protobuf)))); |
| 797 MCSMessage(tag, protobuf.Pass()))); | |
| 798 return; | 796 return; |
| 799 } | 797 } |
| 800 default: | 798 default: |
| 801 LOG(ERROR) << "Received unexpected message of type " | 799 LOG(ERROR) << "Received unexpected message of type " |
| 802 << static_cast<int>(tag); | 800 << static_cast<int>(tag); |
| 803 return; | 801 return; |
| 804 } | 802 } |
| 805 } | 803 } |
| 806 | 804 |
| 807 void MCSClient::HandleStreamAck(StreamId last_stream_id_received) { | 805 void MCSClient::HandleStreamAck(StreamId last_stream_id_received) { |
| (...skipping 157 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 965 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); | 963 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); |
| 966 CollapseKey collapse_key(*data_message); | 964 CollapseKey collapse_key(*data_message); |
| 967 if (collapse_key.IsValid()) | 965 if (collapse_key.IsValid()) |
| 968 collapse_key_map_.erase(collapse_key); | 966 collapse_key_map_.erase(collapse_key); |
| 969 } | 967 } |
| 970 | 968 |
| 971 return packet; | 969 return packet; |
| 972 } | 970 } |
| 973 | 971 |
| 974 } // namespace gcm | 972 } // namespace gcm |
| OLD | NEW |