Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1826)

Unified Diff: third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/ProtocolHandler.java

Issue 1162033004: Pull cacheinvalidations code directory into chromium repo. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/ProtocolHandler.java
diff --git a/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/ProtocolHandler.java b/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/ProtocolHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..131de39df883e1a4f1b15539876d62c12c90d18d
--- /dev/null
+++ b/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/ProtocolHandler.java
@@ -0,0 +1,661 @@
+/*
+ * Copyright 2011 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.ipc.invalidation.ticl;
+
+import com.google.ipc.invalidation.external.client.SystemResources;
+import com.google.ipc.invalidation.external.client.SystemResources.Logger;
+import com.google.ipc.invalidation.external.client.SystemResources.NetworkChannel;
+import com.google.ipc.invalidation.external.client.SystemResources.Scheduler;
+import com.google.ipc.invalidation.external.client.types.SimplePair;
+import com.google.ipc.invalidation.ticl.InvalidationClientCore.BatchingTask;
+import com.google.ipc.invalidation.ticl.Statistics.ClientErrorType;
+import com.google.ipc.invalidation.ticl.Statistics.ReceivedMessageType;
+import com.google.ipc.invalidation.ticl.Statistics.SentMessageType;
+import com.google.ipc.invalidation.ticl.proto.ClientConstants;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ApplicationClientIdP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientConfigP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientHeader;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientToServerMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientVersion;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ConfigChangeMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ErrorMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoRequestMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InitializeMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InitializeMessage.DigestSerializationType;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ObjectIdP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.PropertyRecord;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ProtocolHandlerConfigP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RateLimitP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP.OpType;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationStatusMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSubtree;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSummary;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSyncMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSyncRequestMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ServerHeader;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ServerToClientMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.TokenControlMessage;
+import com.google.ipc.invalidation.ticl.proto.CommonProtos;
+import com.google.ipc.invalidation.ticl.proto.JavaClient.BatcherState;
+import com.google.ipc.invalidation.ticl.proto.JavaClient.ProtocolHandlerState;
+import com.google.ipc.invalidation.util.Bytes;
+import com.google.ipc.invalidation.util.InternalBase;
+import com.google.ipc.invalidation.util.Marshallable;
+import com.google.ipc.invalidation.util.Preconditions;
+import com.google.ipc.invalidation.util.ProtoWrapper;
+import com.google.ipc.invalidation.util.ProtoWrapper.ValidationException;
+import com.google.ipc.invalidation.util.Smearer;
+import com.google.ipc.invalidation.util.TextBuilder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * A layer for interacting with low-level protocol messages. Parses messages from the server and
+ * calls appropriate functions on the {@code ProtocolListener} to handle various types of message
+ * content. Also buffers message data from the client and constructs and sends messages to the
+ * server.
+ * <p>
+ * This class implements {@link Marshallable}, so its state can be written to a protocol buffer,
+ * and instances can be restored from such protocol buffers. Additionally, the nested class
+ * {@link Batcher} also implements {@code Marshallable} for the same reason.
+ * <p>
+ * Note that while we talk about "marshalling," in this context we mean marshalling to protocol
+ * buffers, not raw bytes.
+ *
+ */
+class ProtocolHandler implements Marshallable<ProtocolHandlerState> {
+ /** Class that batches messages to the server. */
+ private static class Batcher implements Marshallable<BatcherState> {
+ /** Statistics to be updated when messages are created. */
+ private final Statistics statistics;
+
+ /** Resources used for logging and thread assertions. */
+ private final SystemResources resources;
+
+ /** Set of pending registrations stored as a map for overriding later operations. */
+ private final Map<ObjectIdP, Integer> pendingRegistrations = new HashMap<ObjectIdP, Integer>();
+
+ /** Set of pending invalidation acks. */
+ private final Set<InvalidationP> pendingAckedInvalidations = new HashSet<InvalidationP>();
+
+ /** Set of pending registration sub trees for registration sync. */
+ private final Set<RegistrationSubtree> pendingRegSubtrees = new HashSet<RegistrationSubtree>();
+
+ /** Pending initialization message to send to the server, if any. */
+ private InitializeMessage pendingInitializeMessage = null;
+
+ /** Pending info message to send to the server, if any. */
+ private InfoMessage pendingInfoMessage = null;
+
+ /** Creates a batcher. */
+ Batcher(SystemResources resources, Statistics statistics) {
+ this.resources = resources;
+ this.statistics = statistics;
+ }
+
+ /** Creates a batcher from {@code marshalledState}. */
+ Batcher(SystemResources resources, Statistics statistics, BatcherState marshalledState) {
+ this(resources, statistics);
+ for (ObjectIdP registration : marshalledState.getRegistration()) {
+ pendingRegistrations.put(registration, RegistrationP.OpType.REGISTER);
+ }
+ for (ObjectIdP unregistration : marshalledState.getUnregistration()) {
+ pendingRegistrations.put(unregistration, RegistrationP.OpType.UNREGISTER);
+ }
+ for (InvalidationP ack : marshalledState.getAcknowledgement()) {
+ pendingAckedInvalidations.add(ack);
+ }
+ for (RegistrationSubtree subtree : marshalledState.getRegistrationSubtree()) {
+ pendingRegSubtrees.add(subtree);
+ }
+ pendingInitializeMessage = marshalledState.getNullableInitializeMessage();
+ if (marshalledState.hasInfoMessage()) {
+ pendingInfoMessage = marshalledState.getInfoMessage();
+ }
+ }
+
+ /** Sets the initialize message to be sent. */
+ void setInitializeMessage(InitializeMessage msg) {
+ pendingInitializeMessage = msg;
+ }
+
+ /** Sets the info message to be sent. */
+ void setInfoMessage(InfoMessage msg) {
+ pendingInfoMessage = msg;
+ }
+
+ /** Adds a registration on {@code oid} of {@code opType} to the registrations to be sent. */
+ void addRegistration(ObjectIdP oid, Integer opType) {
+ pendingRegistrations.put(oid, opType);
+ }
+
+ /** Adds {@code ack} to the set of acknowledgements to be sent. */
+ void addAck(InvalidationP ack) {
+ pendingAckedInvalidations.add(ack);
+ }
+
+ /** Adds {@code subtree} to the set of registration subtrees to be sent. */
+ void addRegSubtree(RegistrationSubtree subtree) {
+ pendingRegSubtrees.add(subtree);
+ }
+
+ /**
+ * Returns a builder for a {@link ClientToServerMessage} to be sent to the server. Crucially,
+ * the builder does <b>NOT</b> include the message header.
+ * @param hasClientToken whether the client currently holds a token
+ */
+ ClientToServerMessage toMessage(final ClientHeader header, boolean hasClientToken) {
+ final InitializeMessage initializeMessage;
+ final RegistrationMessage registrationMessage;
+ final RegistrationSyncMessage registrationSyncMessage;
+ final InvalidationMessage invalidationAckMessage;
+ final InfoMessage infoMessage;
+
+ if (pendingInitializeMessage != null) {
+ statistics.recordSentMessage(SentMessageType.INITIALIZE);
+ initializeMessage = pendingInitializeMessage;
+ pendingInitializeMessage = null;
+ } else {
+ initializeMessage = null;
+ }
+
+ // Note: Even if an initialize message is being sent, we can send additional
+ // messages such as regisration messages, etc to the server. But if there is no token
+ // and an initialize message is not being sent, we cannot send any other message.
+
+ if (!hasClientToken && (initializeMessage == null)) {
+ // Cannot send any message
+ resources.getLogger().warning(
+ "Cannot send message since no token and no initialize msg");
+ statistics.recordError(ClientErrorType.TOKEN_MISSING_FAILURE);
+ return null;
+ }
+
+ // Check for pending batched operations and add to message builder if needed.
+
+ // Add reg, acks, reg subtrees - clear them after adding.
+ if (!pendingAckedInvalidations.isEmpty()) {
+ invalidationAckMessage = createInvalidationAckMessage();
+ statistics.recordSentMessage(SentMessageType.INVALIDATION_ACK);
+ } else {
+ invalidationAckMessage = null;
+ }
+
+ // Check regs.
+ if (!pendingRegistrations.isEmpty()) {
+ registrationMessage = createRegistrationMessage();
+ statistics.recordSentMessage(SentMessageType.REGISTRATION);
+ } else {
+ registrationMessage = null;
+ }
+
+ // Check reg substrees.
+ if (!pendingRegSubtrees.isEmpty()) {
+ // If there are multiple pending reg subtrees, only one is sent.
+ ArrayList<RegistrationSubtree> regSubtrees = new ArrayList<RegistrationSubtree>(1);
+ regSubtrees.add(pendingRegSubtrees.iterator().next());
+ registrationSyncMessage = RegistrationSyncMessage.create(regSubtrees);
+ pendingRegSubtrees.clear();
+ statistics.recordSentMessage(SentMessageType.REGISTRATION_SYNC);
+ } else {
+ registrationSyncMessage = null;
+ }
+
+ // Check if an info message has to be sent.
+ if (pendingInfoMessage != null) {
+ statistics.recordSentMessage(SentMessageType.INFO);
+ infoMessage = pendingInfoMessage;
+ pendingInfoMessage = null;
+ } else {
+ infoMessage = null;
+ }
+
+ return ClientToServerMessage.create(header, initializeMessage, registrationMessage,
+ registrationSyncMessage, invalidationAckMessage, infoMessage);
+ }
+
+ /**
+ * Creates a registration message based on registrations from {@code pendingRegistrations}
+ * and returns it.
+ * <p>
+ * REQUIRES: pendingRegistrations.size() > 0
+ */
+ private RegistrationMessage createRegistrationMessage() {
+ Preconditions.checkState(!pendingRegistrations.isEmpty());
+
+ // Run through the pendingRegistrations map.
+ List<RegistrationP> pendingRegistrations =
+ new ArrayList<RegistrationP>(this.pendingRegistrations.size());
+ for (Map.Entry<ObjectIdP, Integer> entry : this.pendingRegistrations.entrySet()) {
+ pendingRegistrations.add(RegistrationP.create(entry.getKey(), entry.getValue()));
+ }
+ this.pendingRegistrations.clear();
+ return RegistrationMessage.create(pendingRegistrations);
+ }
+
+ /**
+ * Creates an invalidation ack message based on acks from {@code pendingAckedInvalidations} and
+ * returns it.
+ * <p>
+ * REQUIRES: pendingAckedInvalidations.size() > 0
+ */
+ private InvalidationMessage createInvalidationAckMessage() {
+ Preconditions.checkState(!pendingAckedInvalidations.isEmpty());
+ InvalidationMessage ackMessage =
+ InvalidationMessage.create(new ArrayList<InvalidationP>(pendingAckedInvalidations));
+ pendingAckedInvalidations.clear();
+ return ackMessage;
+ }
+
+ @Override
+ public BatcherState marshal() {
+ // Marshall (un)registrations.
+ ArrayList<ObjectIdP> registrations = new ArrayList<ObjectIdP>(pendingRegistrations.size());
+ ArrayList<ObjectIdP> unregistrations = new ArrayList<ObjectIdP>(pendingRegistrations.size());
+ for (Map.Entry<ObjectIdP, Integer> entry : pendingRegistrations.entrySet()) {
+ Integer opType = entry.getValue();
+ ObjectIdP oid = entry.getKey();
+ new ArrayList<ObjectIdP>(pendingRegistrations.size());
+ switch (opType) {
+ case OpType.REGISTER:
+ registrations.add(oid);
+ break;
+ case OpType.UNREGISTER:
+ unregistrations.add(oid);
+ break;
+ default:
+ throw new IllegalArgumentException(opType.toString());
+ }
+ }
+ return BatcherState.create(registrations, unregistrations, pendingAckedInvalidations,
+ pendingRegSubtrees, pendingInitializeMessage, pendingInfoMessage);
+ }
+ }
+
+ /** Representation of a message header for use in a server message. */
+ static class ServerMessageHeader extends InternalBase {
+ /**
+ * Constructs an instance.
+ *
+ * @param token server-sent token
+ * @param registrationSummary summary over server registration state
+ */
+ ServerMessageHeader(Bytes token, RegistrationSummary registrationSummary) {
+ this.token = token;
+ this.registrationSummary = registrationSummary;
+ }
+
+ /** Server-sent token. */
+ Bytes token;
+
+ /** Summary of the client's registration state at the server. */
+ RegistrationSummary registrationSummary;
+
+ @Override
+ public void toCompactString(TextBuilder builder) {
+ builder.appendFormat("Token: %s, Summary: %s", token, registrationSummary);
+ }
+ }
+
+ /**
+ * Representation of a message receiver for the server. Such a message is guaranteed to be
+ * valid, but the session token is <b>not</b> checked.
+ */
+ static class ParsedMessage {
+ /*
+ * Each of these fields corresponds directly to a field in the ServerToClientMessage protobuf.
+ * It is non-null iff the corresponding hasYYY method in the protobuf would return true.
+ */
+ final ServerMessageHeader header;
+ final TokenControlMessage tokenControlMessage;
+ final InvalidationMessage invalidationMessage;
+ final RegistrationStatusMessage registrationStatusMessage;
+ final RegistrationSyncRequestMessage registrationSyncRequestMessage;
+ final ConfigChangeMessage configChangeMessage;
+ final InfoRequestMessage infoRequestMessage;
+ final ErrorMessage errorMessage;
+
+ /** Constructs an instance from a {@code rawMessage}. */
+ ParsedMessage(ServerToClientMessage rawMessage) {
+ // For each field, assign it to the corresponding protobuf field if present, else null.
+ ServerHeader messageHeader = rawMessage.getHeader();
+ header = new ServerMessageHeader(messageHeader.getClientToken(),
+ messageHeader.getNullableRegistrationSummary());
+ tokenControlMessage =
+ rawMessage.hasTokenControlMessage() ? rawMessage.getTokenControlMessage() : null;
+ invalidationMessage = rawMessage.getNullableInvalidationMessage();
+ registrationStatusMessage = rawMessage.getNullableRegistrationStatusMessage();
+ registrationSyncRequestMessage = rawMessage.hasRegistrationSyncRequestMessage()
+ ? rawMessage.getRegistrationSyncRequestMessage() : null;
+ configChangeMessage =
+ rawMessage.hasConfigChangeMessage() ? rawMessage.getConfigChangeMessage() : null;
+ infoRequestMessage = rawMessage.getNullableInfoRequestMessage();
+ errorMessage = rawMessage.getNullableErrorMessage();
+ }
+ }
+
+ /**
+ * Listener for protocol events. The handler guarantees that the call will be made on the internal
+ * thread that the SystemResources provides.
+ */
+ interface ProtocolListener {
+ /** Records that a message was sent to the server at the current time. */
+ void handleMessageSent();
+
+ /** Returns a summary of the current desired registrations. */
+ RegistrationSummary getRegistrationSummary();
+
+ /** Returns the current server-assigned client token, if any. */
+ Bytes getClientToken();
+ }
+
+ /** Information about the client, e.g., application name, OS, etc. */
+ private final ClientVersion clientVersion;
+
+ /** A logger. */
+ private final Logger logger;
+
+ /** Scheduler for the client's internal processing. */
+ private final Scheduler internalScheduler;
+
+ /** Network channel for sending and receiving messages to and from the server. */
+ private final NetworkChannel network;
+
+ /** The protocol listener. */
+ private final ProtocolListener listener;
+
+ /** Batches messages to the server. */
+ private final Batcher batcher;
+
+ /** A debug message id that is added to every message to the server. */
+ private int messageId = 1;
+
+ // State specific to a client. If we want to support multiple clients, this could
+ // be in a map or could be eliminated (e.g., no batching).
+
+ /** The last known time from the server. */
+ private long lastKnownServerTimeMs = 0;
+
+ /**
+ * The next time before which a message cannot be sent to the server. If this is less than current
+ * time, a message can be sent at any time.
+ */
+ private long nextMessageSendTimeMs = 0;
+
+ /** Statistics objects to track number of sent messages, etc. */
+ private final Statistics statistics;
+
+ /** Client type for inclusion in headers. */
+ private final int clientType;
+
+ /**
+ * Creates an instance.
+ *
+ * @param config configuration for the client
+ * @param resources resources to use
+ * @param smearer a smearer to randomize delays
+ * @param statistics track information about messages sent/received, etc
+ * @param applicationName name of the application using the library (for debugging/monitoring)
+ * @param listener callback for protocol events
+ */
+ ProtocolHandler(ProtocolHandlerConfigP config, final SystemResources resources,
+ Smearer smearer, Statistics statistics, int clientType, String applicationName,
+ ProtocolListener listener, ProtocolHandlerState marshalledState) {
+ this.logger = resources.getLogger();
+ this.statistics = statistics;
+ this.internalScheduler = resources.getInternalScheduler();
+ this.network = resources.getNetwork();
+ this.listener = listener;
+ this.clientVersion = CommonProtos.newClientVersion(resources.getPlatform(), "Java",
+ applicationName);
+ this.clientType = clientType;
+ if (marshalledState == null) {
+ // If there is no marshalled state, construct a clean batcher.
+ this.batcher = new Batcher(resources, statistics);
+ } else {
+ // Otherwise, restore the batcher from the marshalled state.
+ this.batcher = new Batcher(resources, statistics, marshalledState.getBatcherState());
+ this.messageId = marshalledState.getMessageId();
+ this.lastKnownServerTimeMs = marshalledState.getLastKnownServerTimeMs();
+ this.nextMessageSendTimeMs = marshalledState.getNextMessageSendTimeMs();
+ }
+ logger.info("Created protocol handler for application %s, platform %s", applicationName,
+ resources.getPlatform());
+ }
+
+ /** Returns a default config for the protocol handler. */
+ static ProtocolHandlerConfigP createConfig() {
+ // Allow at most 3 messages every 5 seconds.
+ int windowMs = 5 * 1000;
+ int numMessagesPerWindow = 3;
+
+ List<RateLimitP> rateLimits = new ArrayList<RateLimitP>();
+ rateLimits.add(RateLimitP.create(windowMs, numMessagesPerWindow));
+ return ProtocolHandlerConfigP.create(null, rateLimits);
+ }
+
+ /** Returns a configuration object with parameters set for unit tests. */
+ static ProtocolHandlerConfigP createConfigForTest() {
+ // No rate limits
+ int smallBatchDelayForTest = 200;
+ return ProtocolHandlerConfigP.create(smallBatchDelayForTest, new ArrayList<RateLimitP>(0));
+ }
+
+ /**
+ * Returns the next time a message is allowed to be sent to the server. Typically, this will be
+ * in the past, meaning that the client is free to send a message at any time.
+ */
+ public long getNextMessageSendTimeMsForTest() {
+ return nextMessageSendTimeMs;
+ }
+
+ /**
+ * Handles a message from the server. If the message can be processed (i.e., is valid, is
+ * of the right version, and is not a silence message), returns a {@link ParsedMessage}
+ * representing it. Otherwise, returns {@code null}.
+ * <p>
+ * This class intercepts and processes silence messages. In this case, it will discard any other
+ * data in the message.
+ * <p>
+ * Note that this method does <b>not</b> check the session token of any message.
+ */
+ ParsedMessage handleIncomingMessage(byte[] incomingMessage) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ ServerToClientMessage message;
+ try {
+ message = ServerToClientMessage.parseFrom(incomingMessage);
+ } catch (ValidationException exception) {
+ statistics.recordError(ClientErrorType.INCOMING_MESSAGE_FAILURE);
+ logger.warning("Incoming message is invalid: %s", Bytes.toLazyCompactString(incomingMessage));
+ return null;
+ }
+
+ // Check the version of the message.
+ if (message.getHeader().getProtocolVersion().getVersion().getMajorVersion() !=
+ ClientConstants.PROTOCOL_MAJOR_VERSION) {
+ statistics.recordError(ClientErrorType.PROTOCOL_VERSION_FAILURE);
+ logger.severe("Dropping message with incompatible version: %s", message);
+ return null;
+ }
+
+ // Check if it is a ConfigChangeMessage which indicates that messages should no longer be
+ // sent for a certain duration. Perform this check before the token is even checked.
+ if (message.hasConfigChangeMessage()) {
+ ConfigChangeMessage configChangeMsg = message.getConfigChangeMessage();
+ statistics.recordReceivedMessage(ReceivedMessageType.CONFIG_CHANGE);
+ if (configChangeMsg.hasNextMessageDelayMs()) { // Validator has ensured that it is positive.
+ nextMessageSendTimeMs =
+ internalScheduler.getCurrentTimeMs() + configChangeMsg.getNextMessageDelayMs();
+ }
+ return null; // Ignore all other messages in the envelope.
+ }
+
+ lastKnownServerTimeMs = Math.max(lastKnownServerTimeMs, message.getHeader().getServerTimeMs());
+ return new ParsedMessage(message);
+ }
+
+ /**
+ * Sends a message to the server to request a client token.
+ *
+ * @param applicationClientId application-specific client id
+ * @param nonce nonce for the request
+ * @param debugString information to identify the caller
+ */
+ void sendInitializeMessage(ApplicationClientIdP applicationClientId, Bytes nonce,
+ BatchingTask batchingTask, String debugString) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ if (applicationClientId.getClientType() != clientType) {
+ // This condition is not fatal, but it probably represents a bug somewhere if it occurs.
+ logger.warning(
+ "Client type in application id does not match constructor-provided type: %s vs %s",
+ applicationClientId, clientType);
+ }
+
+ // Simply store the message in pendingInitializeMessage and send it when the batching task runs.
+ InitializeMessage initializeMsg = InitializeMessage.create(clientType, nonce,
+ applicationClientId, DigestSerializationType.BYTE_BASED);
+ batcher.setInitializeMessage(initializeMsg);
+ logger.info("Batching initialize message for client: %s, %s", debugString, initializeMsg);
+ batchingTask.ensureScheduled(debugString);
+ }
+
+ /**
+ * Sends an info message to the server with the performance counters supplied
+ * in {@code performanceCounters} and the config supplies in
+ * {@code configParams}.
+ *
+ * @param requestServerRegistrationSummary indicates whether to request the
+ * server's registration summary
+ */
+ void sendInfoMessage(List<SimplePair<String, Integer>> performanceCounters,
+ ClientConfigP clientConfig, boolean requestServerRegistrationSummary,
+ BatchingTask batchingTask) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+
+ List<PropertyRecord> performanceCounterRecords =
+ new ArrayList<PropertyRecord>(performanceCounters.size());
+ for (SimplePair<String, Integer> counter : performanceCounters) {
+ performanceCounterRecords.add(PropertyRecord.create(counter.first, counter.second));
+ }
+ InfoMessage infoMessage = InfoMessage.create(clientVersion, /* configParameter */ null,
+ performanceCounterRecords, requestServerRegistrationSummary, clientConfig);
+
+ // Simply store the message in pendingInfoMessage and send it when the batching task runs.
+ batcher.setInfoMessage(infoMessage);
+ batchingTask.ensureScheduled("Send-info");
+ }
+
+ /**
+ * Sends a registration request to the server.
+ *
+ * @param objectIds object ids on which to (un)register
+ * @param regOpType whether to register or unregister
+ */
+ void sendRegistrations(Collection<ObjectIdP> objectIds, Integer regOpType,
+ BatchingTask batchingTask) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ for (ObjectIdP objectId : objectIds) {
+ batcher.addRegistration(objectId, regOpType);
+ }
+ batchingTask.ensureScheduled("Send-registrations");
+ }
+
+ /** Sends an acknowledgement for {@code invalidation} to the server. */
+ void sendInvalidationAck(InvalidationP invalidation, BatchingTask batchingTask) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ // We could summarize acks when there are suppressing invalidations - we don't since it is
+ // unlikely to be too beneficial here.
+ logger.fine("Sending ack for invalidation %s", invalidation);
+ batcher.addAck(invalidation);
+ batchingTask.ensureScheduled("Send-Ack");
+ }
+
+ /**
+ * Sends a single registration subtree to the server.
+ *
+ * @param regSubtree subtree to send
+ */
+ void sendRegistrationSyncSubtree(RegistrationSubtree regSubtree, BatchingTask batchingTask) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ batcher.addRegSubtree(regSubtree);
+ logger.info("Adding subtree: %s", regSubtree);
+ batchingTask.ensureScheduled("Send-reg-sync");
+ }
+
+ /** Sends pending data to the server (e.g., registrations, acks, registration sync messages). */
+ void sendMessageToServer() {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ if (nextMessageSendTimeMs > internalScheduler.getCurrentTimeMs()) {
+ logger.warning("In quiet period: not sending message to server: %s > %s",
+ nextMessageSendTimeMs, internalScheduler.getCurrentTimeMs());
+ return;
+ }
+
+ // Create the message from the batcher.
+ ClientToServerMessage message;
+ try {
+ message = batcher.toMessage(createClientHeader(), listener.getClientToken() != null);
+ if (message == null) {
+ // Happens when we don't have a token and are not sending an initialize message. Logged
+ // in batcher.toMessage().
+ return;
+ }
+ } catch (ProtoWrapper.ValidationArgumentException exception) {
+ logger.severe("Tried to send invalid message: %s", batcher);
+ statistics.recordError(ClientErrorType.OUTGOING_MESSAGE_FAILURE);
+ return;
+ }
+ ++messageId;
+
+ statistics.recordSentMessage(SentMessageType.TOTAL);
+ logger.fine("Sending message to server: %s", message);
+ network.sendMessage(message.toByteArray());
+
+ // Record that the message was sent. We're invoking the listener directly, rather than
+ // scheduling a new work unit to do it. It would be safer to do a schedule, but that's hard to
+ // do in Android, we wrote this listener (it's InvalidationClientCore, so we know what it does),
+ // and it's the last line of this function.
+ listener.handleMessageSent();
+ }
+
+ /** Returns the header to include on a message to the server. */
+ private ClientHeader createClientHeader() {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ return ClientHeader.create(ClientConstants.PROTOCOL_VERSION,
+ listener.getClientToken(), listener.getRegistrationSummary(),
+ internalScheduler.getCurrentTimeMs(), lastKnownServerTimeMs, Integer.toString(messageId),
+ clientType);
+ }
+
+ @Override
+ public ProtocolHandlerState marshal() {
+ return ProtocolHandlerState.create(messageId, lastKnownServerTimeMs, nextMessageSendTimeMs,
+ batcher.marshal());
+ }
+}

Powered by Google App Engine
This is Rietveld 408576698