| Index: third_party/cacheinvalidation/src/google/cacheinvalidation/impl/invalidation-client-core.cc
|
| diff --git a/third_party/cacheinvalidation/src/google/cacheinvalidation/impl/invalidation-client-core.cc b/third_party/cacheinvalidation/src/google/cacheinvalidation/impl/invalidation-client-core.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..f8e637dd16d1e9a6cc39da891d19272b2c4bdb91
|
| --- /dev/null
|
| +++ b/third_party/cacheinvalidation/src/google/cacheinvalidation/impl/invalidation-client-core.cc
|
| @@ -0,0 +1,1009 @@
|
| +// Copyright 2012 Google Inc.
|
| +//
|
| +// Licensed under the Apache License, Version 2.0 (the "License");
|
| +// you may not use this file except in compliance with the License.
|
| +// You may obtain a copy of the License at
|
| +//
|
| +// http://www.apache.org/licenses/LICENSE-2.0
|
| +//
|
| +// Unless required by applicable law or agreed to in writing, software
|
| +// distributed under the License is distributed on an "AS IS" BASIS,
|
| +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +// See the License for the specific language governing permissions and
|
| +// limitations under the License.
|
| +
|
| +// Implementation of the Invalidation Client Library (Ticl).
|
| +
|
| +#include "google/cacheinvalidation/impl/invalidation-client-core.h"
|
| +
|
| +#include <sstream>
|
| +
|
| +#include "google/cacheinvalidation/client_test_internal.pb.h"
|
| +#include "google/cacheinvalidation/deps/callback.h"
|
| +#include "google/cacheinvalidation/deps/random.h"
|
| +#include "google/cacheinvalidation/deps/sha1-digest-function.h"
|
| +#include "google/cacheinvalidation/deps/string_util.h"
|
| +#include "google/cacheinvalidation/impl/exponential-backoff-delay-generator.h"
|
| +#include "google/cacheinvalidation/impl/invalidation-client-util.h"
|
| +#include "google/cacheinvalidation/impl/log-macro.h"
|
| +#include "google/cacheinvalidation/impl/persistence-utils.h"
|
| +#include "google/cacheinvalidation/impl/proto-converter.h"
|
| +#include "google/cacheinvalidation/impl/proto-helpers.h"
|
| +#include "google/cacheinvalidation/impl/recurring-task.h"
|
| +#include "google/cacheinvalidation/impl/smearer.h"
|
| +
|
| +namespace invalidation {
|
| +
|
| +using ::ipc::invalidation::RegistrationManagerStateP;
|
| +
|
| +const char* InvalidationClientCore::kClientTokenKey = "ClientToken";
|
| +
|
| +// AcquireTokenTask
|
| +
|
| +AcquireTokenTask::AcquireTokenTask(InvalidationClientCore* client)
|
| + : RecurringTask(
|
| + "AcquireToken",
|
| + client->internal_scheduler_,
|
| + client->logger_,
|
| + &client->smearer_,
|
| + client->CreateExpBackOffGenerator(TimeDelta::FromMilliseconds(
|
| + client->config_.network_timeout_delay_ms())),
|
| + Scheduler::NoDelay(),
|
| + TimeDelta::FromMilliseconds(
|
| + client->config_.network_timeout_delay_ms())),
|
| + client_(client) {
|
| + }
|
| +
|
| +bool AcquireTokenTask::RunTask() {
|
| + // If token is still not assigned (as expected), sends a request.
|
| + // Otherwise, ignore.
|
| + if (client_->client_token_.empty()) {
|
| + // Allocate a nonce and send a message requesting a new token.
|
| + client_->set_nonce(
|
| + InvalidationClientCore::GenerateNonce(client_->random_.get()));
|
| +
|
| + client_->protocol_handler_.SendInitializeMessage(
|
| + client_->application_client_id_, client_->nonce_,
|
| + client_->batching_task_.get(),
|
| + "AcquireToken");
|
| + // Reschedule to check state, retry if necessary after timeout.
|
| + return true;
|
| + } else {
|
| + return false; // Don't reschedule.
|
| + }
|
| +}
|
| +
|
| +// RegSyncHeartbeatTask
|
| +
|
| +RegSyncHeartbeatTask::RegSyncHeartbeatTask(InvalidationClientCore* client)
|
| + : RecurringTask(
|
| + "RegSyncHeartbeat",
|
| + client->internal_scheduler_,
|
| + client->logger_,
|
| + &client->smearer_,
|
| + client->CreateExpBackOffGenerator(TimeDelta::FromMilliseconds(
|
| + client->config_.network_timeout_delay_ms())),
|
| + TimeDelta::FromMilliseconds(
|
| + client->config_.network_timeout_delay_ms()),
|
| + TimeDelta::FromMilliseconds(
|
| + client->config_.network_timeout_delay_ms())),
|
| + client_(client) {
|
| +}
|
| +
|
| +bool RegSyncHeartbeatTask::RunTask() {
|
| + if (!client_->registration_manager_.IsStateInSyncWithServer()) {
|
| + // Simply send an info message to ensure syncing happens.
|
| + TLOG(client_->logger_, INFO, "Registration state not in sync with "
|
| + "server: %s", client_->registration_manager_.ToString().c_str());
|
| + client_->SendInfoMessageToServer(false, true /* request server summary */);
|
| + return true;
|
| + } else {
|
| + TLOG(client_->logger_, INFO, "Not sending message since state is in sync");
|
| + return false;
|
| + }
|
| +}
|
| +
|
| +// PersistentWriteTask
|
| +
|
| +PersistentWriteTask::PersistentWriteTask(InvalidationClientCore* client)
|
| + : RecurringTask(
|
| + "PersistentWrite",
|
| + client->internal_scheduler_,
|
| + client->logger_,
|
| + &client->smearer_,
|
| + client->CreateExpBackOffGenerator(TimeDelta::FromMilliseconds(
|
| + client->config_.write_retry_delay_ms())),
|
| + Scheduler::NoDelay(),
|
| + TimeDelta::FromMilliseconds(
|
| + client->config_.write_retry_delay_ms())),
|
| + client_(client) {
|
| +}
|
| +
|
| +bool PersistentWriteTask::RunTask() {
|
| + if (client_->client_token_.empty() ||
|
| + (client_->client_token_ == last_written_token_)) {
|
| + // No work to be done
|
| + return false; // Do not reschedule
|
| + }
|
| +
|
| + // Persistent write needs to happen.
|
| + PersistentTiclState state;
|
| + state.set_client_token(client_->client_token_);
|
| + string serialized_state;
|
| + PersistenceUtils::SerializeState(state, client_->digest_fn_.get(),
|
| + &serialized_state);
|
| + client_->storage_->WriteKey(InvalidationClientCore::kClientTokenKey,
|
| + serialized_state,
|
| + NewPermanentCallback(this, &PersistentWriteTask::WriteCallback,
|
| + client_->client_token_));
|
| + return true; // Reschedule after timeout to make sure that write does happen.
|
| +}
|
| +
|
| +void PersistentWriteTask::WriteCallback(const string& token, Status status) {
|
| + TLOG(client_->logger_, INFO, "Write state completed: %d, %s",
|
| + status.IsSuccess(), status.message().c_str());
|
| + if (status.IsSuccess()) {
|
| + // Set lastWrittenToken to be the token that was written (NOT client_token_:
|
| + // which could have changed while the write was happening).
|
| + last_written_token_ = token;
|
| + } else {
|
| + client_->statistics_->RecordError(
|
| + Statistics::ClientErrorType_PERSISTENT_WRITE_FAILURE);
|
| + }
|
| +}
|
| +
|
| +// HeartbeatTask
|
| +
|
| +HeartbeatTask::HeartbeatTask(InvalidationClientCore* client)
|
| + : RecurringTask(
|
| + "Heartbeat",
|
| + client->internal_scheduler_,
|
| + client->logger_,
|
| + &client->smearer_,
|
| + NULL,
|
| + TimeDelta::FromMilliseconds(
|
| + client->config_.heartbeat_interval_ms()),
|
| + Scheduler::NoDelay()),
|
| + client_(client) {
|
| + next_performance_send_time_ = client_->internal_scheduler_->GetCurrentTime() +
|
| + smearer()->GetSmearedDelay(TimeDelta::FromMilliseconds(
|
| + client_->config_.perf_counter_delay_ms()));
|
| +}
|
| +
|
| +bool HeartbeatTask::RunTask() {
|
| + // Send info message. If needed, send performance counters and reset the next
|
| + // performance counter send time.
|
| + TLOG(client_->logger_, INFO, "Sending heartbeat to server: %s",
|
| + client_->ToString().c_str());
|
| + Scheduler *scheduler = client_->internal_scheduler_;
|
| + bool must_send_perf_counters =
|
| + next_performance_send_time_ > scheduler->GetCurrentTime();
|
| + if (must_send_perf_counters) {
|
| + next_performance_send_time_ = scheduler->GetCurrentTime() +
|
| + client_->smearer_.GetSmearedDelay(TimeDelta::FromMilliseconds(
|
| + client_->config_.perf_counter_delay_ms()));
|
| + }
|
| +
|
| + TLOG(client_->logger_, INFO, "Sending heartbeat to server: %s",
|
| + client_->ToString().c_str());
|
| + client_->SendInfoMessageToServer(must_send_perf_counters,
|
| + !client_->registration_manager_.IsStateInSyncWithServer());
|
| + return true; // Reschedule.
|
| +}
|
| +
|
| +BatchingTask::BatchingTask(
|
| + ProtocolHandler *handler, Smearer* smearer, TimeDelta batching_delay)
|
| + : RecurringTask(
|
| + "Batching", handler->internal_scheduler_, handler->logger_, smearer,
|
| + NULL, batching_delay, Scheduler::NoDelay()),
|
| + protocol_handler_(handler) {
|
| +}
|
| +
|
| +bool BatchingTask::RunTask() {
|
| + // Send message to server - the batching information is picked up in
|
| + // SendMessageToServer.
|
| + protocol_handler_->SendMessageToServer();
|
| + return false; // Don't reschedule.
|
| +}
|
| +
|
| +InvalidationClientCore::InvalidationClientCore(
|
| + SystemResources* resources, Random* random, int client_type,
|
| + const string& client_name, const ClientConfigP& config,
|
| + const string& application_name)
|
| + : resources_(resources),
|
| + internal_scheduler_(resources->internal_scheduler()),
|
| + logger_(resources->logger()),
|
| + storage_(new SafeStorage(resources->storage())),
|
| + statistics_(new Statistics()),
|
| + config_(config),
|
| + digest_fn_(new Sha1DigestFunction()),
|
| + registration_manager_(logger_, statistics_.get(), digest_fn_.get()),
|
| + msg_validator_(new TiclMessageValidator(logger_)),
|
| + smearer_(random, config.smear_percent()),
|
| + protocol_handler_(config.protocol_handler_config(), resources, &smearer_,
|
| + statistics_.get(), client_type, application_name, this,
|
| + msg_validator_.get()),
|
| + is_online_(true),
|
| + random_(random) {
|
| + storage_.get()->SetSystemResources(resources_);
|
| + application_client_id_.set_client_name(client_name);
|
| + application_client_id_.set_client_type(client_type);
|
| + CreateSchedulingTasks();
|
| + RegisterWithNetwork(resources);
|
| + TLOG(logger_, INFO, "Created client: %s", ToString().c_str());
|
| +}
|
| +
|
| +void InvalidationClientCore::RegisterWithNetwork(SystemResources* resources) {
|
| + // Install ourselves as a receiver for server messages.
|
| + resources->network()->SetMessageReceiver(
|
| + NewPermanentCallback(this, &InvalidationClientCore::MessageReceiver));
|
| +
|
| + resources->network()->AddNetworkStatusReceiver(
|
| + NewPermanentCallback(this,
|
| + &InvalidationClientCore::NetworkStatusReceiver));
|
| +}
|
| +
|
| +void InvalidationClientCore::CreateSchedulingTasks() {
|
| + acquire_token_task_.reset(new AcquireTokenTask(this));
|
| + reg_sync_heartbeat_task_.reset(new RegSyncHeartbeatTask(this));
|
| + persistent_write_task_.reset(new PersistentWriteTask(this));
|
| + heartbeat_task_.reset(new HeartbeatTask(this));
|
| + batching_task_.reset(new BatchingTask(&protocol_handler_,
|
| + &smearer_,
|
| + TimeDelta::FromMilliseconds(
|
| + config_.protocol_handler_config().batching_delay_ms())));
|
| +}
|
| +
|
| +void InvalidationClientCore::InitConfig(ClientConfigP* config) {
|
| + ProtoHelpers::InitConfigVersion(config->mutable_version());
|
| + ProtocolHandler::InitConfig(config->mutable_protocol_handler_config());
|
| +}
|
| +
|
| +void InvalidationClientCore::InitConfigForTest(ClientConfigP* config) {
|
| + ProtoHelpers::InitConfigVersion(config->mutable_version());
|
| + config->set_network_timeout_delay_ms(2000);
|
| + config->set_heartbeat_interval_ms(5000);
|
| + config->set_write_retry_delay_ms(500);
|
| + ProtocolHandler::InitConfigForTest(config->mutable_protocol_handler_config());
|
| +}
|
| +
|
| +void InvalidationClientCore::Start() {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + if (ticl_state_.IsStarted()) {
|
| + TLOG(logger_, SEVERE,
|
| + "Ignoring start call since already started: client = %s",
|
| + this->ToString().c_str());
|
| + return;
|
| + }
|
| +
|
| + // Initialize the nonce so that we can maintain the invariant that exactly
|
| + // one of "nonce_" and "client_token_" is non-empty.
|
| + set_nonce(InvalidationClientCore::GenerateNonce(random_.get()));
|
| +
|
| + TLOG(logger_, INFO, "Starting with C++ config: %s",
|
| + ProtoHelpers::ToString(config_).c_str());
|
| +
|
| + // Read the state blob and then schedule startInternal once the value is
|
| + // there.
|
| + ScheduleStartAfterReadingStateBlob();
|
| +}
|
| +
|
| +void InvalidationClientCore::StartInternal(const string& serialized_state) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| +
|
| + CHECK(resources_->IsStarted()) << "Resources must be started before starting "
|
| + "the Ticl";
|
| +
|
| + // Initialize the session manager using the persisted client token.
|
| + PersistentTiclState persistent_state;
|
| + bool deserialized = false;
|
| + if (!serialized_state.empty()) {
|
| + deserialized = PersistenceUtils::DeserializeState(
|
| + logger_, serialized_state, digest_fn_.get(), &persistent_state);
|
| + }
|
| +
|
| + if (!serialized_state.empty() && !deserialized) {
|
| + // In this case, we'll proceed as if we had no persistent state -- i.e.,
|
| + // obtain a new client id from the server.
|
| + statistics_->RecordError(
|
| + Statistics::ClientErrorType_PERSISTENT_DESERIALIZATION_FAILURE);
|
| + TLOG(logger_, SEVERE, "Failed deserializing persistent state: %s",
|
| + ProtoHelpers::ToString(serialized_state).c_str());
|
| + }
|
| + if (deserialized) {
|
| + // If we have persistent state, use the previously-stored token and send a
|
| + // heartbeat to let the server know that we've restarted, since we may have
|
| + // been marked offline.
|
| + //
|
| + // In the common case, the server will already have all of our
|
| + // registrations, but we won't know for sure until we've gotten its summary.
|
| + // We'll ask the application for all of its registrations, but to avoid
|
| + // making the registrar redo the work of performing registrations that
|
| + // probably already exist, we'll suppress sending them to the registrar.
|
| + TLOG(logger_, INFO, "Restarting from persistent state: %s",
|
| + ProtoHelpers::ToString(
|
| + persistent_state.client_token()).c_str());
|
| + set_nonce("");
|
| + set_client_token(persistent_state.client_token());
|
| + should_send_registrations_ = false;
|
| +
|
| + // Schedule an info message for the near future. We delay a little bit to
|
| + // allow the application to reissue its registrations locally and avoid
|
| + // triggering registration sync with the data center due to a hash mismatch.
|
| + internal_scheduler_->Schedule(TimeDelta::FromMilliseconds(
|
| + config_.initial_persistent_heartbeat_delay_ms()),
|
| + NewPermanentCallback(this,
|
| + &InvalidationClientCore::SendInfoMessageToServer, false, true));
|
| +
|
| + // We need to ensure that heartbeats are sent, regardless of whether we
|
| + // start fresh or from persistent state. The line below ensures that they
|
| + // are scheduled in the persistent startup case. For the other case, the
|
| + // task is scheduled when we acquire a token.
|
| + heartbeat_task_.get()->EnsureScheduled("Startup-after-persistence");
|
| + } else {
|
| + // If we had no persistent state or couldn't deserialize the state that we
|
| + // had, start fresh. Request a new client identifier.
|
| + //
|
| + // The server can't possibly have our registrations, so whatever we get
|
| + // from the application we should send to the registrar.
|
| + TLOG(logger_, INFO, "Starting with no previous state");
|
| + should_send_registrations_ = true;
|
| + ScheduleAcquireToken("Startup");
|
| + }
|
| + // InvalidationListener.Ready() is called when the ticl has acquired a
|
| + // new token.
|
| +}
|
| +
|
| +void InvalidationClientCore::Stop() {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + TLOG(logger_, WARNING, "Ticl being stopped: %s", ToString().c_str());
|
| + if (ticl_state_.IsStarted()) {
|
| + ticl_state_.Stop();
|
| + }
|
| +}
|
| +
|
| +void InvalidationClientCore::Register(const ObjectId& object_id) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + vector<ObjectId> object_ids;
|
| + object_ids.push_back(object_id);
|
| + PerformRegisterOperations(object_ids, RegistrationP_OpType_REGISTER);
|
| +}
|
| +
|
| +void InvalidationClientCore::Unregister(const ObjectId& object_id) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + vector<ObjectId> object_ids;
|
| + object_ids.push_back(object_id);
|
| + PerformRegisterOperations(object_ids, RegistrationP_OpType_UNREGISTER);
|
| +}
|
| +
|
| +void InvalidationClientCore::PerformRegisterOperations(
|
| + const vector<ObjectId>& object_ids, RegistrationP::OpType reg_op_type) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + CHECK(!object_ids.empty()) << "Must specify some object id";
|
| +
|
| + if (ticl_state_.IsStopped()) {
|
| + // The Ticl has been stopped. This might be some old registration op
|
| + // coming in. Just ignore instead of crashing.
|
| + TLOG(logger_, SEVERE, "Ticl stopped: register (%d) of %d objects ignored.",
|
| + reg_op_type, object_ids.size());
|
| + return;
|
| + }
|
| + if (!ticl_state_.IsStarted()) {
|
| + // We must be in the NOT_STARTED state, since we can't be in STOPPED or
|
| + // STARTED (since the previous if-check didn't succeeded, and isStarted uses
|
| + // a != STARTED test).
|
| + TLOG(logger_, SEVERE,
|
| + "Ticl is not yet started; failing registration call; client = %s, "
|
| + "num-objects = %d, op = %d",
|
| + this->ToString().c_str(), object_ids.size(), reg_op_type);
|
| + for (size_t i = 0; i < object_ids.size(); ++i) {
|
| + const ObjectId& object_id = object_ids[i];
|
| + GetListener()->InformRegistrationFailure(this, object_id, true,
|
| + "Client not yet ready");
|
| + }
|
| + return;
|
| + }
|
| +
|
| + vector<ObjectIdP> object_id_protos;
|
| + for (size_t i = 0; i < object_ids.size(); ++i) {
|
| + const ObjectId& object_id = object_ids[i];
|
| + ObjectIdP object_id_proto;
|
| + ProtoConverter::ConvertToObjectIdProto(object_id, &object_id_proto);
|
| + Statistics::IncomingOperationType op_type =
|
| + (reg_op_type == RegistrationP_OpType_REGISTER) ?
|
| + Statistics::IncomingOperationType_REGISTRATION :
|
| + Statistics::IncomingOperationType_UNREGISTRATION;
|
| + statistics_->RecordIncomingOperation(op_type);
|
| + TLOG(logger_, INFO, "Register %s, %d",
|
| + ProtoHelpers::ToString(object_id_proto).c_str(), reg_op_type);
|
| + object_id_protos.push_back(object_id_proto);
|
| + }
|
| +
|
| +
|
| + // Update the registration manager state, then have the protocol client send a
|
| + // message.
|
| + vector<ObjectIdP> object_id_protos_to_send;
|
| + registration_manager_.PerformOperations(object_id_protos, reg_op_type,
|
| + &object_id_protos_to_send);
|
| +
|
| + // Check whether we should suppress sending registrations because we don't
|
| + // yet know the server's summary.
|
| + if (should_send_registrations_ && (!object_id_protos_to_send.empty())) {
|
| + protocol_handler_.SendRegistrations(
|
| + object_id_protos_to_send, reg_op_type, batching_task_.get());
|
| + }
|
| + reg_sync_heartbeat_task_.get()->EnsureScheduled("PerformRegister");
|
| +}
|
| +
|
| +void InvalidationClientCore::Acknowledge(const AckHandle& acknowledge_handle) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + if (acknowledge_handle.IsNoOp()) {
|
| + // Nothing to do. We do not increment statistics here since this is a no op
|
| + // handle and statistics can only be acccessed on the scheduler thread.
|
| + return;
|
| + }
|
| + // Validate the ack handle.
|
| +
|
| + // 1. Parse the ack handle first.
|
| + AckHandleP ack_handle;
|
| + ack_handle.ParseFromString(acknowledge_handle.handle_data());
|
| + if (!ack_handle.IsInitialized()) {
|
| + TLOG(logger_, WARNING, "Bad ack handle : %s",
|
| + ProtoHelpers::ToString(acknowledge_handle.handle_data()).c_str());
|
| + statistics_->RecordError(
|
| + Statistics::ClientErrorType_ACKNOWLEDGE_HANDLE_FAILURE);
|
| + return;
|
| + }
|
| +
|
| + // 2. Validate ack handle - it should have a valid invalidation.
|
| + if (!ack_handle.has_invalidation()
|
| + || !msg_validator_->IsValid(ack_handle.invalidation())) {
|
| + TLOG(logger_, WARNING, "Incorrect ack handle: %s",
|
| + ProtoHelpers::ToString(ack_handle).c_str());
|
| + statistics_->RecordError(
|
| + Statistics::ClientErrorType_ACKNOWLEDGE_HANDLE_FAILURE);
|
| + return;
|
| + }
|
| +
|
| + // Currently, only invalidations have non-trivial ack handle.
|
| + InvalidationP* invalidation = ack_handle.mutable_invalidation();
|
| + invalidation->clear_payload(); // Don't send the payload back.
|
| + statistics_->RecordIncomingOperation(
|
| + Statistics::IncomingOperationType_ACKNOWLEDGE);
|
| + protocol_handler_.SendInvalidationAck(*invalidation, batching_task_.get());
|
| +}
|
| +
|
| +string InvalidationClientCore::ToString() {
|
| + return StringPrintf("Client: %s, %s, %s",
|
| + ProtoHelpers::ToString(application_client_id_).c_str(),
|
| + ProtoHelpers::ToString(client_token_).c_str(),
|
| + this->ticl_state_.ToString().c_str());
|
| +}
|
| +
|
| +string InvalidationClientCore::GetClientToken() {
|
| + CHECK(client_token_.empty() || nonce_.empty());
|
| + TLOG(logger_, FINE, "Return client token = %s",
|
| + ProtoHelpers::ToString(client_token_).c_str());
|
| + return client_token_;
|
| +}
|
| +
|
| +void InvalidationClientCore::HandleIncomingMessage(const string& message) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + statistics_->RecordReceivedMessage(
|
| + Statistics::ReceivedMessageType_TOTAL);
|
| + ParsedMessage parsed_message;
|
| + if (!protocol_handler_.HandleIncomingMessage(message, &parsed_message)) {
|
| + // Invalid message.
|
| + return;
|
| + }
|
| +
|
| + // Ensure we have either a matching token or a matching nonce.
|
| + if (!ValidateToken(parsed_message.header.token())) {
|
| + return;
|
| + }
|
| +
|
| + // Handle a token control message, if present.
|
| + if (parsed_message.token_control_message != NULL) {
|
| + statistics_->RecordReceivedMessage(
|
| + Statistics::ReceivedMessageType_TOKEN_CONTROL);
|
| + HandleTokenChanged(parsed_message.header.token(),
|
| + parsed_message.token_control_message->new_token());
|
| + }
|
| +
|
| + // We might have lost our token or failed to acquire one. Ensure that we do
|
| + // not proceed in either case.
|
| + // Note that checking for the presence of a TokenControlMessage is *not*
|
| + // sufficient: it might be a token-assign with the wrong nonce or a
|
| + // token-destroy message, for example.
|
| + if (client_token_.empty()) {
|
| + return;
|
| + }
|
| +
|
| + // Handle the messages received from the server by calling the appropriate
|
| + // listener method.
|
| +
|
| + // In the beginning inform the listener about the header (the caller is
|
| + // already prepared to handle the fact that the same header is given to
|
| + // it multiple times).
|
| + HandleIncomingHeader(parsed_message.header);
|
| +
|
| + if (parsed_message.invalidation_message != NULL) {
|
| + statistics_->RecordReceivedMessage(
|
| + Statistics::ReceivedMessageType_INVALIDATION);
|
| + HandleInvalidations(parsed_message.invalidation_message->invalidation());
|
| + }
|
| + if (parsed_message.registration_status_message != NULL) {
|
| + statistics_->RecordReceivedMessage(
|
| + Statistics::ReceivedMessageType_REGISTRATION_STATUS);
|
| + HandleRegistrationStatus(
|
| + parsed_message.registration_status_message->registration_status());
|
| + }
|
| + if (parsed_message.registration_sync_request_message != NULL) {
|
| + statistics_->RecordReceivedMessage(
|
| + Statistics::ReceivedMessageType_REGISTRATION_SYNC_REQUEST);
|
| + HandleRegistrationSyncRequest();
|
| + }
|
| + if (parsed_message.info_request_message != NULL) {
|
| + statistics_->RecordReceivedMessage(
|
| + Statistics::ReceivedMessageType_INFO_REQUEST);
|
| + HandleInfoMessage(
|
| + // Shouldn't have to do this, but the proto compiler generates bad code
|
| + // for repeated enum fields.
|
| + *reinterpret_cast<const RepeatedField<InfoRequestMessage_InfoType>* >(
|
| + &parsed_message.info_request_message->info_type()));
|
| + }
|
| + if (parsed_message.error_message != NULL) {
|
| + statistics_->RecordReceivedMessage(
|
| + Statistics::ReceivedMessageType_ERROR);
|
| + HandleErrorMessage(
|
| + parsed_message.error_message->code(),
|
| + parsed_message.error_message->description());
|
| + }
|
| +}
|
| +
|
| +void InvalidationClientCore::HandleTokenChanged(
|
| + const string& header_token, const string& new_token) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| +
|
| + // The server is either supplying a new token in response to an
|
| + // InitializeMessage, spontaneously destroying a token we hold, or
|
| + // spontaneously upgrading a token we hold.
|
| +
|
| + if (!new_token.empty()) {
|
| + // Note: header_token cannot be empty, so an empty nonce or client_token will
|
| + // always be non-equal.
|
| + bool header_token_matches_nonce = header_token == nonce_;
|
| + bool header_token_matches_existing_token = header_token == client_token_;
|
| + bool should_accept_token =
|
| + header_token_matches_nonce || header_token_matches_existing_token;
|
| + if (!should_accept_token) {
|
| + TLOG(logger_, INFO, "Ignoring new token; %s does not match nonce = %s "
|
| + "or existing token = %s",
|
| + ProtoHelpers::ToString(new_token).c_str(),
|
| + ProtoHelpers::ToString(nonce_).c_str(),
|
| + ProtoHelpers::ToString(client_token_).c_str());
|
| + return;
|
| + }
|
| + TLOG(logger_, INFO, "New token being assigned at client: %s, Old = %s",
|
| + ProtoHelpers::ToString(new_token).c_str(),
|
| + ProtoHelpers::ToString(client_token_).c_str());
|
| + heartbeat_task_.get()->EnsureScheduled("Heartbeat-after-new-token");
|
| + set_nonce("");
|
| + set_client_token(new_token);
|
| + persistent_write_task_.get()->EnsureScheduled("Write-after-new-token");
|
| + } else {
|
| + // Destroy the existing token.
|
| + TLOG(logger_, INFO, "Destroying existing token: %s",
|
| + ProtoHelpers::ToString(client_token_).c_str());
|
| + ScheduleAcquireToken("Destroy");
|
| + }
|
| +}
|
| +
|
| +void InvalidationClientCore::ScheduleAcquireToken(const string& debug_string) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + set_client_token("");
|
| + acquire_token_task_.get()->EnsureScheduled(debug_string);
|
| +}
|
| +
|
| +void InvalidationClientCore::HandleInvalidations(
|
| + const RepeatedPtrField<InvalidationP>& invalidations) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| +
|
| + for (int i = 0; i < invalidations.size(); ++i) {
|
| + const InvalidationP& invalidation = invalidations.Get(i);
|
| + AckHandleP ack_handle_proto;
|
| + ack_handle_proto.mutable_invalidation()->CopyFrom(invalidation);
|
| + string serialized;
|
| + ack_handle_proto.SerializeToString(&serialized);
|
| + AckHandle ack_handle(serialized);
|
| + if (ProtoConverter::IsAllObjectIdP(invalidation.object_id())) {
|
| + TLOG(logger_, INFO, "Issuing invalidate all");
|
| + GetListener()->InvalidateAll(this, ack_handle);
|
| + } else {
|
| + // Regular object. Could be unknown version or not.
|
| + Invalidation inv;
|
| + ProtoConverter::ConvertFromInvalidationProto(invalidation, &inv);
|
| + bool isSuppressed = invalidation.is_trickle_restart();
|
| + TLOG(logger_, INFO, "Issuing invalidate: %s",
|
| + ProtoHelpers::ToString(invalidation).c_str());
|
| +
|
| + // Issue invalidate if the invalidation had a known version AND either
|
| + // no suppression has occurred or the client allows suppression.
|
| + if (invalidation.is_known_version() &&
|
| + (!isSuppressed || config_.allow_suppression())) {
|
| + GetListener()->Invalidate(this, inv, ack_handle);
|
| + } else {
|
| + // Unknown version
|
| + GetListener()->InvalidateUnknownVersion(this,
|
| + inv.object_id(), ack_handle);
|
| + }
|
| + }
|
| + }
|
| +}
|
| +
|
| +void InvalidationClientCore::HandleRegistrationStatus(
|
| + const RepeatedPtrField<RegistrationStatus>& reg_status_list) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| +
|
| + vector<bool> local_processing_statuses;
|
| + registration_manager_.HandleRegistrationStatus(
|
| + reg_status_list, &local_processing_statuses);
|
| + CHECK(local_processing_statuses.size() ==
|
| + static_cast<size_t>(reg_status_list.size())) <<
|
| + "Not all registration statuses were processed";
|
| +
|
| + // Inform app about the success or failure of each registration based
|
| + // on what the registration manager has indicated.
|
| + for (int i = 0; i < reg_status_list.size(); ++i) {
|
| + const RegistrationStatus& reg_status = reg_status_list.Get(i);
|
| + bool was_success = local_processing_statuses[i];
|
| + TLOG(logger_, FINE, "Process reg status: %s",
|
| + ProtoHelpers::ToString(reg_status).c_str());
|
| +
|
| + ObjectId object_id;
|
| + ProtoConverter::ConvertFromObjectIdProto(
|
| + reg_status.registration().object_id(), &object_id);
|
| + if (was_success) {
|
| + // Server operation was both successful and agreed with what the client
|
| + // wanted.
|
| + RegistrationP::OpType reg_op_type = reg_status.registration().op_type();
|
| + InvalidationListener::RegistrationState reg_state =
|
| + ConvertOpTypeToRegState(reg_op_type);
|
| + GetListener()->InformRegistrationStatus(this, object_id, reg_state);
|
| + } else {
|
| + // Server operation either failed or disagreed with client's intent (e.g.,
|
| + // successful unregister, but the client wanted a registration).
|
| + string description =
|
| + (reg_status.status().code() == StatusP_Code_SUCCESS) ?
|
| + "Registration discrepancy detected" :
|
| + reg_status.status().description();
|
| + bool is_permanent =
|
| + (reg_status.status().code() == StatusP_Code_PERMANENT_FAILURE);
|
| + GetListener()->InformRegistrationFailure(
|
| + this, object_id, !is_permanent, description);
|
| + }
|
| + }
|
| +}
|
| +
|
| +void InvalidationClientCore::HandleRegistrationSyncRequest() {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + // Send all the registrations in the reg sync message.
|
| + // Generate a single subtree for all the registrations.
|
| + RegistrationSubtree subtree;
|
| + registration_manager_.GetRegistrations("", 0, &subtree);
|
| + protocol_handler_.SendRegistrationSyncSubtree(subtree, batching_task_.get());
|
| +}
|
| +
|
| +void InvalidationClientCore::HandleInfoMessage(
|
| + const RepeatedField<InfoRequestMessage_InfoType>& info_types) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + bool must_send_performance_counters = false;
|
| + for (int i = 0; i < info_types.size(); ++i) {
|
| + must_send_performance_counters =
|
| + (info_types.Get(i) ==
|
| + InfoRequestMessage_InfoType_GET_PERFORMANCE_COUNTERS);
|
| + if (must_send_performance_counters) {
|
| + break;
|
| + }
|
| + }
|
| + SendInfoMessageToServer(must_send_performance_counters,
|
| + !registration_manager_.IsStateInSyncWithServer());
|
| +}
|
| +
|
| +void InvalidationClientCore::HandleErrorMessage(
|
| + ErrorMessage::Code code,
|
| + const string& description) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| +
|
| + // If it is an auth failure, we shut down the ticl.
|
| + TLOG(logger_, SEVERE, "Received error message: %s, %s",
|
| + ProtoHelpers::ToString(code).c_str(),
|
| + description.c_str());
|
| +
|
| + // Translate the code to error reason.
|
| + int reason;
|
| + switch (code) {
|
| + case ErrorMessage_Code_AUTH_FAILURE:
|
| + reason = ErrorReason::AUTH_FAILURE;
|
| + break;
|
| + case ErrorMessage_Code_UNKNOWN_FAILURE:
|
| + reason = ErrorReason::UNKNOWN_FAILURE;
|
| + break;
|
| + default:
|
| + reason = ErrorReason::UNKNOWN_FAILURE;
|
| + break;
|
| + }
|
| + // Issue an informError to the application.
|
| + ErrorInfo error_info(reason, false, description, ErrorContext());
|
| + GetListener()->InformError(this, error_info);
|
| +
|
| + // If this is an auth failure, remove registrations and stop the Ticl.
|
| + // Otherwise do nothing.
|
| + if (code != ErrorMessage_Code_AUTH_FAILURE) {
|
| + return;
|
| + }
|
| +
|
| + // If there are any registrations, remove them and issue registration
|
| + // failure.
|
| + vector<ObjectIdP> desired_registrations;
|
| + registration_manager_.RemoveRegisteredObjects(&desired_registrations);
|
| + TLOG(logger_, WARNING, "Issuing failure for %d objects",
|
| + desired_registrations.size());
|
| + for (size_t i = 0; i < desired_registrations.size(); ++i) {
|
| + ObjectId object_id;
|
| + ProtoConverter::ConvertFromObjectIdProto(
|
| + desired_registrations[i], &object_id);
|
| + GetListener()->InformRegistrationFailure(
|
| + this, object_id, false, "Auth error");
|
| + }
|
| +}
|
| +
|
| +void InvalidationClientCore::HandleMessageSent() {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + last_message_send_time_ = internal_scheduler_->GetCurrentTime();
|
| +}
|
| +
|
| +void InvalidationClientCore::HandleNetworkStatusChange(bool is_online) {
|
| + // If we're back online and haven't sent a message to the server in a while,
|
| + // send a heartbeat to make sure the server knows we're online.
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + bool was_online = is_online_;
|
| + is_online_ = is_online;
|
| + if (is_online && !was_online &&
|
| + (internal_scheduler_->GetCurrentTime() >
|
| + last_message_send_time_ + TimeDelta::FromMilliseconds(
|
| + config_.offline_heartbeat_threshold_ms()))) {
|
| + TLOG(logger_, INFO,
|
| + "Sending heartbeat after reconnection; previous send was %s ms ago",
|
| + SimpleItoa(
|
| + (internal_scheduler_->GetCurrentTime() - last_message_send_time_)
|
| + .InMilliseconds()).c_str());
|
| + SendInfoMessageToServer(
|
| + false, !registration_manager_.IsStateInSyncWithServer());
|
| + }
|
| +}
|
| +
|
| +void InvalidationClientCore::GetRegistrationManagerStateAsSerializedProto(
|
| + string* result) {
|
| + RegistrationManagerStateP reg_state;
|
| + registration_manager_.GetClientSummary(reg_state.mutable_client_summary());
|
| + registration_manager_.GetServerSummary(reg_state.mutable_server_summary());
|
| + vector<ObjectIdP> registered_objects;
|
| + registration_manager_.GetRegisteredObjectsForTest(®istered_objects);
|
| + for (size_t i = 0; i < registered_objects.size(); ++i) {
|
| + reg_state.add_registered_objects()->CopyFrom(registered_objects[i]);
|
| + }
|
| + reg_state.SerializeToString(result);
|
| +}
|
| +
|
| +void InvalidationClientCore::GetStatisticsAsSerializedProto(
|
| + string* result) {
|
| + vector<pair<string, int> > properties;
|
| + statistics_->GetNonZeroStatistics(&properties);
|
| + InfoMessage info_message;
|
| + for (size_t i = 0; i < properties.size(); ++i) {
|
| + PropertyRecord* record = info_message.add_performance_counter();
|
| + record->set_name(properties[i].first);
|
| + record->set_value(properties[i].second);
|
| + }
|
| + info_message.SerializeToString(result);
|
| +}
|
| +
|
| +void InvalidationClientCore::HandleIncomingHeader(
|
| + const ServerMessageHeader& header) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + CHECK(nonce_.empty()) <<
|
| + "Cannot process server header " << header.ToString() <<
|
| + " with non-empty nonce " << nonce_;
|
| +
|
| + if (header.registration_summary() != NULL) {
|
| + // We've received a summary from the server, so if we were suppressing
|
| + // registrations, we should now allow them to go to the registrar.
|
| + should_send_registrations_ = true;
|
| +
|
| +
|
| + // Pass the registration summary to the registration manager. If we are now
|
| + // in agreement with the server and we had any pending operations, we can
|
| + // tell the listener that those operations have succeeded.
|
| + vector<RegistrationP> upcalls;
|
| + registration_manager_.InformServerRegistrationSummary(
|
| + *header.registration_summary(), &upcalls);
|
| + TLOG(logger_, FINE,
|
| + "Receivced new server registration summary (%s); will make %d upcalls",
|
| + ProtoHelpers::ToString(*header.registration_summary()).c_str(),
|
| + upcalls.size());
|
| + vector<RegistrationP>::iterator iter;
|
| + for (iter = upcalls.begin(); iter != upcalls.end(); iter++) {
|
| + const RegistrationP& registration = *iter;
|
| + ObjectId object_id;
|
| + ProtoConverter::ConvertFromObjectIdProto(registration.object_id(),
|
| + &object_id);
|
| + InvalidationListener::RegistrationState reg_state =
|
| + ConvertOpTypeToRegState(registration.op_type());
|
| + GetListener()->InformRegistrationStatus(this, object_id, reg_state);
|
| + }
|
| + }
|
| +}
|
| +
|
| +bool InvalidationClientCore::ValidateToken(const string& server_token) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + if (!client_token_.empty()) {
|
| + // Client token case.
|
| + if (client_token_ != server_token) {
|
| + TLOG(logger_, INFO, "Incoming message has bad token: %s, %s",
|
| + ProtoHelpers::ToString(client_token_).c_str(),
|
| + ProtoHelpers::ToString(server_token).c_str());
|
| + statistics_->RecordError(Statistics::ClientErrorType_TOKEN_MISMATCH);
|
| + return false;
|
| + }
|
| + return true;
|
| + } else if (!nonce_.empty()) {
|
| + // Nonce case.
|
| + CHECK(!nonce_.empty()) << "Client token and nonce are both empty: "
|
| + << client_token_ << ", " << nonce_;
|
| + if (nonce_ != server_token) {
|
| + statistics_->RecordError(Statistics::ClientErrorType_NONCE_MISMATCH);
|
| + TLOG(logger_, INFO,
|
| + "Rejecting server message with mismatched nonce: Client = %s, "
|
| + "Server = %s", ProtoHelpers::ToString(nonce_).c_str(),
|
| + ProtoHelpers::ToString(server_token).c_str());
|
| + return false;
|
| + } else {
|
| + TLOG(logger_, INFO,
|
| + "Accepting server message with matching nonce: %s",
|
| + ProtoHelpers::ToString(nonce_).c_str());
|
| + return true;
|
| + }
|
| + }
|
| + // Neither token nor nonce; ignore message.
|
| + return false;
|
| +}
|
| +
|
| +void InvalidationClientCore::SendInfoMessageToServer(
|
| + bool must_send_performance_counters, bool request_server_summary) {
|
| + TLOG(logger_, INFO,
|
| + "Sending info message to server; request server summary = %s",
|
| + request_server_summary ? "true" : "false");
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| +
|
| + // Make sure that you have the latest registration summary.
|
| + vector<pair<string, int> > performance_counters;
|
| + ClientConfigP* config_to_send = NULL;
|
| + if (must_send_performance_counters) {
|
| + statistics_->GetNonZeroStatistics(&performance_counters);
|
| + config_to_send = &config_;
|
| + }
|
| + protocol_handler_.SendInfoMessage(performance_counters, config_to_send,
|
| + request_server_summary, batching_task_.get());
|
| +}
|
| +
|
| +string InvalidationClientCore::GenerateNonce(Random* random) {
|
| + // Return a nonce computed by converting a random 64-bit number to a string.
|
| + return SimpleItoa(static_cast<int64>(random->RandUint64()));
|
| +}
|
| +
|
| +void InvalidationClientCore::set_nonce(const string& new_nonce) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + CHECK(new_nonce.empty() || client_token_.empty()) <<
|
| + "Tried to set nonce with existing token " << client_token_;
|
| + nonce_ = new_nonce;
|
| +}
|
| +
|
| +void InvalidationClientCore::set_client_token(const string& new_client_token) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + CHECK(new_client_token.empty() || nonce_.empty()) <<
|
| + "Tried to set token with existing nonce " << nonce_;
|
| +
|
| + // If the ticl has not been started and we are getting a new token (either
|
| + // from persistence or from the server, start the ticl and inform the
|
| + // application.
|
| + bool finish_starting_ticl = !ticl_state_.IsStarted() &&
|
| + client_token_.empty() && !new_client_token.empty();
|
| + client_token_ = new_client_token;
|
| +
|
| + if (finish_starting_ticl) {
|
| + FinishStartingTiclAndInformListener();
|
| + }
|
| +}
|
| +
|
| +void InvalidationClientCore::FinishStartingTiclAndInformListener() {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + CHECK(!ticl_state_.IsStarted());
|
| +
|
| + ticl_state_.Start();
|
| + GetListener()->Ready(this);
|
| +
|
| + // We are not currently persisting our registration digest, so regardless of
|
| + // whether or not we are restarting from persistent state, we need to query
|
| + // the application for all of its registrations.
|
| + GetListener()->ReissueRegistrations(this,
|
| + RegistrationManager::kEmptyPrefix, 0);
|
| + TLOG(logger_, INFO, "Ticl started: %s", ToString().c_str());
|
| +}
|
| +
|
| +void InvalidationClientCore::ScheduleStartAfterReadingStateBlob() {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + storage_->ReadKey(kClientTokenKey,
|
| + NewPermanentCallback(this, &InvalidationClientCore::ReadCallback));
|
| +}
|
| +
|
| +void InvalidationClientCore::ReadCallback(
|
| + pair<Status, string> read_result) {
|
| + string serialized_state;
|
| + if (read_result.first.IsSuccess()) {
|
| + serialized_state = read_result.second;
|
| + } else {
|
| + statistics_->RecordError(
|
| + Statistics::ClientErrorType_PERSISTENT_READ_FAILURE);
|
| + TLOG(logger_, WARNING, "Could not read state blob: %s",
|
| + read_result.first.message().c_str());
|
| + }
|
| + // Call start now.
|
| + internal_scheduler_->Schedule(
|
| + Scheduler::NoDelay(),
|
| + NewPermanentCallback(
|
| + this, &InvalidationClientCore::StartInternal, serialized_state));
|
| +}
|
| +
|
| +ExponentialBackoffDelayGenerator*
|
| +InvalidationClientCore::CreateExpBackOffGenerator(
|
| + const TimeDelta& initial_delay) {
|
| + return new ExponentialBackoffDelayGenerator(random_.get(), initial_delay,
|
| + config_.max_exponential_backoff_factor());
|
| +}
|
| +
|
| +InvalidationListener::RegistrationState
|
| +InvalidationClientCore::ConvertOpTypeToRegState(RegistrationP::OpType
|
| + reg_op_type) {
|
| + InvalidationListener::RegistrationState reg_state =
|
| + reg_op_type == RegistrationP_OpType_REGISTER ?
|
| + InvalidationListener::REGISTERED :
|
| + InvalidationListener::UNREGISTERED;
|
| + return reg_state;
|
| +}
|
| +
|
| +void InvalidationClientCore::MessageReceiver(string message) {
|
| + internal_scheduler_->Schedule(Scheduler::NoDelay(), NewPermanentCallback(
|
| + this,
|
| + &InvalidationClientCore::HandleIncomingMessage, message));
|
| +}
|
| +
|
| +void InvalidationClientCore::NetworkStatusReceiver(bool status) {
|
| + internal_scheduler_->Schedule(Scheduler::NoDelay(), NewPermanentCallback(
|
| + this, &InvalidationClientCore::HandleNetworkStatusChange, status));
|
| +}
|
| +
|
| +
|
| +void InvalidationClientCore::ChangeNetworkTimeoutDelayForTest(
|
| + const TimeDelta& delay) {
|
| + config_.set_network_timeout_delay_ms(delay.InMilliseconds());
|
| + CreateSchedulingTasks();
|
| +}
|
| +
|
| +void InvalidationClientCore::ChangeHeartbeatDelayForTest(
|
| + const TimeDelta& delay) {
|
| + config_.set_heartbeat_interval_ms(delay.InMilliseconds());
|
| + CreateSchedulingTasks();
|
| +}
|
| +
|
| +} // namespace invalidation
|
|
|