Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1234)

Unified Diff: third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/InvalidationClientCore.java

Issue 1162033004: Pull cacheinvalidations code directory into chromium repo. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/InvalidationClientCore.java
diff --git a/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/InvalidationClientCore.java b/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/InvalidationClientCore.java
new file mode 100644
index 0000000000000000000000000000000000000000..1eca3d4c71ce05fd93fd60fbbe7c2b7e5cc2e5d7
--- /dev/null
+++ b/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/InvalidationClientCore.java
@@ -0,0 +1,1546 @@
+/*
+ * Copyright 2011 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.ipc.invalidation.ticl;
+
+import static com.google.ipc.invalidation.external.client.SystemResources.Scheduler.NO_DELAY;
+
+import com.google.ipc.invalidation.common.DigestFunction;
+import com.google.ipc.invalidation.common.ObjectIdDigestUtils;
+import com.google.ipc.invalidation.external.client.InvalidationListener;
+import com.google.ipc.invalidation.external.client.InvalidationListener.RegistrationState;
+import com.google.ipc.invalidation.external.client.SystemResources;
+import com.google.ipc.invalidation.external.client.SystemResources.Logger;
+import com.google.ipc.invalidation.external.client.SystemResources.NetworkChannel;
+import com.google.ipc.invalidation.external.client.SystemResources.Scheduler;
+import com.google.ipc.invalidation.external.client.SystemResources.Storage;
+import com.google.ipc.invalidation.external.client.types.AckHandle;
+import com.google.ipc.invalidation.external.client.types.Callback;
+import com.google.ipc.invalidation.external.client.types.ErrorInfo;
+import com.google.ipc.invalidation.external.client.types.Invalidation;
+import com.google.ipc.invalidation.external.client.types.ObjectId;
+import com.google.ipc.invalidation.external.client.types.SimplePair;
+import com.google.ipc.invalidation.external.client.types.Status;
+import com.google.ipc.invalidation.ticl.ProtocolHandler.ParsedMessage;
+import com.google.ipc.invalidation.ticl.ProtocolHandler.ProtocolListener;
+import com.google.ipc.invalidation.ticl.ProtocolHandler.ServerMessageHeader;
+import com.google.ipc.invalidation.ticl.Statistics.ClientErrorType;
+import com.google.ipc.invalidation.ticl.Statistics.IncomingOperationType;
+import com.google.ipc.invalidation.ticl.Statistics.ReceivedMessageType;
+import com.google.ipc.invalidation.ticl.proto.ChannelCommon.NetworkEndpointId;
+import com.google.ipc.invalidation.ticl.proto.Client.AckHandleP;
+import com.google.ipc.invalidation.ticl.proto.Client.ExponentialBackoffState;
+import com.google.ipc.invalidation.ticl.proto.Client.PersistentTiclState;
+import com.google.ipc.invalidation.ticl.proto.Client.RunStateP;
+import com.google.ipc.invalidation.ticl.proto.ClientConstants;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ApplicationClientIdP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientConfigP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ErrorMessage;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoRequestMessage.InfoType;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ObjectIdP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ProtocolHandlerConfigP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationStatus;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSubtree;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSummary;
+import com.google.ipc.invalidation.ticl.proto.ClientProtocol.Version;
+import com.google.ipc.invalidation.ticl.proto.CommonProtos;
+import com.google.ipc.invalidation.ticl.proto.JavaClient.InvalidationClientState;
+import com.google.ipc.invalidation.ticl.proto.JavaClient.ProtocolHandlerState;
+import com.google.ipc.invalidation.ticl.proto.JavaClient.RecurringTaskState;
+import com.google.ipc.invalidation.ticl.proto.JavaClient.RegistrationManagerStateP;
+import com.google.ipc.invalidation.ticl.proto.JavaClient.StatisticsState;
+import com.google.ipc.invalidation.util.Box;
+import com.google.ipc.invalidation.util.Bytes;
+import com.google.ipc.invalidation.util.InternalBase;
+import com.google.ipc.invalidation.util.Marshallable;
+import com.google.ipc.invalidation.util.Preconditions;
+import com.google.ipc.invalidation.util.ProtoWrapper.ValidationException;
+import com.google.ipc.invalidation.util.Smearer;
+import com.google.ipc.invalidation.util.TextBuilder;
+import com.google.ipc.invalidation.util.TypedUtil;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.logging.Level;
+
+
+/**
+ * Core implementation of the Invalidation Client Library (Ticl). Subclasses are required
+ * to implement concurrency control for the Ticl.
+ *
+ */
+public abstract class InvalidationClientCore extends InternalBase
+ implements Marshallable<InvalidationClientState>, ProtocolListener,
+ TestableInvalidationClient {
+
+ /**
+ * A subclass of {@link RecurringTask} with simplified constructors to provide common
+ * parameters automatically (scheduler, logger, smearer).
+ */
+ private abstract class TiclRecurringTask extends RecurringTask {
+ /**
+ * Constructs a task with {@code initialDelayMs} and {@code timeoutDelayMs}. If
+ * {@code useExponentialBackoff}, an exponential backoff generator with initial delay
+ * {@code timeoutDelayMs} is used as well; if not, exponential backoff is not used.
+ */
+ TiclRecurringTask(String name, int initialDelayMs, int timeoutDelayMs,
+ boolean useExponentialBackoff) {
+ super(name, internalScheduler, logger, smearer,
+ useExponentialBackoff ? createExpBackOffGenerator(timeoutDelayMs, null) : null,
+ initialDelayMs, timeoutDelayMs);
+ }
+
+ /**
+ * Constructs an instance from {@code marshalledState} that does not use exponential backoff.
+ * @param name name of the recurring task
+ */
+ private TiclRecurringTask(String name, RecurringTaskState marshalledState) {
+ super(name, internalScheduler, logger, smearer, null, marshalledState);
+ }
+
+ /**
+ * Constructs an instance from {@code marshalledState} that uses exponential backoff with an
+ * initial backoff of {@code timeoutMs}.
+ *
+ * @param name name of the recurring task
+ */
+ private TiclRecurringTask(String name, int timeoutMs, RecurringTaskState marshalledState) {
+ super(name, internalScheduler, logger, smearer,
+ createExpBackOffGenerator(timeoutMs, marshalledState.getBackoffState()), marshalledState);
+ }
+ }
+
+ /** A task for acquiring tokens from the server. */
+ private class AcquireTokenTask extends TiclRecurringTask {
+ private static final String TASK_NAME = "AcquireToken";
+
+ AcquireTokenTask() {
+ super(TASK_NAME, NO_DELAY, config.getNetworkTimeoutDelayMs(), true);
+ }
+
+ AcquireTokenTask(RecurringTaskState marshalledState) {
+ super(TASK_NAME, config.getNetworkTimeoutDelayMs(), marshalledState);
+ }
+
+ @Override
+ public boolean runTask() {
+ // If token is still not assigned (as expected), sends a request. Otherwise, ignore.
+ if (clientToken == null) {
+ // Allocate a nonce and send a message requesting a new token.
+ setNonce(generateNonce(random));
+ protocolHandler.sendInitializeMessage(applicationClientId, nonce, batchingTask, TASK_NAME);
+ return true; // Reschedule to check state, retry if necessary after timeout.
+ } else {
+ return false; // Don't reschedule.
+ }
+ }
+ }
+
+ /**
+ * A task that schedules heartbeats when the registration summary at the client is not
+ * in sync with the registration summary from the server.
+ */
+ private class RegSyncHeartbeatTask extends TiclRecurringTask {
+ private static final String TASK_NAME = "RegSyncHeartbeat";
+
+ RegSyncHeartbeatTask() {
+ super(TASK_NAME, config.getNetworkTimeoutDelayMs(), config.getNetworkTimeoutDelayMs(), true);
+ }
+
+ RegSyncHeartbeatTask(RecurringTaskState marshalledState) {
+ super(TASK_NAME, config.getNetworkTimeoutDelayMs(), marshalledState);
+ }
+
+ @Override
+ public boolean runTask() {
+ if (!registrationManager.isStateInSyncWithServer()) {
+ // Simply send an info message to ensure syncing happens.
+ logger.info("Registration state not in sync with server: %s", registrationManager);
+ sendInfoMessageToServer(false, true /* request server summary */);
+ return true;
+ } else {
+ logger.info("Not sending message since state is now in sync");
+ return false;
+ }
+ }
+ }
+
+ /** A task that writes the token to persistent storage. */
+ private class PersistentWriteTask extends TiclRecurringTask {
+ /*
+ * This class implements a "train" of events that attempt to reliably write state to
+ * storage. The train continues until runTask encounters a termination condition, in
+ * which the state currently in memory and the state currently in storage match.
+ */
+
+ private static final String TASK_NAME = "PersistentWrite";
+
+ /** The last client token that was written to to persistent state successfully. */
+ private final Box<PersistentTiclState> lastWrittenState =
+ Box.of(PersistentTiclState.DEFAULT_INSTANCE);
+
+ PersistentWriteTask() {
+ super(TASK_NAME, NO_DELAY, config.getWriteRetryDelayMs(), true);
+ }
+
+ PersistentWriteTask(RecurringTaskState marshalledState) {
+ super(TASK_NAME, config.getWriteRetryDelayMs(), marshalledState);
+ }
+
+ @Override
+ public boolean runTask() {
+ if (clientToken == null) {
+ // We cannot write without a token. We must do this check before creating the
+ // PersistentTiclState because newPersistentTiclState cannot handle null tokens.
+ return false;
+ }
+
+ // Compute the state that we will write if we decide to go ahead with the write.
+ final PersistentTiclState state =
+ PersistentTiclState.create(clientToken, lastMessageSendTimeMs);
+ byte[] serializedState = PersistenceUtils.serializeState(state, digestFn);
+
+ // Decide whether or not to do the write. The decision varies depending on whether or
+ // not the channel supports offline delivery. If we decide not to do the write, then
+ // that means the in-memory and stored state match semantically, and the train stops.
+ if (config.getChannelSupportsOfflineDelivery()) {
+ // For offline delivery, we want the entire state to match, since we write the last
+ // send time for every message.
+ if (state.equals(lastWrittenState.get())) {
+ return false;
+ }
+ } else {
+ // If we do not support offline delivery, we avoid writing the state on each message, and
+ // we avoid checking the last-sent time (we check only the client token).
+ if (TypedUtil.<Bytes>equals(
+ state.getClientToken(), lastWrittenState.get().getClientToken())) {
+ return false;
+ }
+ }
+
+ // We decided to do the write.
+ storage.writeKey(CLIENT_TOKEN_KEY, serializedState, new Callback<Status>() {
+ @Override
+ public void accept(Status status) {
+ logger.info("Write state completed: %s for %s", status, state);
+ Preconditions.checkState(resources.getInternalScheduler().isRunningOnThread());
+ if (status.isSuccess()) {
+ // Set lastWrittenToken to be the token that was written (NOT clientToken - which
+ // could have changed while the write was happening).
+ lastWrittenState.set(state);
+ } else {
+ statistics.recordError(ClientErrorType.PERSISTENT_WRITE_FAILURE);
+ }
+ }
+ });
+ return true; // Reschedule after timeout to make sure that write does happen.
+ }
+ }
+
+ /** A task for sending heartbeats to the server. */
+ private class HeartbeatTask extends TiclRecurringTask {
+ private static final String TASK_NAME = "Heartbeat";
+
+ /** Next time that the performance counters are sent to the server. */
+ private long nextPerformanceSendTimeMs;
+
+ HeartbeatTask() {
+ super(TASK_NAME, config.getHeartbeatIntervalMs(), NO_DELAY, false);
+ }
+
+ HeartbeatTask(RecurringTaskState marshalledState) {
+ super(TASK_NAME, marshalledState);
+ }
+
+ @Override
+ public boolean runTask() {
+ // Send info message. If needed, send performance counters and reset the next performance
+ // counter send time.
+ logger.info("Sending heartbeat to server: %s", this);
+ boolean mustSendPerfCounters =
+ nextPerformanceSendTimeMs > internalScheduler.getCurrentTimeMs();
+ if (mustSendPerfCounters) {
+ this.nextPerformanceSendTimeMs = internalScheduler.getCurrentTimeMs() +
+ getSmearer().getSmearedDelay(config.getPerfCounterDelayMs());
+ }
+ sendInfoMessageToServer(mustSendPerfCounters, !registrationManager.isStateInSyncWithServer());
+ return true; // Reschedule.
+ }
+ }
+
+ /** The task that is scheduled to send batched messages to the server (when needed). **/
+ static class BatchingTask extends RecurringTask {
+ /*
+ * This class is static and extends RecurringTask directly so that it can be instantiated
+ * independently in ProtocolHandlerTest.
+ */
+ private static final String TASK_NAME = "Batching";
+
+ /** {@link ProtocolHandler} instance from which messages will be pulled. */
+ private final ProtocolHandler protocolHandler;
+
+ /** Creates a new instance with default state. */
+ BatchingTask(ProtocolHandler protocolHandler, SystemResources resources, Smearer smearer,
+ int batchingDelayMs) {
+ super(TASK_NAME, resources.getInternalScheduler(), resources.getLogger(), smearer, null,
+ batchingDelayMs, NO_DELAY);
+ this.protocolHandler = protocolHandler;
+ }
+
+ /** Creates a new instance with state from {@code marshalledState}. */
+ BatchingTask(ProtocolHandler protocolHandler, SystemResources resources, Smearer smearer,
+ RecurringTaskState marshalledState) {
+ super(TASK_NAME, resources.getInternalScheduler(), resources.getLogger(), smearer, null,
+ marshalledState);
+ this.protocolHandler = protocolHandler;
+ }
+
+ @Override
+ public boolean runTask() {
+ protocolHandler.sendMessageToServer();
+ return false; // Don't reschedule.
+ }
+ }
+
+ /**
+ * A (slightly strange) recurring task that executes exactly once for the first heartbeat
+ * performed by a Ticl restarting from persistent state. The Android Ticl implementation
+ * requires that all work to be scheduled in the future occur in the form of a recurring task,
+ * hence this class.
+ */
+ private class InitialPersistentHeartbeatTask extends TiclRecurringTask {
+ private static final String TASK_NAME = "InitialPersistentHeartbeat";
+
+ InitialPersistentHeartbeatTask(int delayMs) {
+ super(TASK_NAME, delayMs, NO_DELAY, false);
+ }
+
+ @Override
+ public boolean runTask() {
+ sendInfoMessageToServer(false, true);
+ return false; // Don't reschedule.
+ }
+ }
+
+ //
+ // End of nested classes.
+ //
+
+ /** The single key used to write all the Ticl state. */
+ public static final String CLIENT_TOKEN_KEY = "ClientToken";
+
+ /** Resources for the Ticl. */
+ private final SystemResources resources;
+
+ /**
+ * Reference into the resources object for cleaner code. All Ticl code must execute on this
+ * scheduler.
+ */
+ private final Scheduler internalScheduler;
+
+ /** Logger reference into the resources object for cleaner code. */
+ private final Logger logger;
+
+ /** Storage for the Ticl persistent state. */
+ Storage storage;
+
+ /** Application callback interface. */
+ final InvalidationListener listener;
+
+ /** Configuration for this instance. */
+ private ClientConfigP config;
+
+ /** Application identifier for this client. */
+ private final ApplicationClientIdP applicationClientId;
+
+ /** Object maintaining the registration state for this client. */
+ private final RegistrationManager registrationManager;
+
+ /** Object handling low-level wire format interactions. */
+ private final ProtocolHandler protocolHandler;
+
+ /** The function for computing the registration and persistence state digests. */
+ private final DigestFunction digestFn = new ObjectIdDigestUtils.Sha1DigestFunction();
+
+ /** The state of the Ticl whether it has started or not. */
+ private final RunState ticlState;
+
+ /** Statistics objects to track number of sent messages, etc. */
+ final Statistics statistics;
+
+ /** A smearer to make sure that delays are randomized a little bit. */
+ private final Smearer smearer;
+
+ /** Current client token known from the server. */
+ private Bytes clientToken = null;
+
+ // After the client starts, exactly one of nonce and clientToken is non-null.
+
+ /** If not {@code null}, nonce for pending identifier request. */
+ private Bytes nonce = null;
+
+ /** Whether we should send registrations to the server or not. */
+ private boolean shouldSendRegistrations;
+
+ /** Whether the network is online. Assume so when we start. */
+ private boolean isOnline = true;
+
+ /** A random number generator. */
+ private final Random random;
+
+ /** Last time a message was sent to the server. */
+ private long lastMessageSendTimeMs = 0;
+
+ /** A task for acquiring the token (if the client has no token). */
+ private AcquireTokenTask acquireTokenTask;
+
+ /** Task for checking if reg summary is out of sync and then sending a heartbeat to the server. */
+ private RegSyncHeartbeatTask regSyncHeartbeatTask;
+
+ /** Task for writing the state blob to persistent storage. */
+ private PersistentWriteTask persistentWriteTask;
+
+ /** A task for periodic heartbeats. */
+ private HeartbeatTask heartbeatTask;
+
+ /** Task to send all batched messages to the server. */
+ private BatchingTask batchingTask;
+
+ /** Task to do the first heartbeat after a persistent restart. */
+ private InitialPersistentHeartbeatTask initialPersistentHeartbeatTask;
+
+ /** A cache of already acked invalidations to avoid duplicate delivery. */
+ private final AckCache ackCache = new AckCache();
+
+ /**
+ * Constructs a client.
+ *
+ * @param resources resources to use during execution
+ * @param random a random number generator
+ * @param clientType client type code
+ * @param clientName application identifier for the client
+ * @param config configuration for the client
+ * @param applicationName name of the application using the library (for debugging/monitoring)
+ * @param regManagerState marshalled registration manager state, if any
+ * @param protocolHandlerState marshalled protocol handler state, if any
+ * @param listener application callback
+ */
+ private InvalidationClientCore(final SystemResources resources, Random random, int clientType,
+ final byte[] clientName, ClientConfigP config, String applicationName,
+ RunStateP ticlRunState,
+ RegistrationManagerStateP regManagerState,
+ ProtocolHandlerState protocolHandlerState,
+ StatisticsState statisticsState,
+ InvalidationListener listener) {
+ this.resources = Preconditions.checkNotNull(resources);
+ this.random = random;
+ this.logger = Preconditions.checkNotNull(resources.getLogger());
+ this.internalScheduler = resources.getInternalScheduler();
+ this.storage = resources.getStorage();
+ this.config = config;
+ this.ticlState = (ticlRunState == null) ? new RunState() : new RunState(ticlRunState);
+ this.smearer = new Smearer(random, this.config.getSmearPercent());
+ this.applicationClientId = ApplicationClientIdP.create(clientType, new Bytes(clientName));
+ this.listener = listener;
+ this.statistics = (statisticsState != null)
+ ? Statistics.deserializeStatistics(resources.getLogger(), statisticsState.getCounter())
+ : new Statistics();
+ this.registrationManager = new RegistrationManager(logger, statistics, digestFn,
+ regManagerState);
+ this.protocolHandler = new ProtocolHandler(config.getProtocolHandlerConfig(), resources,
+ smearer, statistics, clientType, applicationName, this, protocolHandlerState);
+ }
+
+ /**
+ * Constructs a client with default state.
+ *
+ * @param resources resources to use during execution
+ * @param random a random number generator
+ * @param clientType client type code
+ * @param clientName application identifier for the client
+ * @param config configuration for the client
+ * @param applicationName name of the application using the library (for debugging/monitoring)
+ * @param listener application callback
+ */
+ public InvalidationClientCore(final SystemResources resources, Random random, int clientType,
+ final byte[] clientName, ClientConfigP config, String applicationName,
+ InvalidationListener listener) {
+ this(resources, random, clientType, clientName, config, applicationName, null, null, null, null,
+ listener);
+ createSchedulingTasks(null);
+ registerWithNetwork(resources);
+ logger.info("Created client: %s", this);
+ }
+
+ /**
+ * Constructs a client with state initialized from {@code marshalledState}.
+ *
+ * @param resources resources to use during execution
+ * @param random a random number generator
+ * @param clientType client type code
+ * @param clientName application identifier for the client
+ * @param config configuration for the client
+ * @param applicationName name of the application using the library (for debugging/monitoring)
+ * @param listener application callback
+ */
+ public InvalidationClientCore(final SystemResources resources, Random random, int clientType,
+ final byte[] clientName, ClientConfigP config, String applicationName,
+ InvalidationClientState marshalledState, InvalidationListener listener) {
+ this(resources, random, clientType, clientName, config, applicationName,
+ marshalledState.getRunState(), marshalledState.getRegistrationManagerState(),
+ marshalledState.getProtocolHandlerState(), marshalledState.getStatisticsState(), listener);
+ // Unmarshall.
+ if (marshalledState.hasClientToken()) {
+ clientToken = marshalledState.getClientToken();
+ }
+ if (marshalledState.hasNonce()) {
+ nonce = marshalledState.getNonce();
+ }
+ this.shouldSendRegistrations = marshalledState.getShouldSendRegistrations();
+ this.lastMessageSendTimeMs = marshalledState.getLastMessageSendTimeMs();
+ this.isOnline = marshalledState.getIsOnline();
+ createSchedulingTasks(marshalledState);
+
+ // We register with the network after unmarshalling our isOnline value. This is because when
+ // we register with the network, it may give us a new value for isOnline. If we unmarshalled
+ // after registering, then we would clobber the new value with the old marshalled value, which
+ // is wrong.
+ registerWithNetwork(resources);
+ logger.info("Created client: %s", this);
+ }
+
+ /**
+ * Registers handlers for received messages and network status changes with the network of
+ * {@code resources}.
+ */
+ private void registerWithNetwork(final SystemResources resources) {
+ resources.getNetwork().setListener(new NetworkChannel.NetworkListener() {
+ @Override
+ public void onMessageReceived(byte[] incomingMessage) {
+ InvalidationClientCore.this.handleIncomingMessage(incomingMessage);
+ }
+ @Override
+ public void onOnlineStatusChange(boolean isOnline) {
+ InvalidationClientCore.this.handleNetworkStatusChange(isOnline);
+ }
+ @Override
+ public void onAddressChange() {
+ // Send a message to the server. The header will include the new network address.
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ sendInfoMessageToServer(false, false);
+ }
+ });
+ }
+
+ /** Returns a default config builder for the client. */
+ public static ClientConfigP createConfig() {
+ Version version =
+ Version.create(ClientConstants.CONFIG_MAJOR_VERSION, ClientConstants.CONFIG_MINOR_VERSION);
+ ProtocolHandlerConfigP protocolHandlerConfig = ProtocolHandler.createConfig();
+ ClientConfigP.Builder builder = new ClientConfigP.Builder(version, protocolHandlerConfig);
+ return builder.build();
+ }
+
+ /** Returns a configuration builder with parameters set for unit tests. */
+ public static ClientConfigP createConfigForTest() {
+ Version version =
+ Version.create(ClientConstants.CONFIG_MAJOR_VERSION, ClientConstants.CONFIG_MINOR_VERSION);
+ ProtocolHandlerConfigP protocolHandlerConfig = ProtocolHandler.createConfigForTest();
+ ClientConfigP.Builder builder = new ClientConfigP.Builder(version, protocolHandlerConfig);
+ builder.networkTimeoutDelayMs = 2 * 1000;
+ builder.heartbeatIntervalMs = 5 * 1000;
+ builder.writeRetryDelayMs = 500;
+ return builder.build();
+ }
+
+ /**
+ * Creates the tasks used by the Ticl for token acquisition, heartbeats, persistent writes and
+ * registration sync.
+ *
+ * @param marshalledState saved state of recurring tasks
+ */
+ private void createSchedulingTasks(InvalidationClientState marshalledState) {
+ if (marshalledState == null) {
+ this.acquireTokenTask = new AcquireTokenTask();
+ this.heartbeatTask = new HeartbeatTask();
+ this.regSyncHeartbeatTask = new RegSyncHeartbeatTask();
+ this.persistentWriteTask = new PersistentWriteTask();
+ this.batchingTask = new BatchingTask(protocolHandler, resources, smearer,
+ config.getProtocolHandlerConfig().getBatchingDelayMs());
+ } else {
+ this.acquireTokenTask = new AcquireTokenTask(marshalledState.getAcquireTokenTaskState());
+ this.heartbeatTask = new HeartbeatTask(marshalledState.getHeartbeatTaskState());
+ this.regSyncHeartbeatTask =
+ new RegSyncHeartbeatTask(marshalledState.getRegSyncHeartbeatTaskState());
+ this.persistentWriteTask =
+ new PersistentWriteTask(marshalledState.getPersistentWriteTaskState());
+ this.batchingTask = new BatchingTask(protocolHandler, resources, smearer,
+ marshalledState.getBatchingTaskState());
+ if (marshalledState.hasLastWrittenState()) {
+ persistentWriteTask.lastWrittenState.set(marshalledState.getLastWrittenState());
+ }
+ }
+ // The handling of new InitialPersistentHeartbeatTask is a little strange. We create one when
+ // the Ticl is first created so that it can be called by the scheduler if it had been scheduled
+ // in the past. Otherwise, when we are ready to schedule one ourselves, we create a new instance
+ // with the proper delay, then schedule it. We have to do this because we don't know what delay
+ // to use here, since we don't compute it until start().
+ this.initialPersistentHeartbeatTask = new InitialPersistentHeartbeatTask(0);
+ }
+
+ /** Returns the configuration used by the client. */
+ protected ClientConfigP getConfig() {
+ return config;
+ }
+
+ // Methods for TestableInvalidationClient.
+
+ @Override
+
+ public ClientConfigP getConfigForTest() {
+ return getConfig();
+ }
+
+ @Override
+
+ public byte[] getApplicationClientIdForTest() {
+ return applicationClientId.toByteArray();
+ }
+
+ /** Returns the application client id of this client. */
+ protected ApplicationClientIdP getApplicationClientIdP() {
+ return applicationClientId;
+ }
+
+ @Override
+
+ public InvalidationListener getInvalidationListenerForTest() {
+ return (listener instanceof CheckingInvalidationListener) ?
+ ((CheckingInvalidationListener) this.listener).getDelegate() : this.listener;
+ }
+
+
+ public SystemResources getResourcesForTest() {
+ return resources;
+ }
+
+ public SystemResources getResources() {
+ return resources;
+ }
+
+ @Override
+
+ public Statistics getStatisticsForTest() {
+ return statistics;
+ }
+
+ Statistics getStatistics() {
+ return statistics;
+ }
+
+ @Override
+
+ public DigestFunction getDigestFunctionForTest() {
+ return this.digestFn;
+ }
+
+ @Override
+
+ public long getNextMessageSendTimeMsForTest() {
+ Preconditions.checkState(resources.getInternalScheduler().isRunningOnThread());
+ return protocolHandler.getNextMessageSendTimeMsForTest();
+ }
+
+ @Override
+
+ public RegistrationManagerState getRegistrationManagerStateCopyForTest() {
+ Preconditions.checkState(resources.getInternalScheduler().isRunningOnThread());
+ return registrationManager.getRegistrationManagerStateCopyForTest();
+ }
+
+ @Override
+
+ public void changeNetworkTimeoutDelayForTest(int networkTimeoutDelayMs) {
+ ClientConfigP.Builder builder = config.toBuilder();
+ builder.networkTimeoutDelayMs = networkTimeoutDelayMs;
+ config = builder.build();
+ createSchedulingTasks(null);
+ }
+
+ @Override
+
+ public void changeHeartbeatDelayForTest(int heartbeatDelayMs) {
+ ClientConfigP.Builder builder = config.toBuilder();
+ builder.heartbeatIntervalMs = heartbeatDelayMs;
+ config = builder.build();
+ createSchedulingTasks(null);
+ }
+
+ @Override
+
+ public void setDigestStoreForTest(DigestStore<ObjectIdP> digestStore) {
+ Preconditions.checkState(!resources.isStarted());
+ registrationManager.setDigestStoreForTest(digestStore);
+ }
+
+ @Override
+
+ public Bytes getClientTokenForTest() {
+ return getClientToken();
+ }
+
+ @Override
+
+ public String getClientTokenKeyForTest() {
+ return CLIENT_TOKEN_KEY;
+ }
+
+ @Override
+ public boolean isStartedForTest() {
+ return isStarted();
+ }
+
+ /**
+ * Returns whether the Ticl is started, i.e., whether it at some point had a session with the
+ * data center after being constructed.
+ */
+ protected boolean isStarted() {
+ return ticlState.isStarted();
+ }
+
+ @Override
+ public void stopResources() {
+ resources.stop();
+ }
+
+ @Override
+ public long getResourcesTimeMs() {
+ return resources.getInternalScheduler().getCurrentTimeMs();
+ }
+
+ @Override
+ public Scheduler getInternalSchedulerForTest() {
+ return resources.getInternalScheduler();
+ }
+
+ @Override
+ public Storage getStorage() {
+ return storage;
+ }
+
+ @Override
+ public NetworkEndpointId getNetworkIdForTest() {
+ NetworkChannel network = resources.getNetwork();
+ if (!(network instanceof TestableNetworkChannel)) {
+ throw new UnsupportedOperationException(
+ "getNetworkIdForTest requires a TestableNetworkChannel, not: " + network.getClass());
+ }
+ return ((TestableNetworkChannel) network).getNetworkIdForTest();
+ }
+
+ // End of methods for TestableInvalidationClient
+
+ @Override // InvalidationClient
+ public void start() {
+ Preconditions.checkState(resources.isStarted(), "Resources must be started before starting " +
+ "the Ticl");
+ if (ticlState.isStarted()) {
+ logger.severe("Ignoring start call since already started: client = %s", this);
+ return;
+ }
+
+ // Initialize the nonce so that we can maintain the invariant that exactly one of
+ // "nonce" and "clientToken" is non-null.
+ setNonce(generateNonce(random));
+
+ logger.info("Starting with Java config: %s", config);
+ // Read the state blob and then schedule startInternal once the value is there.
+ scheduleStartAfterReadingStateBlob();
+ }
+
+ /**
+ * Implementation of {@link #start} on the internal thread with the persistent
+ * {@code serializedState} if any. Starts the TICL protocol and makes the TICL ready to receive
+ * registrations, invalidations, etc.
+ */
+ private void startInternal(byte[] serializedState) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+
+ // Initialize the session manager using the persisted client token.
+ PersistentTiclState persistentState =
+ (serializedState == null) ? null : PersistenceUtils.deserializeState(logger,
+ serializedState, digestFn);
+
+ if ((serializedState != null) && (persistentState == null)) {
+ // In this case, we'll proceed as if we had no persistent state -- i.e., obtain a new client
+ // id from the server.
+ statistics.recordError(ClientErrorType.PERSISTENT_DESERIALIZATION_FAILURE);
+ logger.severe("Failed deserializing persistent state: %s",
+ Bytes.toLazyCompactString(serializedState));
+ }
+ if (persistentState != null) {
+ // If we have persistent state, use the previously-stored token and send a heartbeat to
+ // let the server know that we've restarted, since we may have been marked offline.
+
+ // In the common case, the server will already have all of our
+ // registrations, but we won't know for sure until we've gotten its summary.
+ // We'll ask the application for all of its registrations, but to avoid
+ // making the registrar redo the work of performing registrations that
+ // probably already exist, we'll suppress sending them to the registrar.
+ logger.info("Restarting from persistent state: %s", persistentState.getClientToken());
+ setNonce(null);
+ setClientToken(persistentState.getClientToken());
+ shouldSendRegistrations = false;
+
+ // Schedule an info message for the near future.
+ int initialHeartbeatDelayMs = computeInitialPersistentHeartbeatDelayMs(
+ config, resources, persistentState.getLastMessageSendTimeMs());
+ initialPersistentHeartbeatTask = new InitialPersistentHeartbeatTask(initialHeartbeatDelayMs);
+ initialPersistentHeartbeatTask.ensureScheduled("");
+
+ // We need to ensure that heartbeats are sent, regardless of whether we start fresh or from
+ // persistent state. The line below ensures that they are scheduled in the persistent startup
+ // case. For the other case, the task is scheduled when we acquire a token.
+ heartbeatTask.ensureScheduled("Startup-after-persistence");
+ } else {
+ // If we had no persistent state or couldn't deserialize the state that we had, start fresh.
+ // Request a new client identifier.
+
+ // The server can't possibly have our registrations, so whatever we get
+ // from the application we should send to the registrar.
+ logger.info("Starting with no previous state");
+ shouldSendRegistrations = true;
+ acquireToken("Startup");
+ }
+
+ // listener.ready() is called when ticl has acquired a new token.
+ }
+
+ /**
+ * Returns the delay for the initial heartbeat, given that the last message to the server was
+ * sent at {@code lastSendTimeMs}.
+ * @param config configuration object used by the client
+ * @param resources resources used by the client
+ */
+
+ static int computeInitialPersistentHeartbeatDelayMs(ClientConfigP config,
+ SystemResources resources, long lastSendTimeMs) {
+ // There are five cases:
+ // 1. Channel does not support offline delivery. We delay a little bit to allow the
+ // application to reissue its registrations locally and avoid triggering registration
+ // sync with the data center due to a hash mismatch. This is the "minimum delay," and we
+ // never use a delay less than it.
+ //
+ // All other cases are for channels supporting offline delivery.
+ //
+ // 2. Last send time is in the future (something weird happened). Use the minimum delay.
+ // 3. We have been asleep for more than one heartbeat interval. Use the minimum delay.
+ // 4. We have been asleep for less than one heartbeat interval.
+ // (a). The time remaining to the end of the interval is less than the minimum delay.
+ // Use the minimum delay.
+ // (b). The time remaining to the end of the interval is more than the minimum delay.
+ // Use the remaining delay.
+ final long nowMs = resources.getInternalScheduler().getCurrentTimeMs();
+ final int initialHeartbeatDelayMs;
+ if (!config.getChannelSupportsOfflineDelivery()) {
+ // Case 1.
+ initialHeartbeatDelayMs = config.getInitialPersistentHeartbeatDelayMs();
+ } else {
+ // Offline delivery cases (2, 3, 4).
+ // The default of the last send time is zero, so even if it wasn't written in the persistent
+ // state, this logic is still correct.
+ if ((lastSendTimeMs > nowMs) || // Case 2.
+ ((lastSendTimeMs + config.getHeartbeatIntervalMs()) < nowMs)) { // Case 3.
+ // Either something strange happened and the last send time is in the future, or we
+ // have been asleep for more than one heartbeat interval. Send immediately.
+ initialHeartbeatDelayMs = config.getInitialPersistentHeartbeatDelayMs();
+ } else {
+ // Case 4.
+ // We have been asleep for less than one heartbeat interval. Send after it expires,
+ // but ensure we let the initial heartbeat interval elapse.
+ final long timeSinceLastMessageMs = nowMs - lastSendTimeMs;
+ final int remainingHeartbeatIntervalMs =
+ (int) (config.getHeartbeatIntervalMs() - timeSinceLastMessageMs);
+ initialHeartbeatDelayMs = Math.max(remainingHeartbeatIntervalMs,
+ config.getInitialPersistentHeartbeatDelayMs());
+ }
+ }
+ resources.getLogger().info("Computed heartbeat delay %s from: offline-delivery = %s, "
+ + "initial-persistent-delay = %s, heartbeat-interval = %s, nowMs = %s",
+ initialHeartbeatDelayMs, config.getChannelSupportsOfflineDelivery(),
+ config.getInitialPersistentHeartbeatDelayMs(), config.getHeartbeatIntervalMs(),
+ nowMs);
+ return initialHeartbeatDelayMs;
+ }
+
+ @Override // InvalidationClient
+ public void stop() {
+ logger.warning("Ticl being stopped: %s", InvalidationClientCore.this);
+ if (ticlState.isStarted()) { // RunState is thread-safe.
+ ticlState.stop();
+ }
+ }
+
+ @Override // InvalidationClient
+ public void register(ObjectId objectId) {
+ List<ObjectId> objectIds = new ArrayList<ObjectId>();
+ objectIds.add(objectId);
+ performRegisterOperations(objectIds, RegistrationP.OpType.REGISTER);
+ }
+
+ @Override // InvalidationClient
+ public void unregister(ObjectId objectId) {
+ List<ObjectId> objectIds = new ArrayList<ObjectId>();
+ objectIds.add(objectId);
+ performRegisterOperations(objectIds, RegistrationP.OpType.UNREGISTER);
+ }
+
+ @Override // InvalidationClient
+ public void register(Collection<ObjectId> objectIds) {
+ performRegisterOperations(objectIds, RegistrationP.OpType.REGISTER);
+ }
+
+ @Override // InvalidationClient
+ public void unregister(Collection<ObjectId> objectIds) {
+ performRegisterOperations(objectIds, RegistrationP.OpType.UNREGISTER);
+ }
+
+ /**
+ * Implementation of (un)registration.
+ *
+ * @param objectIds object ids on which to operate
+ * @param regOpType whether to register or unregister
+ */
+ private void performRegisterOperations(final Collection<ObjectId> objectIds,
+ final int regOpType) {
+ Preconditions.checkState(!objectIds.isEmpty(), "Must specify some object id");
+ Preconditions.checkState(internalScheduler.isRunningOnThread(),
+ "Not running on internal thread");
+
+ if (ticlState.isStopped()) {
+ // The Ticl has been stopped. This might be some old registration op coming in. Just ignore
+ // instead of crashing.
+ logger.severe("Ticl stopped: register (%s) of %s ignored.", regOpType, objectIds);
+ return;
+ }
+ if (!ticlState.isStarted()) {
+ // We must be in the NOT_STARTED state, since we can't be in STOPPED or STARTED (since the
+ // previous if-check didn't succeeded, and isStarted uses a != STARTED test).
+ logger.severe(
+ "Ticl is not yet started; failing registration call; client = %s, objects = %s, op = %s",
+ this, objectIds, regOpType);
+ for (ObjectId objectId : objectIds) {
+ listener.informRegistrationFailure(this, objectId, true, "Client not yet ready");
+ }
+ return;
+ }
+
+ List<ObjectIdP> objectIdProtos = new ArrayList<ObjectIdP>(objectIds.size());
+ for (ObjectId objectId : objectIds) {
+ Preconditions.checkNotNull(objectId, "Must specify object id");
+ ObjectIdP objectIdProto = ProtoWrapperConverter.convertToObjectIdProto(objectId);
+ IncomingOperationType opType = (regOpType == RegistrationP.OpType.REGISTER) ?
+ IncomingOperationType.REGISTRATION : IncomingOperationType.UNREGISTRATION;
+ statistics.recordIncomingOperation(opType);
+ logger.info("Register %s, %s", objectIdProto, regOpType);
+ objectIdProtos.add(objectIdProto);
+ }
+
+ // Update the registration manager state, then have the protocol client send a message.
+ // performOperations returns only those elements of objectIdProtos that caused a state
+ // change (i.e., elements not present if regOpType == REGISTER or elements that were present
+ // if regOpType == UNREGISTER).
+ Collection<ObjectIdP> objectProtosToSend = registrationManager.performOperations(
+ objectIdProtos, regOpType);
+
+ // Check whether we should suppress sending registrations because we don't
+ // yet know the server's summary.
+ if (shouldSendRegistrations && (!objectProtosToSend.isEmpty())) {
+ protocolHandler.sendRegistrations(objectProtosToSend, regOpType, batchingTask);
+ }
+ InvalidationClientCore.this.regSyncHeartbeatTask.ensureScheduled("performRegister");
+ }
+
+ @Override // InvalidationClient
+ public void acknowledge(final AckHandle acknowledgeHandle) {
+ Preconditions.checkNotNull(acknowledgeHandle);
+ Preconditions.checkState(internalScheduler.isRunningOnThread(),
+ "Not running on internal thread");
+
+ // Parse and validate the ack handle first.
+ AckHandleP ackHandle;
+ try {
+ ackHandle = AckHandleP.parseFrom(acknowledgeHandle.getHandleData());
+ } catch (ValidationException exception) {
+ logger.warning("Bad ack handle : %s",
+ Bytes.toLazyCompactString(acknowledgeHandle.getHandleData()));
+ statistics.recordError(ClientErrorType.ACKNOWLEDGE_HANDLE_FAILURE);
+ return;
+ }
+
+ // Currently, only invalidations have non-trivial ack handle.
+ InvalidationP invalidation = ackHandle.getNullableInvalidation();
+ if (invalidation == null) {
+ logger.warning("Ack handle without invalidation : %s",
+ Bytes.toLazyCompactString(acknowledgeHandle.getHandleData()));
+ statistics.recordError(ClientErrorType.ACKNOWLEDGE_HANDLE_FAILURE);
+ return;
+ }
+
+ // Don't send the payload back.
+ if (invalidation.hasPayload()) {
+ InvalidationP.Builder builder = invalidation.toBuilder();
+ builder.payload = null;
+ invalidation = builder.build();
+ }
+ statistics.recordIncomingOperation(IncomingOperationType.ACKNOWLEDGE);
+ protocolHandler.sendInvalidationAck(invalidation, batchingTask);
+
+ // Record that the invalidation has been acknowledged to potentially avoid unnecessary delivery
+ // of earlier invalidations for the same object.
+ ackCache.recordAck(invalidation);
+ }
+
+ //
+ // Protocol listener methods
+ //
+
+ @Override
+ public Bytes getClientToken() {
+ Preconditions.checkState((clientToken == null) || (nonce == null));
+ return clientToken;
+ }
+
+ @Override
+ public void handleMessageSent() {
+ // The ProtocolHandler just sent a message to the server. If the channel supports offline
+ // delivery (see the comment in the ClientConfigP), store this time to stable storage. This
+ // only needs to be a best-effort write; if it fails, then we will "forget" that we sent the
+ // message and heartbeat needlessly when next restarted. That is a performance/battery bug,
+ // not a correctness bug.
+ lastMessageSendTimeMs = getResourcesTimeMs();
+ if (config.getChannelSupportsOfflineDelivery()) {
+ // Write whether or not we have a token. The persistent write task is a no-op if there is
+ // no token. We only write if the channel supports offline delivery. We could do the write
+ // regardless, and may want to do so in the future, since it might simplify some of the
+ // Ticl implementation.
+ persistentWriteTask.ensureScheduled("sent-message");
+ }
+ }
+
+ @Override
+ public RegistrationSummary getRegistrationSummary() {
+ return registrationManager.getRegistrationSummary();
+ }
+
+ //
+ // Private methods and toString.
+ //
+
+ void handleNetworkStatusChange(final boolean isOnline) {
+ // If we're back online and haven't sent a message to the server in a while, send a heartbeat to
+ // make sure the server knows we're online.
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ boolean wasOnline = this.isOnline;
+ this.isOnline = isOnline;
+ if (isOnline && !wasOnline && (internalScheduler.getCurrentTimeMs() >
+ lastMessageSendTimeMs + config.getOfflineHeartbeatThresholdMs())) {
+ logger.log(Level.INFO,
+ "Sending heartbeat after reconnection, previous send was %s ms ago",
+ internalScheduler.getCurrentTimeMs() - lastMessageSendTimeMs);
+ sendInfoMessageToServer(false, !registrationManager.isStateInSyncWithServer());
+ }
+ }
+
+ /**
+ * Handles an {@code incomingMessage} from the data center. If it is valid and addressed to
+ * this client, dispatches to methods to handle sub-parts of the message; if not, drops the
+ * message.
+ */
+ void handleIncomingMessage(byte[] incomingMessage) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ statistics.recordReceivedMessage(ReceivedMessageType.TOTAL);
+ ParsedMessage parsedMessage = protocolHandler.handleIncomingMessage(incomingMessage);
+ if (parsedMessage == null) {
+ // Invalid message.
+ return;
+ }
+
+ // Ensure we have either a matching token or a matching nonce.
+ if (!validateToken(parsedMessage)) {
+ return;
+ }
+
+ // Handle a token-control message, if present.
+ if (parsedMessage.tokenControlMessage != null) {
+ statistics.recordReceivedMessage(ReceivedMessageType.TOKEN_CONTROL);
+ handleTokenChanged(parsedMessage.header.token,
+ parsedMessage.tokenControlMessage.hasNewToken() ?
+ parsedMessage.tokenControlMessage.getNewToken() : null);
+ }
+
+ // We might have lost our token or failed to acquire one. Ensure that we do not proceed in
+ // either case.
+ if (clientToken == null) {
+ return;
+ }
+
+ // First, handle the message header.
+ handleIncomingHeader(parsedMessage.header);
+
+ // Then, handle any work remaining in the message.
+ if (parsedMessage.invalidationMessage != null) {
+ statistics.recordReceivedMessage(ReceivedMessageType.INVALIDATION);
+ handleInvalidations(parsedMessage.invalidationMessage.getInvalidation());
+ }
+ if (parsedMessage.registrationStatusMessage != null) {
+ statistics.recordReceivedMessage(ReceivedMessageType.REGISTRATION_STATUS);
+ handleRegistrationStatus(parsedMessage.registrationStatusMessage.getRegistrationStatus());
+ }
+ if (parsedMessage.registrationSyncRequestMessage != null) {
+ statistics.recordReceivedMessage(ReceivedMessageType.REGISTRATION_SYNC_REQUEST);
+ handleRegistrationSyncRequest();
+ }
+ if (parsedMessage.infoRequestMessage != null) {
+ statistics.recordReceivedMessage(ReceivedMessageType.INFO_REQUEST);
+ handleInfoMessage(parsedMessage.infoRequestMessage.getInfoType());
+ }
+ if (parsedMessage.errorMessage != null) {
+ statistics.recordReceivedMessage(ReceivedMessageType.ERROR);
+ handleErrorMessage(parsedMessage.header, parsedMessage.errorMessage.getCode(),
+ parsedMessage.errorMessage.getDescription());
+ }
+ }
+
+ /**
+ * Handles a token-control message.
+ * @param headerToken token in the server message
+ * @param newToken the new token provided, or {@code null} if this is a destroy message.
+ */
+ private void handleTokenChanged(Bytes headerToken, final Bytes newToken) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+
+ // The server is either supplying a new token in response to an InitializeMessage, spontaneously
+ // destroying a token we hold, or spontaneously upgrading a token we hold.
+
+ if (newToken != null) {
+ // Note: headerToken cannot be null, so a null nonce or clientToken will always be non-equal.
+ boolean headerTokenMatchesNonce = TypedUtil.<Bytes>equals(headerToken, nonce);
+ boolean headerTokenMatchesExistingToken = TypedUtil.<Bytes>equals(headerToken, clientToken);
+ boolean shouldAcceptToken = headerTokenMatchesNonce || headerTokenMatchesExistingToken;
+ if (!shouldAcceptToken) {
+ logger.info("Ignoring new token; %s does not match nonce = %s or existing token = %s",
+ newToken, nonce, clientToken);
+ return;
+ }
+ logger.info("New token being assigned at client: %s, Old = %s", newToken, clientToken);
+
+ // Start the regular heartbeats now.
+ heartbeatTask.ensureScheduled("Heartbeat-after-new-token");
+ setNonce(null);
+ setClientToken(newToken);
+ persistentWriteTask.ensureScheduled("Write-after-new-token");
+ } else {
+ logger.info("Destroying existing token: %s", clientToken);
+ acquireToken("Destroy");
+ }
+ }
+
+ /** Handles a server {@code header}. */
+ private void handleIncomingHeader(ServerMessageHeader header) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ if (nonce != null) {
+ throw new IllegalStateException(
+ "Cannot process server header with non-null nonce (have " + nonce + "): " + header);
+ }
+ if (header.registrationSummary != null) {
+ // We've received a summary from the server, so if we were suppressing registrations, we
+ // should now allow them to go to the registrar.
+ shouldSendRegistrations = true;
+
+ // Pass the registration summary to the registration manager. If we are now in agreement
+ // with the server and we had any pending operations, we can tell the listener that those
+ // operations have succeeded.
+ Set<RegistrationP> upcalls =
+ registrationManager.informServerRegistrationSummary(header.registrationSummary);
+ logger.fine("Received new server registration summary (%s); will make %s upcalls",
+ header.registrationSummary, upcalls.size());
+ for (RegistrationP registration : upcalls) {
+ ObjectId objectId =
+ ProtoWrapperConverter.convertFromObjectIdProto(registration.getObjectId());
+ RegistrationState regState = convertOpTypeToRegState(registration.getOpType());
+ listener.informRegistrationStatus(this, objectId, regState);
+ }
+ }
+ }
+
+ /** Handles incoming {@code invalidations}. */
+ private void handleInvalidations(Collection<InvalidationP> invalidations) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+
+ for (InvalidationP invalidation : invalidations) {
+ AckHandle ackHandle = AckHandle.newInstance(AckHandleP.create(invalidation).toByteArray());
+ if (ackCache.isAcked(invalidation)) {
+ // If the ack cache indicates that the client has already acked a restarted invalidation
+ // with an equal or greater version, then the TICL can simply acknowledge it immediately
+ // rather than delivering it to the listener.
+ logger.info("Stale invalidation {0}, not delivering", invalidation);
+ acknowledge(ackHandle);
+ statistics.recordReceivedMessage(ReceivedMessageType.STALE_INVALIDATION);
+ } else if (CommonProtos.isAllObjectId(invalidation.getObjectId())) {
+ logger.info("Issuing invalidate all");
+ listener.invalidateAll(InvalidationClientCore.this, ackHandle);
+ } else {
+ // Regular object. Could be unknown version or not.
+ Invalidation inv = ProtoWrapperConverter.convertFromInvalidationProto(invalidation);
+
+ boolean isSuppressed = invalidation.getIsTrickleRestart();
+ logger.info("Issuing invalidate (known-version = %s, is-trickle-restart = %s): %s",
+ invalidation.getIsKnownVersion(), isSuppressed, inv);
+
+ // Issue invalidate if the invalidation had a known version AND either no suppression has
+ // occurred or the client allows suppression.
+ if (invalidation.getIsKnownVersion() &&
+ (!isSuppressed || InvalidationClientCore.this.config.getAllowSuppression())) {
+ listener.invalidate(InvalidationClientCore.this, inv, ackHandle);
+ } else {
+ // Otherwise issue invalidateUnknownVersion.
+ listener.invalidateUnknownVersion(InvalidationClientCore.this, inv.getObjectId(),
+ ackHandle);
+ }
+ }
+ }
+ }
+
+ /** Handles incoming registration statuses. */
+ private void handleRegistrationStatus(List<RegistrationStatus> regStatusList) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ List<Boolean> localProcessingStatuses =
+ registrationManager.handleRegistrationStatus(regStatusList);
+ Preconditions.checkState(localProcessingStatuses.size() == regStatusList.size(),
+ "Not all registration statuses were processed");
+
+ // Inform app about the success or failure of each registration based
+ // on what the registration manager has indicated.
+ for (int i = 0; i < regStatusList.size(); ++i) {
+ RegistrationStatus regStatus = regStatusList.get(i);
+ boolean wasSuccess = localProcessingStatuses.get(i);
+ logger.fine("Process reg status: %s", regStatus);
+
+ ObjectId objectId = ProtoWrapperConverter.convertFromObjectIdProto(
+ regStatus.getRegistration().getObjectId());
+ if (wasSuccess) {
+ // Server operation was both successful and agreed with what the client wanted.
+ int regOpType = regStatus.getRegistration().getOpType();
+ InvalidationListener.RegistrationState regState = convertOpTypeToRegState(regOpType);
+ listener.informRegistrationStatus(InvalidationClientCore.this, objectId, regState);
+ } else {
+ // Server operation either failed or disagreed with client's intent (e.g., successful
+ // unregister, but the client wanted a registration).
+ String description = CommonProtos.isSuccess(regStatus.getStatus())
+ ? "Registration discrepancy detected" : regStatus.getStatus().getDescription();
+ boolean isPermanent = CommonProtos.isPermanentFailure(regStatus.getStatus());
+ listener.informRegistrationFailure(InvalidationClientCore.this, objectId, !isPermanent,
+ description);
+ }
+ }
+ }
+
+ /** Handles a registration sync request. */
+ private void handleRegistrationSyncRequest() {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ // Send all the registrations in the reg sync message.
+ // Generate a single subtree for all the registrations.
+ RegistrationSubtree subtree =
+ registrationManager.getRegistrations(Bytes.EMPTY_BYTES.getByteArray(), 0);
+ protocolHandler.sendRegistrationSyncSubtree(subtree, batchingTask);
+ }
+
+ /** Handles an info message request. */
+ private void handleInfoMessage(Collection<Integer> infoTypes) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ boolean mustSendPerformanceCounters = false;
+ for (int infoType : infoTypes) {
+ mustSendPerformanceCounters = (infoType == InfoType.GET_PERFORMANCE_COUNTERS);
+ if (mustSendPerformanceCounters) {
+ break;
+ }
+ }
+ sendInfoMessageToServer(mustSendPerformanceCounters,
+ !registrationManager.isStateInSyncWithServer());
+ }
+
+ /** Handles an error message. */
+ private void handleErrorMessage(ServerMessageHeader header, int code, String description) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+
+ // If it is an auth failure, we shut down the ticl.
+ logger.severe("Received error message: %s, %s, %s", header, code, description);
+
+ // Translate the code to error reason.
+ int reason;
+ switch (code) {
+ case ErrorMessage.Code.AUTH_FAILURE:
+ reason = ErrorInfo.ErrorReason.AUTH_FAILURE;
+ break;
+ case ErrorMessage.Code.UNKNOWN_FAILURE:
+ default:
+ reason = ErrorInfo.ErrorReason.UNKNOWN_FAILURE;
+ break;
+ }
+
+ // Issue an informError to the application.
+ ErrorInfo errorInfo = ErrorInfo.newInstance(reason, false, description, null);
+ listener.informError(this, errorInfo);
+
+ // If this is an auth failure, remove registrations and stop the Ticl. Otherwise do nothing.
+ if (code != ErrorMessage.Code.AUTH_FAILURE) {
+ return;
+ }
+
+ // If there are any registrations, remove them and issue registration failure.
+ Collection<ObjectIdP> desiredRegistrations = registrationManager.removeRegisteredObjects();
+ logger.warning("Issuing failure for %s objects", desiredRegistrations.size());
+ for (ObjectIdP objectId : desiredRegistrations) {
+ listener.informRegistrationFailure(this,
+ ProtoWrapperConverter.convertFromObjectIdProto(objectId), false,
+ "Auth error: " + description);
+ }
+ }
+
+ /**
+ * Returns whether the token in the header of {@code parsedMessage} matches either the
+ * client token or nonce of this Ticl (depending on which is non-{@code null}).
+ */
+ private boolean validateToken(ParsedMessage parsedMessage) {
+ if (clientToken != null) {
+ // Client token case.
+ if (!TypedUtil.<Bytes>equals(clientToken, parsedMessage.header.token)) {
+ logger.info("Incoming message has bad token: server = %s, client = %s",
+ parsedMessage.header.token, clientToken);
+ statistics.recordError(ClientErrorType.TOKEN_MISMATCH);
+ return false;
+ }
+ return true;
+ } else if (nonce != null) {
+ // Nonce case.
+ if (!TypedUtil.<Bytes>equals(nonce, parsedMessage.header.token)) {
+ statistics.recordError(ClientErrorType.NONCE_MISMATCH);
+ logger.info("Rejecting server message with mismatched nonce: Client = %s, Server = %s",
+ nonce, parsedMessage.header.token);
+ return false;
+ } else {
+ logger.info("Accepting server message with matching nonce: %s", nonce);
+ return true;
+ }
+ }
+ // Neither token nor nonce; ignore message.
+ logger.warning("Neither token nor nonce was set in validateToken: %s, %s", clientToken, nonce);
+ return false;
+ }
+
+ /**
+ * Requests a new client identifier from the server.
+ * <p>
+ * REQUIRES: no token currently be held.
+ *
+ * @param debugString information to identify the caller
+ */
+ private void acquireToken(final String debugString) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+
+ // Clear the current token and schedule the token acquisition.
+ setClientToken(null);
+ acquireTokenTask.ensureScheduled(debugString);
+ }
+
+ /**
+ * Sends an info message to the server. If {@code mustSendPerformanceCounters} is true,
+ * the performance counters are sent regardless of when they were sent earlier.
+ */
+ private void sendInfoMessageToServer(boolean mustSendPerformanceCounters,
+ boolean requestServerSummary) {
+ logger.info("Sending info message to server; request server summary = %s",
+ requestServerSummary);
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+
+ List<SimplePair<String, Integer>> performanceCounters =
+ new ArrayList<SimplePair<String, Integer>>();
+ ClientConfigP configToSend = null;
+ if (mustSendPerformanceCounters) {
+ statistics.getNonZeroStatistics(performanceCounters);
+ configToSend = config;
+ }
+ protocolHandler.sendInfoMessage(performanceCounters, configToSend, requestServerSummary,
+ batchingTask);
+ }
+
+ /** Reads the Ticl state from persistent storage (if any) and calls {@code startInternal}. */
+ private void scheduleStartAfterReadingStateBlob() {
+ storage.readKey(CLIENT_TOKEN_KEY, new Callback<SimplePair<Status, byte[]>>() {
+ @Override
+ public void accept(final SimplePair<Status, byte[]> readResult) {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
+ final byte[] serializedState = readResult.getFirst().isSuccess() ?
+ readResult.getSecond() : null;
+ // Call start now.
+ if (!readResult.getFirst().isSuccess()) {
+ statistics.recordError(ClientErrorType.PERSISTENT_READ_FAILURE);
+ logger.warning("Could not read state blob: %s", readResult.getFirst().getMessage());
+ }
+ startInternal(serializedState);
+ }
+ });
+ }
+
+ /**
+ * Converts an operation type {@code regOpType} to a
+ * {@code InvalidationListener.RegistrationState}.
+ */
+ private static InvalidationListener.RegistrationState convertOpTypeToRegState(int regOpType) {
+ InvalidationListener.RegistrationState regState =
+ regOpType == RegistrationP.OpType.REGISTER ?
+ InvalidationListener.RegistrationState.REGISTERED :
+ InvalidationListener.RegistrationState.UNREGISTERED;
+ return regState;
+ }
+
+ /**
+ * Sets the nonce to {@code newNonce}.
+ * <p>
+ * REQUIRES: {@code newNonce} be null or {@link #clientToken} be null.
+ * The goal is to ensure that a nonce is never set unless there is no
+ * client token, unless the nonce is being cleared.
+ */
+ private void setNonce(Bytes newNonce) {
+ if ((newNonce != null) && (clientToken != null)) {
+ throw new IllegalStateException("Tried to set nonce with existing token " + clientToken);
+ }
+ this.nonce = newNonce;
+ }
+
+ /**
+ * Returns a randomly generated nonce. Visible for testing only.
+ */
+
+ static Bytes generateNonce(Random random) {
+ // Generate 8 random bytes.
+ byte[] randomBytes = new byte[8];
+ random.nextBytes(randomBytes);
+ return new Bytes(randomBytes);
+ }
+
+ /**
+ * Sets the clientToken to {@code newClientToken}.
+ * <p>
+ * REQUIRES: {@code newClientToken} be null or {@link #nonce} be null.
+ * The goal is to ensure that a token is never set unless there is no
+ * nonce, unless the token is being cleared.
+ */
+ private void setClientToken(Bytes newClientToken) {
+ if ((newClientToken != null) && (nonce != null)) {
+ throw new IllegalStateException("Tried to set token with existing nonce " + nonce);
+ }
+
+ // If the ticl is in the process of being started and we are getting a new token (either from
+ // persistence or from the server, start the ticl and inform the application.
+ boolean finishStartingTicl = !ticlState.isStarted() &&
+ (clientToken == null) && (newClientToken != null);
+ this.clientToken = newClientToken;
+
+ if (finishStartingTicl) {
+ finishStartingTiclAndInformListener();
+ }
+ }
+
+ /** Start the ticl and inform the listener that it is ready. */
+ private void finishStartingTiclAndInformListener() {
+ Preconditions.checkState(!ticlState.isStarted());
+ ticlState.start();
+ listener.ready(this);
+
+ // We are not currently persisting our registration digest, so regardless of whether or not
+ // we are restarting from persistent state, we need to query the application for all of
+ // its registrations.
+ listener.reissueRegistrations(InvalidationClientCore.this, RegistrationManager.EMPTY_PREFIX, 0);
+ logger.info("Ticl started: %s", this);
+ }
+
+ /**
+ * Returns an exponential backoff generator with {@code initialDelayMs} and other state as
+ * given in {@code marshalledState}.
+ */
+ private TiclExponentialBackoffDelayGenerator createExpBackOffGenerator(int initialDelayMs,
+ ExponentialBackoffState marshalledState) {
+ if (marshalledState != null) {
+ return new TiclExponentialBackoffDelayGenerator(random, initialDelayMs,
+ config.getMaxExponentialBackoffFactor(), marshalledState);
+ } else {
+ return new TiclExponentialBackoffDelayGenerator(random, initialDelayMs,
+ config.getMaxExponentialBackoffFactor());
+ }
+ }
+
+ /** Returns a map from recurring task name to the runnable for that recurring task. */
+ protected Map<String, Runnable> getRecurringTasks() {
+ final int numPersistentTasks = 6;
+ HashMap<String, Runnable> tasks = new HashMap<String, Runnable>(numPersistentTasks);
+ tasks.put(AcquireTokenTask.TASK_NAME, acquireTokenTask.getRunnable());
+ tasks.put(RegSyncHeartbeatTask.TASK_NAME, regSyncHeartbeatTask.getRunnable());
+ tasks.put(PersistentWriteTask.TASK_NAME, persistentWriteTask.getRunnable());
+ tasks.put(HeartbeatTask.TASK_NAME, heartbeatTask.getRunnable());
+ tasks.put(BatchingTask.TASK_NAME, batchingTask.getRunnable());
+ tasks.put(InitialPersistentHeartbeatTask.TASK_NAME,
+ initialPersistentHeartbeatTask.getRunnable());
+ return tasks;
+ }
+
+ @Override
+ public void toCompactString(TextBuilder builder) {
+ builder.append("Client: ").append(applicationClientId).append(", ")
+ .append(clientToken).append(", ").append(ticlState);
+ }
+
+ @Override
+ public InvalidationClientState marshal() {
+ Preconditions.checkState(internalScheduler.isRunningOnThread(),
+ "Not running on internal thread");
+ InvalidationClientState.Builder builder = new InvalidationClientState.Builder();
+ builder.runState = ticlState.marshal();
+ builder.clientToken = clientToken;
+ builder.nonce = nonce;
+ builder.shouldSendRegistrations = shouldSendRegistrations;
+ builder.lastMessageSendTimeMs = lastMessageSendTimeMs;
+ builder.isOnline = isOnline;
+ builder.protocolHandlerState = protocolHandler.marshal();
+ builder.registrationManagerState = registrationManager.marshal();
+ builder.acquireTokenTaskState = acquireTokenTask.marshal();
+ builder.regSyncHeartbeatTaskState = regSyncHeartbeatTask.marshal();
+ builder.persistentWriteTaskState = persistentWriteTask.marshal();
+ builder.heartbeatTaskState = heartbeatTask.marshal();
+ builder.batchingTaskState = batchingTask.marshal();
+ builder.lastWrittenState = persistentWriteTask.lastWrittenState.get();
+ builder.statisticsState = statistics.marshal();
+ return builder.build();
+ }
+}

Powered by Google App Engine
This is Rietveld 408576698