| Index: third_party/cacheinvalidation/src/google/cacheinvalidation/impl/protocol-handler.cc
|
| diff --git a/third_party/cacheinvalidation/src/google/cacheinvalidation/impl/protocol-handler.cc b/third_party/cacheinvalidation/src/google/cacheinvalidation/impl/protocol-handler.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..2f233b50973e41804b53a57bc2d0cb677c6bb7a7
|
| --- /dev/null
|
| +++ b/third_party/cacheinvalidation/src/google/cacheinvalidation/impl/protocol-handler.cc
|
| @@ -0,0 +1,442 @@
|
| +// Copyright 2012 Google Inc.
|
| +//
|
| +// Licensed under the Apache License, Version 2.0 (the "License");
|
| +// you may not use this file except in compliance with the License.
|
| +// You may obtain a copy of the License at
|
| +//
|
| +// http://www.apache.org/licenses/LICENSE-2.0
|
| +//
|
| +// Unless required by applicable law or agreed to in writing, software
|
| +// distributed under the License is distributed on an "AS IS" BASIS,
|
| +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +// See the License for the specific language governing permissions and
|
| +// limitations under the License.
|
| +
|
| +// Client for interacting with low-level protocol messages.
|
| +
|
| +#include "google/cacheinvalidation/impl/protocol-handler.h"
|
| +
|
| +#include "google/cacheinvalidation/deps/string_util.h"
|
| +#include "google/cacheinvalidation/impl/constants.h"
|
| +#include "google/cacheinvalidation/impl/invalidation-client-core.h"
|
| +#include "google/cacheinvalidation/impl/log-macro.h"
|
| +#include "google/cacheinvalidation/impl/proto-helpers.h"
|
| +#include "google/cacheinvalidation/impl/recurring-task.h"
|
| +
|
| +namespace invalidation {
|
| +
|
| +using ::ipc::invalidation::ConfigChangeMessage;
|
| +using ::ipc::invalidation::InfoMessage;
|
| +using ::ipc::invalidation::InitializeMessage;
|
| +using ::ipc::invalidation::InitializeMessage_DigestSerializationType_BYTE_BASED;
|
| +using ::ipc::invalidation::InvalidationMessage;
|
| +using ::ipc::invalidation::PropertyRecord;
|
| +using ::ipc::invalidation::RegistrationMessage;
|
| +using ::ipc::invalidation::RegistrationSyncMessage;
|
| +using ::ipc::invalidation::ServerHeader;
|
| +using ::ipc::invalidation::ServerToClientMessage;
|
| +using ::ipc::invalidation::TokenControlMessage;
|
| +
|
| +string ServerMessageHeader::ToString() const {
|
| + return StringPrintf(
|
| + "Token: %s, Summary: %s", ProtoHelpers::ToString(*token_).c_str(),
|
| + ProtoHelpers::ToString(*registration_summary_).c_str());
|
| +}
|
| +
|
| +void ParsedMessage::InitFrom(const ServerToClientMessage& raw_message) {
|
| + base_message = raw_message; // Does a deep copy.
|
| +
|
| + // For each field, assign it to the corresponding protobuf field if
|
| + // present, else NULL.
|
| + header.InitFrom(&base_message.header().client_token(),
|
| + base_message.header().has_registration_summary() ?
|
| + &base_message.header().registration_summary() : NULL);
|
| +
|
| + token_control_message = base_message.has_token_control_message() ?
|
| + &base_message.token_control_message() : NULL;
|
| +
|
| + invalidation_message = base_message.has_invalidation_message() ?
|
| + &base_message.invalidation_message() : NULL;
|
| +
|
| + registration_status_message =
|
| + base_message.has_registration_status_message() ?
|
| + &base_message.registration_status_message() : NULL;
|
| +
|
| + registration_sync_request_message =
|
| + base_message.has_registration_sync_request_message() ?
|
| + &base_message.registration_sync_request_message() : NULL;
|
| +
|
| + config_change_message = base_message.has_config_change_message() ?
|
| + &base_message.config_change_message() : NULL;
|
| +
|
| + info_request_message = base_message.has_info_request_message() ?
|
| + &base_message.info_request_message() : NULL;
|
| +
|
| + error_message = base_message.has_error_message() ?
|
| + &base_message.error_message() : NULL;
|
| +}
|
| +
|
| +ProtocolHandler::ProtocolHandler(
|
| + const ProtocolHandlerConfigP& config, SystemResources* resources,
|
| + Smearer* smearer, Statistics* statistics, int client_type,
|
| + const string& application_name, ProtocolListener* listener,
|
| + TiclMessageValidator* msg_validator)
|
| + : logger_(resources->logger()),
|
| + internal_scheduler_(resources->internal_scheduler()),
|
| + network_(resources->network()),
|
| + throttle_(config.rate_limit(), internal_scheduler_,
|
| + NewPermanentCallback(this, &ProtocolHandler::SendMessageToServer)),
|
| + listener_(listener),
|
| + msg_validator_(msg_validator),
|
| + message_id_(1),
|
| + last_known_server_time_ms_(0),
|
| + next_message_send_time_ms_(0),
|
| + statistics_(statistics),
|
| + batcher_(resources->logger(), statistics),
|
| + client_type_(client_type) {
|
| + // Initialize client version.
|
| + ProtoHelpers::InitClientVersion(resources->platform(), application_name,
|
| + &client_version_);
|
| +}
|
| +
|
| +void ProtocolHandler::InitConfig(ProtocolHandlerConfigP* config) {
|
| + // Add rate limits.
|
| +
|
| + // Allow at most 3 messages every 5 seconds.
|
| + int window_ms = 5 * 1000;
|
| + int num_messages_per_window = 3;
|
| +
|
| + ProtoHelpers::InitRateLimitP(window_ms, num_messages_per_window,
|
| + config->add_rate_limit());
|
| +}
|
| +
|
| +void ProtocolHandler::InitConfigForTest(ProtocolHandlerConfigP* config) {
|
| + // No rate limits.
|
| + int small_batch_delay_for_test = 200;
|
| + config->set_batching_delay_ms(small_batch_delay_for_test);
|
| +
|
| + // At most one message per second.
|
| + ProtoHelpers::InitRateLimitP(1000, 1, config->add_rate_limit());
|
| + // At most six messages per minute.
|
| + ProtoHelpers::InitRateLimitP(60 * 1000, 6, config->add_rate_limit());
|
| +}
|
| +
|
| +bool ProtocolHandler::HandleIncomingMessage(const string& incoming_message,
|
| + ParsedMessage* parsed_message) {
|
| + ServerToClientMessage message;
|
| + message.ParseFromString(incoming_message);
|
| + if (!message.IsInitialized()) {
|
| + TLOG(logger_, WARNING, "Incoming message is unparseable: %s",
|
| + ProtoHelpers::ToString(incoming_message).c_str());
|
| + return false;
|
| + }
|
| +
|
| + // Validate the message. If this passes, we can blindly assume valid messages
|
| + // from here on.
|
| + TLOG(logger_, FINE, "Incoming message: %s",
|
| + ProtoHelpers::ToString(message).c_str());
|
| +
|
| + if (!msg_validator_->IsValid(message)) {
|
| + statistics_->RecordError(
|
| + Statistics::ClientErrorType_INCOMING_MESSAGE_FAILURE);
|
| + TLOG(logger_, SEVERE, "Received invalid message: %s",
|
| + ProtoHelpers::ToString(message).c_str());
|
| + return false;
|
| + }
|
| +
|
| + // Check the version of the message.
|
| + const ServerHeader& message_header = message.header();
|
| + if (message_header.protocol_version().version().major_version() !=
|
| + Constants::kProtocolMajorVersion) {
|
| + statistics_->RecordError(
|
| + Statistics::ClientErrorType_PROTOCOL_VERSION_FAILURE);
|
| + TLOG(logger_, SEVERE, "Dropping message with incompatible version: %s",
|
| + ProtoHelpers::ToString(message).c_str());
|
| + return false;
|
| + }
|
| +
|
| + // Check if it is a ConfigChangeMessage which indicates that messages should
|
| + // no longer be sent for a certain duration. Perform this check before the
|
| + // token is even checked.
|
| + if (message.has_config_change_message()) {
|
| + const ConfigChangeMessage& config_change_msg =
|
| + message.config_change_message();
|
| + statistics_->RecordReceivedMessage(
|
| + Statistics::ReceivedMessageType_CONFIG_CHANGE);
|
| + if (config_change_msg.has_next_message_delay_ms()) {
|
| + // Validator has ensured that it is positive.
|
| + next_message_send_time_ms_ = GetCurrentTimeMs() +
|
| + config_change_msg.next_message_delay_ms();
|
| + }
|
| + return false; // Ignore all other messages in the envelope.
|
| + }
|
| +
|
| + if (message_header.server_time_ms() > last_known_server_time_ms_) {
|
| + last_known_server_time_ms_ = message_header.server_time_ms();
|
| + }
|
| + parsed_message->InitFrom(message);
|
| + return true;
|
| +}
|
| +
|
| +bool ProtocolHandler::CheckServerToken(const string& server_token) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + const string& client_token = listener_->GetClientToken();
|
| +
|
| + // If we do not have a client token yet, there is nothing to compare. The
|
| + // message must have an initialize message and the upper layer will do the
|
| + // appropriate checks. Hence, we return true if client_token is empty.
|
| + if (client_token.empty()) {
|
| + // No token. Return true so that we'll attempt to deliver a token control
|
| + // message (if any) to the listener in handleIncomingMessage.
|
| + return true;
|
| + }
|
| +
|
| + if (client_token != server_token) {
|
| + // Bad token - reject whole message. However, our channel can send us
|
| + // messages intended for other clients belonging to the same user, so don't
|
| + // log too loudly.
|
| + TLOG(logger_, INFO, "Incoming message has bad token: %s, %s",
|
| + ProtoHelpers::ToString(client_token).c_str(),
|
| + ProtoHelpers::ToString(server_token).c_str());
|
| + statistics_->RecordError(Statistics::ClientErrorType_TOKEN_MISMATCH);
|
| + return false;
|
| + }
|
| + return true;
|
| +}
|
| +
|
| +void ProtocolHandler::SendInitializeMessage(
|
| + const ApplicationClientIdP& application_client_id,
|
| + const string& nonce,
|
| + BatchingTask* batching_task,
|
| + const string& debug_string) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| +
|
| + if (application_client_id.client_type() != client_type_) {
|
| + // This condition is not fatal, but it probably represents a bug somewhere
|
| + // if it occurs.
|
| + TLOG(logger_, WARNING, "Client type in application id does not match "
|
| + "constructor-provided type: %s vs %s",
|
| + ProtoHelpers::ToString(application_client_id).c_str(), client_type_);
|
| + }
|
| +
|
| + // Simply store the message in pending_initialize_message_ and send it
|
| + // when the batching task runs.
|
| + InitializeMessage* message = new InitializeMessage();
|
| + ProtoHelpers::InitInitializeMessage(application_client_id, nonce, message);
|
| + TLOG(logger_, INFO, "Batching initialize message for client: %s, %s",
|
| + debug_string.c_str(),
|
| + ProtoHelpers::ToString(*message).c_str());
|
| + batcher_.SetInitializeMessage(message);
|
| + batching_task->EnsureScheduled(debug_string);
|
| +}
|
| +
|
| +void ProtocolHandler::SendInfoMessage(
|
| + const vector<pair<string, int> >& performance_counters,
|
| + ClientConfigP* client_config,
|
| + bool request_server_registration_summary,
|
| + BatchingTask* batching_task) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| +
|
| + // Simply store the message in pending_info_message_ and send it
|
| + // when the batching task runs.
|
| + InfoMessage* message = new InfoMessage();
|
| + message->mutable_client_version()->CopyFrom(client_version_);
|
| +
|
| + // Add configuration parameters.
|
| + if (client_config != NULL) {
|
| + message->mutable_client_config()->CopyFrom(*client_config);
|
| + }
|
| +
|
| + // Add performance counters.
|
| + for (size_t i = 0; i < performance_counters.size(); ++i) {
|
| + PropertyRecord* counter = message->add_performance_counter();
|
| + counter->set_name(performance_counters[i].first);
|
| + counter->set_value(performance_counters[i].second);
|
| + }
|
| +
|
| + // Indicate whether we want the server's registration summary sent back.
|
| + message->set_server_registration_summary_requested(
|
| + request_server_registration_summary);
|
| +
|
| + TLOG(logger_, INFO, "Batching info message for client: %s",
|
| + ProtoHelpers::ToString(*message).c_str());
|
| + batcher_.SetInfoMessage(message);
|
| + batching_task->EnsureScheduled("Send-info");
|
| +}
|
| +
|
| +void ProtocolHandler::SendRegistrations(
|
| + const vector<ObjectIdP>& object_ids,
|
| + RegistrationP::OpType reg_op_type,
|
| + BatchingTask* batching_task) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + for (size_t i = 0; i < object_ids.size(); ++i) {
|
| + batcher_.AddRegistration(object_ids[i], reg_op_type);
|
| + }
|
| + batching_task->EnsureScheduled("Send-registrations");
|
| +}
|
| +
|
| +void ProtocolHandler::SendInvalidationAck(const InvalidationP& invalidation,
|
| + BatchingTask* batching_task) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + // We could summarize acks if there are suppressing invalidations - we don't
|
| + // since it is unlikely to be too beneficial here.
|
| + batcher_.AddAck(invalidation);
|
| + batching_task->EnsureScheduled("Send-ack");
|
| +}
|
| +
|
| +void ProtocolHandler::SendRegistrationSyncSubtree(
|
| + const RegistrationSubtree& reg_subtree,
|
| + BatchingTask* batching_task) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + TLOG(logger_, INFO, "Adding subtree: %s",
|
| + ProtoHelpers::ToString(reg_subtree).c_str());
|
| + batcher_.AddRegSubtree(reg_subtree);
|
| + batching_task->EnsureScheduled("Send-reg-sync");
|
| +}
|
| +
|
| +void ProtocolHandler::SendMessageToServer() {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| +
|
| + if (next_message_send_time_ms_ > GetCurrentTimeMs()) {
|
| + TLOG(logger_, WARNING, "In quiet period: not sending message to server: "
|
| + "%s > %s",
|
| + SimpleItoa(next_message_send_time_ms_).c_str(),
|
| + SimpleItoa(GetCurrentTimeMs()).c_str());
|
| + return;
|
| + }
|
| +
|
| + const bool has_client_token(!listener_->GetClientToken().empty());
|
| + ClientToServerMessage builder;
|
| + if (!batcher_.ToBuilder(&builder, has_client_token)) {
|
| + TLOG(logger_, WARNING, "Unable to build message");
|
| + return;
|
| + }
|
| + ClientHeader* outgoing_header = builder.mutable_header();
|
| + InitClientHeader(outgoing_header);
|
| +
|
| + // Validate the message and send it.
|
| + ++message_id_;
|
| + if (!msg_validator_->IsValid(builder)) {
|
| + TLOG(logger_, SEVERE, "Tried to send invalid message: %s",
|
| + ProtoHelpers::ToString(builder).c_str());
|
| + statistics_->RecordError(
|
| + Statistics::ClientErrorType_OUTGOING_MESSAGE_FAILURE);
|
| + return;
|
| + }
|
| +
|
| + TLOG(logger_, FINE, "Sending message to server: %s",
|
| + ProtoHelpers::ToString(builder).c_str());
|
| + statistics_->RecordSentMessage(Statistics::SentMessageType_TOTAL);
|
| + string serialized;
|
| + builder.SerializeToString(&serialized);
|
| + network_->SendMessage(serialized);
|
| +
|
| + // Record that the message was sent. We do this inline to match what the
|
| + // Java Ticl, which is constrained by Android requirements, does.
|
| + listener_->HandleMessageSent();
|
| +}
|
| +
|
| +void ProtocolHandler::InitClientHeader(ClientHeader* builder) {
|
| + CHECK(internal_scheduler_->IsRunningOnThread()) << "Not on internal thread";
|
| + ProtoHelpers::InitProtocolVersion(builder->mutable_protocol_version());
|
| + builder->set_client_time_ms(GetCurrentTimeMs());
|
| + builder->set_message_id(StringPrintf("%d", message_id_));
|
| + builder->set_max_known_server_time_ms(last_known_server_time_ms_);
|
| + builder->set_client_type(client_type_);
|
| + listener_->GetRegistrationSummary(builder->mutable_registration_summary());
|
| + const string& client_token = listener_->GetClientToken();
|
| + if (!client_token.empty()) {
|
| + TLOG(logger_, FINE, "Sending token on client->server message: %s",
|
| + ProtoHelpers::ToString(client_token).c_str());
|
| + builder->set_client_token(client_token);
|
| + }
|
| +}
|
| +
|
| +bool Batcher::ToBuilder(ClientToServerMessage* builder, bool has_client_token) {
|
| + // Check if an initialize message needs to be sent.
|
| + if (pending_initialize_message_.get() != NULL) {
|
| + statistics_->RecordSentMessage(Statistics::SentMessageType_INITIALIZE);
|
| + builder->mutable_initialize_message()->CopyFrom(
|
| + *pending_initialize_message_);
|
| + pending_initialize_message_.reset();
|
| + }
|
| +
|
| + // Note: Even if an initialize message is being sent, we can send additional
|
| + // messages such as registration messages, etc to the server. But if there is
|
| + // no token and an initialize message is not being sent, we cannot send any
|
| + // other message.
|
| +
|
| + if (!has_client_token && !builder->has_initialize_message()) {
|
| + // Cannot send any message.
|
| + TLOG(logger_, WARNING,
|
| + "Cannot send message since no token and no initialize msg: %s",
|
| + ProtoHelpers::ToString(*builder).c_str());
|
| + statistics_->RecordError(Statistics::ClientErrorType_TOKEN_MISSING_FAILURE);
|
| + return false;
|
| + }
|
| +
|
| + // Check for pending batched operations and add to message builder if needed.
|
| +
|
| + // Add reg, acks, reg subtrees - clear them after adding.
|
| + if (!pending_acked_invalidations_.empty()) {
|
| + InitAckMessage(builder->mutable_invalidation_ack_message());
|
| + statistics_->RecordSentMessage(
|
| + Statistics::SentMessageType_INVALIDATION_ACK);
|
| + }
|
| +
|
| + // Check regs.
|
| + if (!pending_registrations_.empty()) {
|
| + InitRegistrationMessage(builder->mutable_registration_message());
|
| + statistics_->RecordSentMessage(Statistics::SentMessageType_REGISTRATION);
|
| + }
|
| +
|
| + // Check reg substrees.
|
| + if (!pending_reg_subtrees_.empty()) {
|
| + RegistrationSyncMessage* sync_message =
|
| + builder->mutable_registration_sync_message();
|
| + set<RegistrationSubtree, ProtoCompareLess>::const_iterator iter;
|
| + for (iter = pending_reg_subtrees_.begin();
|
| + iter != pending_reg_subtrees_.end(); ++iter) {
|
| + sync_message->add_subtree()->CopyFrom(*iter);
|
| + }
|
| + pending_reg_subtrees_.clear();
|
| + statistics_->RecordSentMessage(
|
| + Statistics::SentMessageType_REGISTRATION_SYNC);
|
| + }
|
| +
|
| + // Check info message.
|
| + if (pending_info_message_.get() != NULL) {
|
| + statistics_->RecordSentMessage(Statistics::SentMessageType_INFO);
|
| + builder->mutable_info_message()->CopyFrom(*pending_info_message_);
|
| + pending_info_message_.reset();
|
| + }
|
| + return true;
|
| +}
|
| +
|
| +void Batcher::InitRegistrationMessage(
|
| + RegistrationMessage* reg_message) {
|
| + CHECK(!pending_registrations_.empty());
|
| +
|
| + // Run through the pending_registrations map.
|
| + map<ObjectIdP, RegistrationP::OpType, ProtoCompareLess>::iterator iter;
|
| + for (iter = pending_registrations_.begin();
|
| + iter != pending_registrations_.end(); ++iter) {
|
| + ProtoHelpers::InitRegistrationP(iter->first, iter->second,
|
| + reg_message->add_registration());
|
| + }
|
| + pending_registrations_.clear();
|
| +}
|
| +
|
| +void Batcher::InitAckMessage(InvalidationMessage* ack_message) {
|
| + CHECK(!pending_acked_invalidations_.empty());
|
| +
|
| + // Run through pending_acked_invalidations_ set.
|
| + set<InvalidationP, ProtoCompareLess>::iterator iter;
|
| + for (iter = pending_acked_invalidations_.begin();
|
| + iter != pending_acked_invalidations_.end(); iter++) {
|
| + ack_message->add_invalidation()->CopyFrom(*iter);
|
| + }
|
| + pending_acked_invalidations_.clear();
|
| +}
|
| +
|
| +} // namespace invalidation
|
|
|