| Index: third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/ProtocolHandler.java
|
| diff --git a/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/ProtocolHandler.java b/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/ProtocolHandler.java
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..131de39df883e1a4f1b15539876d62c12c90d18d
|
| --- /dev/null
|
| +++ b/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/ProtocolHandler.java
|
| @@ -0,0 +1,661 @@
|
| +/*
|
| + * Copyright 2011 Google Inc.
|
| + *
|
| + * Licensed under the Apache License, Version 2.0 (the "License");
|
| + * you may not use this file except in compliance with the License.
|
| + * You may obtain a copy of the License at
|
| + *
|
| + * http://www.apache.org/licenses/LICENSE-2.0
|
| + *
|
| + * Unless required by applicable law or agreed to in writing, software
|
| + * distributed under the License is distributed on an "AS IS" BASIS,
|
| + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| + * See the License for the specific language governing permissions and
|
| + * limitations under the License.
|
| + */
|
| +
|
| +package com.google.ipc.invalidation.ticl;
|
| +
|
| +import com.google.ipc.invalidation.external.client.SystemResources;
|
| +import com.google.ipc.invalidation.external.client.SystemResources.Logger;
|
| +import com.google.ipc.invalidation.external.client.SystemResources.NetworkChannel;
|
| +import com.google.ipc.invalidation.external.client.SystemResources.Scheduler;
|
| +import com.google.ipc.invalidation.external.client.types.SimplePair;
|
| +import com.google.ipc.invalidation.ticl.InvalidationClientCore.BatchingTask;
|
| +import com.google.ipc.invalidation.ticl.Statistics.ClientErrorType;
|
| +import com.google.ipc.invalidation.ticl.Statistics.ReceivedMessageType;
|
| +import com.google.ipc.invalidation.ticl.Statistics.SentMessageType;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientConstants;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ApplicationClientIdP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientConfigP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientHeader;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientToServerMessage;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientVersion;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ConfigChangeMessage;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ErrorMessage;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoMessage;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoRequestMessage;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InitializeMessage;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InitializeMessage.DigestSerializationType;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationMessage;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ObjectIdP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.PropertyRecord;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ProtocolHandlerConfigP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RateLimitP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationMessage;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP.OpType;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationStatusMessage;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSubtree;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSummary;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSyncMessage;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSyncRequestMessage;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ServerHeader;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ServerToClientMessage;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.TokenControlMessage;
|
| +import com.google.ipc.invalidation.ticl.proto.CommonProtos;
|
| +import com.google.ipc.invalidation.ticl.proto.JavaClient.BatcherState;
|
| +import com.google.ipc.invalidation.ticl.proto.JavaClient.ProtocolHandlerState;
|
| +import com.google.ipc.invalidation.util.Bytes;
|
| +import com.google.ipc.invalidation.util.InternalBase;
|
| +import com.google.ipc.invalidation.util.Marshallable;
|
| +import com.google.ipc.invalidation.util.Preconditions;
|
| +import com.google.ipc.invalidation.util.ProtoWrapper;
|
| +import com.google.ipc.invalidation.util.ProtoWrapper.ValidationException;
|
| +import com.google.ipc.invalidation.util.Smearer;
|
| +import com.google.ipc.invalidation.util.TextBuilder;
|
| +
|
| +import java.util.ArrayList;
|
| +import java.util.Collection;
|
| +import java.util.HashMap;
|
| +import java.util.HashSet;
|
| +import java.util.List;
|
| +import java.util.Map;
|
| +import java.util.Set;
|
| +
|
| +
|
| +/**
|
| + * A layer for interacting with low-level protocol messages. Parses messages from the server and
|
| + * calls appropriate functions on the {@code ProtocolListener} to handle various types of message
|
| + * content. Also buffers message data from the client and constructs and sends messages to the
|
| + * server.
|
| + * <p>
|
| + * This class implements {@link Marshallable}, so its state can be written to a protocol buffer,
|
| + * and instances can be restored from such protocol buffers. Additionally, the nested class
|
| + * {@link Batcher} also implements {@code Marshallable} for the same reason.
|
| + * <p>
|
| + * Note that while we talk about "marshalling," in this context we mean marshalling to protocol
|
| + * buffers, not raw bytes.
|
| + *
|
| + */
|
| +class ProtocolHandler implements Marshallable<ProtocolHandlerState> {
|
| + /** Class that batches messages to the server. */
|
| + private static class Batcher implements Marshallable<BatcherState> {
|
| + /** Statistics to be updated when messages are created. */
|
| + private final Statistics statistics;
|
| +
|
| + /** Resources used for logging and thread assertions. */
|
| + private final SystemResources resources;
|
| +
|
| + /** Set of pending registrations stored as a map for overriding later operations. */
|
| + private final Map<ObjectIdP, Integer> pendingRegistrations = new HashMap<ObjectIdP, Integer>();
|
| +
|
| + /** Set of pending invalidation acks. */
|
| + private final Set<InvalidationP> pendingAckedInvalidations = new HashSet<InvalidationP>();
|
| +
|
| + /** Set of pending registration sub trees for registration sync. */
|
| + private final Set<RegistrationSubtree> pendingRegSubtrees = new HashSet<RegistrationSubtree>();
|
| +
|
| + /** Pending initialization message to send to the server, if any. */
|
| + private InitializeMessage pendingInitializeMessage = null;
|
| +
|
| + /** Pending info message to send to the server, if any. */
|
| + private InfoMessage pendingInfoMessage = null;
|
| +
|
| + /** Creates a batcher. */
|
| + Batcher(SystemResources resources, Statistics statistics) {
|
| + this.resources = resources;
|
| + this.statistics = statistics;
|
| + }
|
| +
|
| + /** Creates a batcher from {@code marshalledState}. */
|
| + Batcher(SystemResources resources, Statistics statistics, BatcherState marshalledState) {
|
| + this(resources, statistics);
|
| + for (ObjectIdP registration : marshalledState.getRegistration()) {
|
| + pendingRegistrations.put(registration, RegistrationP.OpType.REGISTER);
|
| + }
|
| + for (ObjectIdP unregistration : marshalledState.getUnregistration()) {
|
| + pendingRegistrations.put(unregistration, RegistrationP.OpType.UNREGISTER);
|
| + }
|
| + for (InvalidationP ack : marshalledState.getAcknowledgement()) {
|
| + pendingAckedInvalidations.add(ack);
|
| + }
|
| + for (RegistrationSubtree subtree : marshalledState.getRegistrationSubtree()) {
|
| + pendingRegSubtrees.add(subtree);
|
| + }
|
| + pendingInitializeMessage = marshalledState.getNullableInitializeMessage();
|
| + if (marshalledState.hasInfoMessage()) {
|
| + pendingInfoMessage = marshalledState.getInfoMessage();
|
| + }
|
| + }
|
| +
|
| + /** Sets the initialize message to be sent. */
|
| + void setInitializeMessage(InitializeMessage msg) {
|
| + pendingInitializeMessage = msg;
|
| + }
|
| +
|
| + /** Sets the info message to be sent. */
|
| + void setInfoMessage(InfoMessage msg) {
|
| + pendingInfoMessage = msg;
|
| + }
|
| +
|
| + /** Adds a registration on {@code oid} of {@code opType} to the registrations to be sent. */
|
| + void addRegistration(ObjectIdP oid, Integer opType) {
|
| + pendingRegistrations.put(oid, opType);
|
| + }
|
| +
|
| + /** Adds {@code ack} to the set of acknowledgements to be sent. */
|
| + void addAck(InvalidationP ack) {
|
| + pendingAckedInvalidations.add(ack);
|
| + }
|
| +
|
| + /** Adds {@code subtree} to the set of registration subtrees to be sent. */
|
| + void addRegSubtree(RegistrationSubtree subtree) {
|
| + pendingRegSubtrees.add(subtree);
|
| + }
|
| +
|
| + /**
|
| + * Returns a builder for a {@link ClientToServerMessage} to be sent to the server. Crucially,
|
| + * the builder does <b>NOT</b> include the message header.
|
| + * @param hasClientToken whether the client currently holds a token
|
| + */
|
| + ClientToServerMessage toMessage(final ClientHeader header, boolean hasClientToken) {
|
| + final InitializeMessage initializeMessage;
|
| + final RegistrationMessage registrationMessage;
|
| + final RegistrationSyncMessage registrationSyncMessage;
|
| + final InvalidationMessage invalidationAckMessage;
|
| + final InfoMessage infoMessage;
|
| +
|
| + if (pendingInitializeMessage != null) {
|
| + statistics.recordSentMessage(SentMessageType.INITIALIZE);
|
| + initializeMessage = pendingInitializeMessage;
|
| + pendingInitializeMessage = null;
|
| + } else {
|
| + initializeMessage = null;
|
| + }
|
| +
|
| + // Note: Even if an initialize message is being sent, we can send additional
|
| + // messages such as regisration messages, etc to the server. But if there is no token
|
| + // and an initialize message is not being sent, we cannot send any other message.
|
| +
|
| + if (!hasClientToken && (initializeMessage == null)) {
|
| + // Cannot send any message
|
| + resources.getLogger().warning(
|
| + "Cannot send message since no token and no initialize msg");
|
| + statistics.recordError(ClientErrorType.TOKEN_MISSING_FAILURE);
|
| + return null;
|
| + }
|
| +
|
| + // Check for pending batched operations and add to message builder if needed.
|
| +
|
| + // Add reg, acks, reg subtrees - clear them after adding.
|
| + if (!pendingAckedInvalidations.isEmpty()) {
|
| + invalidationAckMessage = createInvalidationAckMessage();
|
| + statistics.recordSentMessage(SentMessageType.INVALIDATION_ACK);
|
| + } else {
|
| + invalidationAckMessage = null;
|
| + }
|
| +
|
| + // Check regs.
|
| + if (!pendingRegistrations.isEmpty()) {
|
| + registrationMessage = createRegistrationMessage();
|
| + statistics.recordSentMessage(SentMessageType.REGISTRATION);
|
| + } else {
|
| + registrationMessage = null;
|
| + }
|
| +
|
| + // Check reg substrees.
|
| + if (!pendingRegSubtrees.isEmpty()) {
|
| + // If there are multiple pending reg subtrees, only one is sent.
|
| + ArrayList<RegistrationSubtree> regSubtrees = new ArrayList<RegistrationSubtree>(1);
|
| + regSubtrees.add(pendingRegSubtrees.iterator().next());
|
| + registrationSyncMessage = RegistrationSyncMessage.create(regSubtrees);
|
| + pendingRegSubtrees.clear();
|
| + statistics.recordSentMessage(SentMessageType.REGISTRATION_SYNC);
|
| + } else {
|
| + registrationSyncMessage = null;
|
| + }
|
| +
|
| + // Check if an info message has to be sent.
|
| + if (pendingInfoMessage != null) {
|
| + statistics.recordSentMessage(SentMessageType.INFO);
|
| + infoMessage = pendingInfoMessage;
|
| + pendingInfoMessage = null;
|
| + } else {
|
| + infoMessage = null;
|
| + }
|
| +
|
| + return ClientToServerMessage.create(header, initializeMessage, registrationMessage,
|
| + registrationSyncMessage, invalidationAckMessage, infoMessage);
|
| + }
|
| +
|
| + /**
|
| + * Creates a registration message based on registrations from {@code pendingRegistrations}
|
| + * and returns it.
|
| + * <p>
|
| + * REQUIRES: pendingRegistrations.size() > 0
|
| + */
|
| + private RegistrationMessage createRegistrationMessage() {
|
| + Preconditions.checkState(!pendingRegistrations.isEmpty());
|
| +
|
| + // Run through the pendingRegistrations map.
|
| + List<RegistrationP> pendingRegistrations =
|
| + new ArrayList<RegistrationP>(this.pendingRegistrations.size());
|
| + for (Map.Entry<ObjectIdP, Integer> entry : this.pendingRegistrations.entrySet()) {
|
| + pendingRegistrations.add(RegistrationP.create(entry.getKey(), entry.getValue()));
|
| + }
|
| + this.pendingRegistrations.clear();
|
| + return RegistrationMessage.create(pendingRegistrations);
|
| + }
|
| +
|
| + /**
|
| + * Creates an invalidation ack message based on acks from {@code pendingAckedInvalidations} and
|
| + * returns it.
|
| + * <p>
|
| + * REQUIRES: pendingAckedInvalidations.size() > 0
|
| + */
|
| + private InvalidationMessage createInvalidationAckMessage() {
|
| + Preconditions.checkState(!pendingAckedInvalidations.isEmpty());
|
| + InvalidationMessage ackMessage =
|
| + InvalidationMessage.create(new ArrayList<InvalidationP>(pendingAckedInvalidations));
|
| + pendingAckedInvalidations.clear();
|
| + return ackMessage;
|
| + }
|
| +
|
| + @Override
|
| + public BatcherState marshal() {
|
| + // Marshall (un)registrations.
|
| + ArrayList<ObjectIdP> registrations = new ArrayList<ObjectIdP>(pendingRegistrations.size());
|
| + ArrayList<ObjectIdP> unregistrations = new ArrayList<ObjectIdP>(pendingRegistrations.size());
|
| + for (Map.Entry<ObjectIdP, Integer> entry : pendingRegistrations.entrySet()) {
|
| + Integer opType = entry.getValue();
|
| + ObjectIdP oid = entry.getKey();
|
| + new ArrayList<ObjectIdP>(pendingRegistrations.size());
|
| + switch (opType) {
|
| + case OpType.REGISTER:
|
| + registrations.add(oid);
|
| + break;
|
| + case OpType.UNREGISTER:
|
| + unregistrations.add(oid);
|
| + break;
|
| + default:
|
| + throw new IllegalArgumentException(opType.toString());
|
| + }
|
| + }
|
| + return BatcherState.create(registrations, unregistrations, pendingAckedInvalidations,
|
| + pendingRegSubtrees, pendingInitializeMessage, pendingInfoMessage);
|
| + }
|
| + }
|
| +
|
| + /** Representation of a message header for use in a server message. */
|
| + static class ServerMessageHeader extends InternalBase {
|
| + /**
|
| + * Constructs an instance.
|
| + *
|
| + * @param token server-sent token
|
| + * @param registrationSummary summary over server registration state
|
| + */
|
| + ServerMessageHeader(Bytes token, RegistrationSummary registrationSummary) {
|
| + this.token = token;
|
| + this.registrationSummary = registrationSummary;
|
| + }
|
| +
|
| + /** Server-sent token. */
|
| + Bytes token;
|
| +
|
| + /** Summary of the client's registration state at the server. */
|
| + RegistrationSummary registrationSummary;
|
| +
|
| + @Override
|
| + public void toCompactString(TextBuilder builder) {
|
| + builder.appendFormat("Token: %s, Summary: %s", token, registrationSummary);
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * Representation of a message receiver for the server. Such a message is guaranteed to be
|
| + * valid, but the session token is <b>not</b> checked.
|
| + */
|
| + static class ParsedMessage {
|
| + /*
|
| + * Each of these fields corresponds directly to a field in the ServerToClientMessage protobuf.
|
| + * It is non-null iff the corresponding hasYYY method in the protobuf would return true.
|
| + */
|
| + final ServerMessageHeader header;
|
| + final TokenControlMessage tokenControlMessage;
|
| + final InvalidationMessage invalidationMessage;
|
| + final RegistrationStatusMessage registrationStatusMessage;
|
| + final RegistrationSyncRequestMessage registrationSyncRequestMessage;
|
| + final ConfigChangeMessage configChangeMessage;
|
| + final InfoRequestMessage infoRequestMessage;
|
| + final ErrorMessage errorMessage;
|
| +
|
| + /** Constructs an instance from a {@code rawMessage}. */
|
| + ParsedMessage(ServerToClientMessage rawMessage) {
|
| + // For each field, assign it to the corresponding protobuf field if present, else null.
|
| + ServerHeader messageHeader = rawMessage.getHeader();
|
| + header = new ServerMessageHeader(messageHeader.getClientToken(),
|
| + messageHeader.getNullableRegistrationSummary());
|
| + tokenControlMessage =
|
| + rawMessage.hasTokenControlMessage() ? rawMessage.getTokenControlMessage() : null;
|
| + invalidationMessage = rawMessage.getNullableInvalidationMessage();
|
| + registrationStatusMessage = rawMessage.getNullableRegistrationStatusMessage();
|
| + registrationSyncRequestMessage = rawMessage.hasRegistrationSyncRequestMessage()
|
| + ? rawMessage.getRegistrationSyncRequestMessage() : null;
|
| + configChangeMessage =
|
| + rawMessage.hasConfigChangeMessage() ? rawMessage.getConfigChangeMessage() : null;
|
| + infoRequestMessage = rawMessage.getNullableInfoRequestMessage();
|
| + errorMessage = rawMessage.getNullableErrorMessage();
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * Listener for protocol events. The handler guarantees that the call will be made on the internal
|
| + * thread that the SystemResources provides.
|
| + */
|
| + interface ProtocolListener {
|
| + /** Records that a message was sent to the server at the current time. */
|
| + void handleMessageSent();
|
| +
|
| + /** Returns a summary of the current desired registrations. */
|
| + RegistrationSummary getRegistrationSummary();
|
| +
|
| + /** Returns the current server-assigned client token, if any. */
|
| + Bytes getClientToken();
|
| + }
|
| +
|
| + /** Information about the client, e.g., application name, OS, etc. */
|
| + private final ClientVersion clientVersion;
|
| +
|
| + /** A logger. */
|
| + private final Logger logger;
|
| +
|
| + /** Scheduler for the client's internal processing. */
|
| + private final Scheduler internalScheduler;
|
| +
|
| + /** Network channel for sending and receiving messages to and from the server. */
|
| + private final NetworkChannel network;
|
| +
|
| + /** The protocol listener. */
|
| + private final ProtocolListener listener;
|
| +
|
| + /** Batches messages to the server. */
|
| + private final Batcher batcher;
|
| +
|
| + /** A debug message id that is added to every message to the server. */
|
| + private int messageId = 1;
|
| +
|
| + // State specific to a client. If we want to support multiple clients, this could
|
| + // be in a map or could be eliminated (e.g., no batching).
|
| +
|
| + /** The last known time from the server. */
|
| + private long lastKnownServerTimeMs = 0;
|
| +
|
| + /**
|
| + * The next time before which a message cannot be sent to the server. If this is less than current
|
| + * time, a message can be sent at any time.
|
| + */
|
| + private long nextMessageSendTimeMs = 0;
|
| +
|
| + /** Statistics objects to track number of sent messages, etc. */
|
| + private final Statistics statistics;
|
| +
|
| + /** Client type for inclusion in headers. */
|
| + private final int clientType;
|
| +
|
| + /**
|
| + * Creates an instance.
|
| + *
|
| + * @param config configuration for the client
|
| + * @param resources resources to use
|
| + * @param smearer a smearer to randomize delays
|
| + * @param statistics track information about messages sent/received, etc
|
| + * @param applicationName name of the application using the library (for debugging/monitoring)
|
| + * @param listener callback for protocol events
|
| + */
|
| + ProtocolHandler(ProtocolHandlerConfigP config, final SystemResources resources,
|
| + Smearer smearer, Statistics statistics, int clientType, String applicationName,
|
| + ProtocolListener listener, ProtocolHandlerState marshalledState) {
|
| + this.logger = resources.getLogger();
|
| + this.statistics = statistics;
|
| + this.internalScheduler = resources.getInternalScheduler();
|
| + this.network = resources.getNetwork();
|
| + this.listener = listener;
|
| + this.clientVersion = CommonProtos.newClientVersion(resources.getPlatform(), "Java",
|
| + applicationName);
|
| + this.clientType = clientType;
|
| + if (marshalledState == null) {
|
| + // If there is no marshalled state, construct a clean batcher.
|
| + this.batcher = new Batcher(resources, statistics);
|
| + } else {
|
| + // Otherwise, restore the batcher from the marshalled state.
|
| + this.batcher = new Batcher(resources, statistics, marshalledState.getBatcherState());
|
| + this.messageId = marshalledState.getMessageId();
|
| + this.lastKnownServerTimeMs = marshalledState.getLastKnownServerTimeMs();
|
| + this.nextMessageSendTimeMs = marshalledState.getNextMessageSendTimeMs();
|
| + }
|
| + logger.info("Created protocol handler for application %s, platform %s", applicationName,
|
| + resources.getPlatform());
|
| + }
|
| +
|
| + /** Returns a default config for the protocol handler. */
|
| + static ProtocolHandlerConfigP createConfig() {
|
| + // Allow at most 3 messages every 5 seconds.
|
| + int windowMs = 5 * 1000;
|
| + int numMessagesPerWindow = 3;
|
| +
|
| + List<RateLimitP> rateLimits = new ArrayList<RateLimitP>();
|
| + rateLimits.add(RateLimitP.create(windowMs, numMessagesPerWindow));
|
| + return ProtocolHandlerConfigP.create(null, rateLimits);
|
| + }
|
| +
|
| + /** Returns a configuration object with parameters set for unit tests. */
|
| + static ProtocolHandlerConfigP createConfigForTest() {
|
| + // No rate limits
|
| + int smallBatchDelayForTest = 200;
|
| + return ProtocolHandlerConfigP.create(smallBatchDelayForTest, new ArrayList<RateLimitP>(0));
|
| + }
|
| +
|
| + /**
|
| + * Returns the next time a message is allowed to be sent to the server. Typically, this will be
|
| + * in the past, meaning that the client is free to send a message at any time.
|
| + */
|
| + public long getNextMessageSendTimeMsForTest() {
|
| + return nextMessageSendTimeMs;
|
| + }
|
| +
|
| + /**
|
| + * Handles a message from the server. If the message can be processed (i.e., is valid, is
|
| + * of the right version, and is not a silence message), returns a {@link ParsedMessage}
|
| + * representing it. Otherwise, returns {@code null}.
|
| + * <p>
|
| + * This class intercepts and processes silence messages. In this case, it will discard any other
|
| + * data in the message.
|
| + * <p>
|
| + * Note that this method does <b>not</b> check the session token of any message.
|
| + */
|
| + ParsedMessage handleIncomingMessage(byte[] incomingMessage) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + ServerToClientMessage message;
|
| + try {
|
| + message = ServerToClientMessage.parseFrom(incomingMessage);
|
| + } catch (ValidationException exception) {
|
| + statistics.recordError(ClientErrorType.INCOMING_MESSAGE_FAILURE);
|
| + logger.warning("Incoming message is invalid: %s", Bytes.toLazyCompactString(incomingMessage));
|
| + return null;
|
| + }
|
| +
|
| + // Check the version of the message.
|
| + if (message.getHeader().getProtocolVersion().getVersion().getMajorVersion() !=
|
| + ClientConstants.PROTOCOL_MAJOR_VERSION) {
|
| + statistics.recordError(ClientErrorType.PROTOCOL_VERSION_FAILURE);
|
| + logger.severe("Dropping message with incompatible version: %s", message);
|
| + return null;
|
| + }
|
| +
|
| + // Check if it is a ConfigChangeMessage which indicates that messages should no longer be
|
| + // sent for a certain duration. Perform this check before the token is even checked.
|
| + if (message.hasConfigChangeMessage()) {
|
| + ConfigChangeMessage configChangeMsg = message.getConfigChangeMessage();
|
| + statistics.recordReceivedMessage(ReceivedMessageType.CONFIG_CHANGE);
|
| + if (configChangeMsg.hasNextMessageDelayMs()) { // Validator has ensured that it is positive.
|
| + nextMessageSendTimeMs =
|
| + internalScheduler.getCurrentTimeMs() + configChangeMsg.getNextMessageDelayMs();
|
| + }
|
| + return null; // Ignore all other messages in the envelope.
|
| + }
|
| +
|
| + lastKnownServerTimeMs = Math.max(lastKnownServerTimeMs, message.getHeader().getServerTimeMs());
|
| + return new ParsedMessage(message);
|
| + }
|
| +
|
| + /**
|
| + * Sends a message to the server to request a client token.
|
| + *
|
| + * @param applicationClientId application-specific client id
|
| + * @param nonce nonce for the request
|
| + * @param debugString information to identify the caller
|
| + */
|
| + void sendInitializeMessage(ApplicationClientIdP applicationClientId, Bytes nonce,
|
| + BatchingTask batchingTask, String debugString) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + if (applicationClientId.getClientType() != clientType) {
|
| + // This condition is not fatal, but it probably represents a bug somewhere if it occurs.
|
| + logger.warning(
|
| + "Client type in application id does not match constructor-provided type: %s vs %s",
|
| + applicationClientId, clientType);
|
| + }
|
| +
|
| + // Simply store the message in pendingInitializeMessage and send it when the batching task runs.
|
| + InitializeMessage initializeMsg = InitializeMessage.create(clientType, nonce,
|
| + applicationClientId, DigestSerializationType.BYTE_BASED);
|
| + batcher.setInitializeMessage(initializeMsg);
|
| + logger.info("Batching initialize message for client: %s, %s", debugString, initializeMsg);
|
| + batchingTask.ensureScheduled(debugString);
|
| + }
|
| +
|
| + /**
|
| + * Sends an info message to the server with the performance counters supplied
|
| + * in {@code performanceCounters} and the config supplies in
|
| + * {@code configParams}.
|
| + *
|
| + * @param requestServerRegistrationSummary indicates whether to request the
|
| + * server's registration summary
|
| + */
|
| + void sendInfoMessage(List<SimplePair<String, Integer>> performanceCounters,
|
| + ClientConfigP clientConfig, boolean requestServerRegistrationSummary,
|
| + BatchingTask batchingTask) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| +
|
| + List<PropertyRecord> performanceCounterRecords =
|
| + new ArrayList<PropertyRecord>(performanceCounters.size());
|
| + for (SimplePair<String, Integer> counter : performanceCounters) {
|
| + performanceCounterRecords.add(PropertyRecord.create(counter.first, counter.second));
|
| + }
|
| + InfoMessage infoMessage = InfoMessage.create(clientVersion, /* configParameter */ null,
|
| + performanceCounterRecords, requestServerRegistrationSummary, clientConfig);
|
| +
|
| + // Simply store the message in pendingInfoMessage and send it when the batching task runs.
|
| + batcher.setInfoMessage(infoMessage);
|
| + batchingTask.ensureScheduled("Send-info");
|
| + }
|
| +
|
| + /**
|
| + * Sends a registration request to the server.
|
| + *
|
| + * @param objectIds object ids on which to (un)register
|
| + * @param regOpType whether to register or unregister
|
| + */
|
| + void sendRegistrations(Collection<ObjectIdP> objectIds, Integer regOpType,
|
| + BatchingTask batchingTask) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + for (ObjectIdP objectId : objectIds) {
|
| + batcher.addRegistration(objectId, regOpType);
|
| + }
|
| + batchingTask.ensureScheduled("Send-registrations");
|
| + }
|
| +
|
| + /** Sends an acknowledgement for {@code invalidation} to the server. */
|
| + void sendInvalidationAck(InvalidationP invalidation, BatchingTask batchingTask) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + // We could summarize acks when there are suppressing invalidations - we don't since it is
|
| + // unlikely to be too beneficial here.
|
| + logger.fine("Sending ack for invalidation %s", invalidation);
|
| + batcher.addAck(invalidation);
|
| + batchingTask.ensureScheduled("Send-Ack");
|
| + }
|
| +
|
| + /**
|
| + * Sends a single registration subtree to the server.
|
| + *
|
| + * @param regSubtree subtree to send
|
| + */
|
| + void sendRegistrationSyncSubtree(RegistrationSubtree regSubtree, BatchingTask batchingTask) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + batcher.addRegSubtree(regSubtree);
|
| + logger.info("Adding subtree: %s", regSubtree);
|
| + batchingTask.ensureScheduled("Send-reg-sync");
|
| + }
|
| +
|
| + /** Sends pending data to the server (e.g., registrations, acks, registration sync messages). */
|
| + void sendMessageToServer() {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + if (nextMessageSendTimeMs > internalScheduler.getCurrentTimeMs()) {
|
| + logger.warning("In quiet period: not sending message to server: %s > %s",
|
| + nextMessageSendTimeMs, internalScheduler.getCurrentTimeMs());
|
| + return;
|
| + }
|
| +
|
| + // Create the message from the batcher.
|
| + ClientToServerMessage message;
|
| + try {
|
| + message = batcher.toMessage(createClientHeader(), listener.getClientToken() != null);
|
| + if (message == null) {
|
| + // Happens when we don't have a token and are not sending an initialize message. Logged
|
| + // in batcher.toMessage().
|
| + return;
|
| + }
|
| + } catch (ProtoWrapper.ValidationArgumentException exception) {
|
| + logger.severe("Tried to send invalid message: %s", batcher);
|
| + statistics.recordError(ClientErrorType.OUTGOING_MESSAGE_FAILURE);
|
| + return;
|
| + }
|
| + ++messageId;
|
| +
|
| + statistics.recordSentMessage(SentMessageType.TOTAL);
|
| + logger.fine("Sending message to server: %s", message);
|
| + network.sendMessage(message.toByteArray());
|
| +
|
| + // Record that the message was sent. We're invoking the listener directly, rather than
|
| + // scheduling a new work unit to do it. It would be safer to do a schedule, but that's hard to
|
| + // do in Android, we wrote this listener (it's InvalidationClientCore, so we know what it does),
|
| + // and it's the last line of this function.
|
| + listener.handleMessageSent();
|
| + }
|
| +
|
| + /** Returns the header to include on a message to the server. */
|
| + private ClientHeader createClientHeader() {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + return ClientHeader.create(ClientConstants.PROTOCOL_VERSION,
|
| + listener.getClientToken(), listener.getRegistrationSummary(),
|
| + internalScheduler.getCurrentTimeMs(), lastKnownServerTimeMs, Integer.toString(messageId),
|
| + clientType);
|
| + }
|
| +
|
| + @Override
|
| + public ProtocolHandlerState marshal() {
|
| + return ProtocolHandlerState.create(messageId, lastKnownServerTimeMs, nextMessageSendTimeMs,
|
| + batcher.marshal());
|
| + }
|
| +}
|
|
|