| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
| 6 | 6 |
| 7 #include <set> | 7 #include <set> |
| 8 | 8 |
| 9 #include "base/basictypes.h" | 9 #include "base/basictypes.h" |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| 11 #include "base/message_loop/message_loop.h" | 11 #include "base/location.h" |
| 12 #include "base/metrics/histogram.h" | 12 #include "base/metrics/histogram.h" |
| 13 #include "base/strings/string_number_conversions.h" | 13 #include "base/strings/string_number_conversions.h" |
| 14 #include "base/thread_task_runner_handle.h" |
| 14 #include "base/time/clock.h" | 15 #include "base/time/clock.h" |
| 15 #include "base/time/time.h" | 16 #include "base/time/time.h" |
| 16 #include "base/timer/timer.h" | 17 #include "base/timer/timer.h" |
| 17 #include "google_apis/gcm/base/mcs_util.h" | 18 #include "google_apis/gcm/base/mcs_util.h" |
| 18 #include "google_apis/gcm/base/socket_stream.h" | 19 #include "google_apis/gcm/base/socket_stream.h" |
| 19 #include "google_apis/gcm/engine/connection_factory.h" | 20 #include "google_apis/gcm/engine/connection_factory.h" |
| 20 #include "google_apis/gcm/monitoring/gcm_stats_recorder.h" | 21 #include "google_apis/gcm/monitoring/gcm_stats_recorder.h" |
| 21 | 22 |
| 22 using namespace google::protobuf::io; | 23 using namespace google::protobuf::io; |
| 23 | 24 |
| (...skipping 513 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 537 | 538 |
| 538 MCSPacketInternal packet = PopMessageForSend(); | 539 MCSPacketInternal packet = PopMessageForSend(); |
| 539 if (HasTTLExpired(*packet->protobuf, clock_)) { | 540 if (HasTTLExpired(*packet->protobuf, clock_)) { |
| 540 DCHECK(!packet->persistent_id.empty()); | 541 DCHECK(!packet->persistent_id.empty()); |
| 541 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; | 542 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
| 542 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); | 543 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); |
| 543 gcm_store_->RemoveOutgoingMessage( | 544 gcm_store_->RemoveOutgoingMessage( |
| 544 packet->persistent_id, | 545 packet->persistent_id, |
| 545 base::Bind(&MCSClient::OnGCMUpdateFinished, | 546 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 546 weak_ptr_factory_.GetWeakPtr())); | 547 weak_ptr_factory_.GetWeakPtr())); |
| 547 base::MessageLoop::current()->PostTask( | 548 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 548 FROM_HERE, | 549 FROM_HERE, |
| 549 base::Bind(&MCSClient::MaybeSendMessage, | 550 base::Bind(&MCSClient::MaybeSendMessage, |
| 550 weak_ptr_factory_.GetWeakPtr())); | 551 weak_ptr_factory_.GetWeakPtr())); |
| 551 return; | 552 return; |
| 552 } | 553 } |
| 553 DVLOG(1) << "Pending output message found, sending."; | 554 DVLOG(1) << "Pending output message found, sending."; |
| 554 if (!packet->persistent_id.empty()) | 555 if (!packet->persistent_id.empty()) |
| 555 to_resend_.push_back(packet); | 556 to_resend_.push_back(packet); |
| 556 SendPacketToWire(packet.get()); | 557 SendPacketToWire(packet.get()); |
| 557 } | 558 } |
| (...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 710 if (login_response->has_heartbeat_config()) { | 711 if (login_response->has_heartbeat_config()) { |
| 711 heartbeat_manager_.UpdateHeartbeatConfig( | 712 heartbeat_manager_.UpdateHeartbeatConfig( |
| 712 login_response->heartbeat_config()); | 713 login_response->heartbeat_config()); |
| 713 } | 714 } |
| 714 | 715 |
| 715 state_ = CONNECTED; | 716 state_ = CONNECTED; |
| 716 stream_id_in_ = 1; // To account for the login response. | 717 stream_id_in_ = 1; // To account for the login response. |
| 717 DCHECK_EQ(1U, stream_id_out_); | 718 DCHECK_EQ(1U, stream_id_out_); |
| 718 | 719 |
| 719 // Pass the login response on up. | 720 // Pass the login response on up. |
| 720 base::MessageLoop::current()->PostTask( | 721 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 721 FROM_HERE, | 722 FROM_HERE, |
| 722 base::Bind(message_received_callback_, | 723 base::Bind(message_received_callback_, |
| 723 MCSMessage(tag, protobuf.Pass()))); | 724 MCSMessage(tag, protobuf.Pass()))); |
| 724 | 725 |
| 725 // If there are pending messages, attempt to send one. | 726 // If there are pending messages, attempt to send one. |
| 726 if (!to_send_.empty()) { | 727 if (!to_send_.empty()) { |
| 727 base::MessageLoop::current()->PostTask( | 728 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 728 FROM_HERE, | 729 FROM_HERE, |
| 729 base::Bind(&MCSClient::MaybeSendMessage, | 730 base::Bind(&MCSClient::MaybeSendMessage, |
| 730 weak_ptr_factory_.GetWeakPtr())); | 731 weak_ptr_factory_.GetWeakPtr())); |
| 731 } | 732 } |
| 732 | 733 |
| 733 heartbeat_manager_.Start( | 734 heartbeat_manager_.Start( |
| 734 base::Bind(&MCSClient::SendHeartbeat, | 735 base::Bind(&MCSClient::SendHeartbeat, |
| 735 weak_ptr_factory_.GetWeakPtr()), | 736 weak_ptr_factory_.GetWeakPtr()), |
| 736 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, | 737 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, |
| 737 weak_ptr_factory_.GetWeakPtr())); | 738 weak_ptr_factory_.GetWeakPtr())); |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 781 case kDataMessageStanzaTag: { | 782 case kDataMessageStanzaTag: { |
| 782 DCHECK_GE(stream_id_in_, 1U); | 783 DCHECK_GE(stream_id_in_, 1U); |
| 783 mcs_proto::DataMessageStanza* data_message = | 784 mcs_proto::DataMessageStanza* data_message = |
| 784 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); | 785 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); |
| 785 if (data_message->category() == kMCSCategory) { | 786 if (data_message->category() == kMCSCategory) { |
| 786 HandleMCSDataMesssage(protobuf.Pass()); | 787 HandleMCSDataMesssage(protobuf.Pass()); |
| 787 return; | 788 return; |
| 788 } | 789 } |
| 789 | 790 |
| 790 DCHECK(protobuf.get()); | 791 DCHECK(protobuf.get()); |
| 791 base::MessageLoop::current()->PostTask( | 792 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 792 FROM_HERE, | 793 FROM_HERE, |
| 793 base::Bind(message_received_callback_, | 794 base::Bind(message_received_callback_, |
| 794 MCSMessage(tag, protobuf.Pass()))); | 795 MCSMessage(tag, protobuf.Pass()))); |
| 795 return; | 796 return; |
| 796 } | 797 } |
| 797 default: | 798 default: |
| 798 LOG(ERROR) << "Received unexpected message of type " | 799 LOG(ERROR) << "Received unexpected message of type " |
| 799 << static_cast<int>(tag); | 800 << static_cast<int>(tag); |
| 800 return; | 801 return; |
| 801 } | 802 } |
| (...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 889 base::Bind(&MCSClient::OnGCMUpdateFinished, | 890 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 890 weak_ptr_factory_.GetWeakPtr())); | 891 weak_ptr_factory_.GetWeakPtr())); |
| 891 | 892 |
| 892 // Resend any remaining outgoing messages, as they were not received by the | 893 // Resend any remaining outgoing messages, as they were not received by the |
| 893 // server. | 894 // server. |
| 894 DVLOG(1) << "Resending " << to_resend_.size() << " messages."; | 895 DVLOG(1) << "Resending " << to_resend_.size() << " messages."; |
| 895 while (!to_resend_.empty()) { | 896 while (!to_resend_.empty()) { |
| 896 to_send_.push_front(to_resend_.back()); | 897 to_send_.push_front(to_resend_.back()); |
| 897 to_resend_.pop_back(); | 898 to_resend_.pop_back(); |
| 898 } | 899 } |
| 899 base::MessageLoop::current()->PostTask( | 900 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 900 FROM_HERE, | 901 FROM_HERE, |
| 901 base::Bind(&MCSClient::MaybeSendMessage, | 902 base::Bind(&MCSClient::MaybeSendMessage, |
| 902 weak_ptr_factory_.GetWeakPtr())); | 903 weak_ptr_factory_.GetWeakPtr())); |
| 903 } | 904 } |
| 904 | 905 |
| 905 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) { | 906 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) { |
| 906 PersistentIdList acked_incoming_ids; | 907 PersistentIdList acked_incoming_ids; |
| 907 for (std::map<StreamId, PersistentIdList>::iterator iter = | 908 for (std::map<StreamId, PersistentIdList>::iterator iter = |
| 908 acked_server_ids_.begin(); | 909 acked_server_ids_.begin(); |
| 909 iter != acked_server_ids_.end() && | 910 iter != acked_server_ids_.end() && |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 962 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); | 963 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); |
| 963 CollapseKey collapse_key(*data_message); | 964 CollapseKey collapse_key(*data_message); |
| 964 if (collapse_key.IsValid()) | 965 if (collapse_key.IsValid()) |
| 965 collapse_key_map_.erase(collapse_key); | 966 collapse_key_map_.erase(collapse_key); |
| 966 } | 967 } |
| 967 | 968 |
| 968 return packet; | 969 return packet; |
| 969 } | 970 } |
| 970 | 971 |
| 971 } // namespace gcm | 972 } // namespace gcm |
| OLD | NEW |