OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
6 | 6 |
7 #include <set> | 7 #include <set> |
8 | 8 |
9 #include "base/basictypes.h" | 9 #include "base/basictypes.h" |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
11 #include "base/message_loop/message_loop.h" | 11 #include "base/location.h" |
12 #include "base/metrics/histogram.h" | 12 #include "base/metrics/histogram.h" |
13 #include "base/strings/string_number_conversions.h" | 13 #include "base/strings/string_number_conversions.h" |
| 14 #include "base/thread_task_runner_handle.h" |
14 #include "base/time/clock.h" | 15 #include "base/time/clock.h" |
15 #include "base/time/time.h" | 16 #include "base/time/time.h" |
16 #include "base/timer/timer.h" | 17 #include "base/timer/timer.h" |
17 #include "google_apis/gcm/base/mcs_util.h" | 18 #include "google_apis/gcm/base/mcs_util.h" |
18 #include "google_apis/gcm/base/socket_stream.h" | 19 #include "google_apis/gcm/base/socket_stream.h" |
19 #include "google_apis/gcm/engine/connection_factory.h" | 20 #include "google_apis/gcm/engine/connection_factory.h" |
20 #include "google_apis/gcm/monitoring/gcm_stats_recorder.h" | 21 #include "google_apis/gcm/monitoring/gcm_stats_recorder.h" |
21 | 22 |
22 using namespace google::protobuf::io; | 23 using namespace google::protobuf::io; |
23 | 24 |
(...skipping 513 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
537 | 538 |
538 MCSPacketInternal packet = PopMessageForSend(); | 539 MCSPacketInternal packet = PopMessageForSend(); |
539 if (HasTTLExpired(*packet->protobuf, clock_)) { | 540 if (HasTTLExpired(*packet->protobuf, clock_)) { |
540 DCHECK(!packet->persistent_id.empty()); | 541 DCHECK(!packet->persistent_id.empty()); |
541 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; | 542 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
542 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); | 543 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); |
543 gcm_store_->RemoveOutgoingMessage( | 544 gcm_store_->RemoveOutgoingMessage( |
544 packet->persistent_id, | 545 packet->persistent_id, |
545 base::Bind(&MCSClient::OnGCMUpdateFinished, | 546 base::Bind(&MCSClient::OnGCMUpdateFinished, |
546 weak_ptr_factory_.GetWeakPtr())); | 547 weak_ptr_factory_.GetWeakPtr())); |
547 base::MessageLoop::current()->PostTask( | 548 base::ThreadTaskRunnerHandle::Get()->PostTask( |
548 FROM_HERE, | 549 FROM_HERE, |
549 base::Bind(&MCSClient::MaybeSendMessage, | 550 base::Bind(&MCSClient::MaybeSendMessage, |
550 weak_ptr_factory_.GetWeakPtr())); | 551 weak_ptr_factory_.GetWeakPtr())); |
551 return; | 552 return; |
552 } | 553 } |
553 DVLOG(1) << "Pending output message found, sending."; | 554 DVLOG(1) << "Pending output message found, sending."; |
554 if (!packet->persistent_id.empty()) | 555 if (!packet->persistent_id.empty()) |
555 to_resend_.push_back(packet); | 556 to_resend_.push_back(packet); |
556 SendPacketToWire(packet.get()); | 557 SendPacketToWire(packet.get()); |
557 } | 558 } |
(...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
710 if (login_response->has_heartbeat_config()) { | 711 if (login_response->has_heartbeat_config()) { |
711 heartbeat_manager_.UpdateHeartbeatConfig( | 712 heartbeat_manager_.UpdateHeartbeatConfig( |
712 login_response->heartbeat_config()); | 713 login_response->heartbeat_config()); |
713 } | 714 } |
714 | 715 |
715 state_ = CONNECTED; | 716 state_ = CONNECTED; |
716 stream_id_in_ = 1; // To account for the login response. | 717 stream_id_in_ = 1; // To account for the login response. |
717 DCHECK_EQ(1U, stream_id_out_); | 718 DCHECK_EQ(1U, stream_id_out_); |
718 | 719 |
719 // Pass the login response on up. | 720 // Pass the login response on up. |
720 base::MessageLoop::current()->PostTask( | 721 base::ThreadTaskRunnerHandle::Get()->PostTask( |
721 FROM_HERE, | 722 FROM_HERE, |
722 base::Bind(message_received_callback_, | 723 base::Bind(message_received_callback_, |
723 MCSMessage(tag, protobuf.Pass()))); | 724 MCSMessage(tag, protobuf.Pass()))); |
724 | 725 |
725 // If there are pending messages, attempt to send one. | 726 // If there are pending messages, attempt to send one. |
726 if (!to_send_.empty()) { | 727 if (!to_send_.empty()) { |
727 base::MessageLoop::current()->PostTask( | 728 base::ThreadTaskRunnerHandle::Get()->PostTask( |
728 FROM_HERE, | 729 FROM_HERE, |
729 base::Bind(&MCSClient::MaybeSendMessage, | 730 base::Bind(&MCSClient::MaybeSendMessage, |
730 weak_ptr_factory_.GetWeakPtr())); | 731 weak_ptr_factory_.GetWeakPtr())); |
731 } | 732 } |
732 | 733 |
733 heartbeat_manager_.Start( | 734 heartbeat_manager_.Start( |
734 base::Bind(&MCSClient::SendHeartbeat, | 735 base::Bind(&MCSClient::SendHeartbeat, |
735 weak_ptr_factory_.GetWeakPtr()), | 736 weak_ptr_factory_.GetWeakPtr()), |
736 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, | 737 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, |
737 weak_ptr_factory_.GetWeakPtr())); | 738 weak_ptr_factory_.GetWeakPtr())); |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
781 case kDataMessageStanzaTag: { | 782 case kDataMessageStanzaTag: { |
782 DCHECK_GE(stream_id_in_, 1U); | 783 DCHECK_GE(stream_id_in_, 1U); |
783 mcs_proto::DataMessageStanza* data_message = | 784 mcs_proto::DataMessageStanza* data_message = |
784 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); | 785 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); |
785 if (data_message->category() == kMCSCategory) { | 786 if (data_message->category() == kMCSCategory) { |
786 HandleMCSDataMesssage(protobuf.Pass()); | 787 HandleMCSDataMesssage(protobuf.Pass()); |
787 return; | 788 return; |
788 } | 789 } |
789 | 790 |
790 DCHECK(protobuf.get()); | 791 DCHECK(protobuf.get()); |
791 base::MessageLoop::current()->PostTask( | 792 base::ThreadTaskRunnerHandle::Get()->PostTask( |
792 FROM_HERE, | 793 FROM_HERE, |
793 base::Bind(message_received_callback_, | 794 base::Bind(message_received_callback_, |
794 MCSMessage(tag, protobuf.Pass()))); | 795 MCSMessage(tag, protobuf.Pass()))); |
795 return; | 796 return; |
796 } | 797 } |
797 default: | 798 default: |
798 LOG(ERROR) << "Received unexpected message of type " | 799 LOG(ERROR) << "Received unexpected message of type " |
799 << static_cast<int>(tag); | 800 << static_cast<int>(tag); |
800 return; | 801 return; |
801 } | 802 } |
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
889 base::Bind(&MCSClient::OnGCMUpdateFinished, | 890 base::Bind(&MCSClient::OnGCMUpdateFinished, |
890 weak_ptr_factory_.GetWeakPtr())); | 891 weak_ptr_factory_.GetWeakPtr())); |
891 | 892 |
892 // Resend any remaining outgoing messages, as they were not received by the | 893 // Resend any remaining outgoing messages, as they were not received by the |
893 // server. | 894 // server. |
894 DVLOG(1) << "Resending " << to_resend_.size() << " messages."; | 895 DVLOG(1) << "Resending " << to_resend_.size() << " messages."; |
895 while (!to_resend_.empty()) { | 896 while (!to_resend_.empty()) { |
896 to_send_.push_front(to_resend_.back()); | 897 to_send_.push_front(to_resend_.back()); |
897 to_resend_.pop_back(); | 898 to_resend_.pop_back(); |
898 } | 899 } |
899 base::MessageLoop::current()->PostTask( | 900 base::ThreadTaskRunnerHandle::Get()->PostTask( |
900 FROM_HERE, | 901 FROM_HERE, |
901 base::Bind(&MCSClient::MaybeSendMessage, | 902 base::Bind(&MCSClient::MaybeSendMessage, |
902 weak_ptr_factory_.GetWeakPtr())); | 903 weak_ptr_factory_.GetWeakPtr())); |
903 } | 904 } |
904 | 905 |
905 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) { | 906 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) { |
906 PersistentIdList acked_incoming_ids; | 907 PersistentIdList acked_incoming_ids; |
907 for (std::map<StreamId, PersistentIdList>::iterator iter = | 908 for (std::map<StreamId, PersistentIdList>::iterator iter = |
908 acked_server_ids_.begin(); | 909 acked_server_ids_.begin(); |
909 iter != acked_server_ids_.end() && | 910 iter != acked_server_ids_.end() && |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
962 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); | 963 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); |
963 CollapseKey collapse_key(*data_message); | 964 CollapseKey collapse_key(*data_message); |
964 if (collapse_key.IsValid()) | 965 if (collapse_key.IsValid()) |
965 collapse_key_map_.erase(collapse_key); | 966 collapse_key_map_.erase(collapse_key); |
966 } | 967 } |
967 | 968 |
968 return packet; | 969 return packet; |
969 } | 970 } |
970 | 971 |
971 } // namespace gcm | 972 } // namespace gcm |
OLD | NEW |