| Index: third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/InvalidationClientCore.java
|
| diff --git a/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/InvalidationClientCore.java b/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/InvalidationClientCore.java
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..1eca3d4c71ce05fd93fd60fbbe7c2b7e5cc2e5d7
|
| --- /dev/null
|
| +++ b/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/InvalidationClientCore.java
|
| @@ -0,0 +1,1546 @@
|
| +/*
|
| + * Copyright 2011 Google Inc.
|
| + *
|
| + * Licensed under the Apache License, Version 2.0 (the "License");
|
| + * you may not use this file except in compliance with the License.
|
| + * You may obtain a copy of the License at
|
| + *
|
| + * http://www.apache.org/licenses/LICENSE-2.0
|
| + *
|
| + * Unless required by applicable law or agreed to in writing, software
|
| + * distributed under the License is distributed on an "AS IS" BASIS,
|
| + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| + * See the License for the specific language governing permissions and
|
| + * limitations under the License.
|
| + */
|
| +
|
| +package com.google.ipc.invalidation.ticl;
|
| +
|
| +import static com.google.ipc.invalidation.external.client.SystemResources.Scheduler.NO_DELAY;
|
| +
|
| +import com.google.ipc.invalidation.common.DigestFunction;
|
| +import com.google.ipc.invalidation.common.ObjectIdDigestUtils;
|
| +import com.google.ipc.invalidation.external.client.InvalidationListener;
|
| +import com.google.ipc.invalidation.external.client.InvalidationListener.RegistrationState;
|
| +import com.google.ipc.invalidation.external.client.SystemResources;
|
| +import com.google.ipc.invalidation.external.client.SystemResources.Logger;
|
| +import com.google.ipc.invalidation.external.client.SystemResources.NetworkChannel;
|
| +import com.google.ipc.invalidation.external.client.SystemResources.Scheduler;
|
| +import com.google.ipc.invalidation.external.client.SystemResources.Storage;
|
| +import com.google.ipc.invalidation.external.client.types.AckHandle;
|
| +import com.google.ipc.invalidation.external.client.types.Callback;
|
| +import com.google.ipc.invalidation.external.client.types.ErrorInfo;
|
| +import com.google.ipc.invalidation.external.client.types.Invalidation;
|
| +import com.google.ipc.invalidation.external.client.types.ObjectId;
|
| +import com.google.ipc.invalidation.external.client.types.SimplePair;
|
| +import com.google.ipc.invalidation.external.client.types.Status;
|
| +import com.google.ipc.invalidation.ticl.ProtocolHandler.ParsedMessage;
|
| +import com.google.ipc.invalidation.ticl.ProtocolHandler.ProtocolListener;
|
| +import com.google.ipc.invalidation.ticl.ProtocolHandler.ServerMessageHeader;
|
| +import com.google.ipc.invalidation.ticl.Statistics.ClientErrorType;
|
| +import com.google.ipc.invalidation.ticl.Statistics.IncomingOperationType;
|
| +import com.google.ipc.invalidation.ticl.Statistics.ReceivedMessageType;
|
| +import com.google.ipc.invalidation.ticl.proto.ChannelCommon.NetworkEndpointId;
|
| +import com.google.ipc.invalidation.ticl.proto.Client.AckHandleP;
|
| +import com.google.ipc.invalidation.ticl.proto.Client.ExponentialBackoffState;
|
| +import com.google.ipc.invalidation.ticl.proto.Client.PersistentTiclState;
|
| +import com.google.ipc.invalidation.ticl.proto.Client.RunStateP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientConstants;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ApplicationClientIdP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientConfigP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ErrorMessage;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoRequestMessage.InfoType;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ObjectIdP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ProtocolHandlerConfigP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationStatus;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSubtree;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSummary;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.Version;
|
| +import com.google.ipc.invalidation.ticl.proto.CommonProtos;
|
| +import com.google.ipc.invalidation.ticl.proto.JavaClient.InvalidationClientState;
|
| +import com.google.ipc.invalidation.ticl.proto.JavaClient.ProtocolHandlerState;
|
| +import com.google.ipc.invalidation.ticl.proto.JavaClient.RecurringTaskState;
|
| +import com.google.ipc.invalidation.ticl.proto.JavaClient.RegistrationManagerStateP;
|
| +import com.google.ipc.invalidation.ticl.proto.JavaClient.StatisticsState;
|
| +import com.google.ipc.invalidation.util.Box;
|
| +import com.google.ipc.invalidation.util.Bytes;
|
| +import com.google.ipc.invalidation.util.InternalBase;
|
| +import com.google.ipc.invalidation.util.Marshallable;
|
| +import com.google.ipc.invalidation.util.Preconditions;
|
| +import com.google.ipc.invalidation.util.ProtoWrapper.ValidationException;
|
| +import com.google.ipc.invalidation.util.Smearer;
|
| +import com.google.ipc.invalidation.util.TextBuilder;
|
| +import com.google.ipc.invalidation.util.TypedUtil;
|
| +
|
| +import java.util.ArrayList;
|
| +import java.util.Collection;
|
| +import java.util.HashMap;
|
| +import java.util.List;
|
| +import java.util.Map;
|
| +import java.util.Random;
|
| +import java.util.Set;
|
| +import java.util.logging.Level;
|
| +
|
| +
|
| +/**
|
| + * Core implementation of the Invalidation Client Library (Ticl). Subclasses are required
|
| + * to implement concurrency control for the Ticl.
|
| + *
|
| + */
|
| +public abstract class InvalidationClientCore extends InternalBase
|
| + implements Marshallable<InvalidationClientState>, ProtocolListener,
|
| + TestableInvalidationClient {
|
| +
|
| + /**
|
| + * A subclass of {@link RecurringTask} with simplified constructors to provide common
|
| + * parameters automatically (scheduler, logger, smearer).
|
| + */
|
| + private abstract class TiclRecurringTask extends RecurringTask {
|
| + /**
|
| + * Constructs a task with {@code initialDelayMs} and {@code timeoutDelayMs}. If
|
| + * {@code useExponentialBackoff}, an exponential backoff generator with initial delay
|
| + * {@code timeoutDelayMs} is used as well; if not, exponential backoff is not used.
|
| + */
|
| + TiclRecurringTask(String name, int initialDelayMs, int timeoutDelayMs,
|
| + boolean useExponentialBackoff) {
|
| + super(name, internalScheduler, logger, smearer,
|
| + useExponentialBackoff ? createExpBackOffGenerator(timeoutDelayMs, null) : null,
|
| + initialDelayMs, timeoutDelayMs);
|
| + }
|
| +
|
| + /**
|
| + * Constructs an instance from {@code marshalledState} that does not use exponential backoff.
|
| + * @param name name of the recurring task
|
| + */
|
| + private TiclRecurringTask(String name, RecurringTaskState marshalledState) {
|
| + super(name, internalScheduler, logger, smearer, null, marshalledState);
|
| + }
|
| +
|
| + /**
|
| + * Constructs an instance from {@code marshalledState} that uses exponential backoff with an
|
| + * initial backoff of {@code timeoutMs}.
|
| + *
|
| + * @param name name of the recurring task
|
| + */
|
| + private TiclRecurringTask(String name, int timeoutMs, RecurringTaskState marshalledState) {
|
| + super(name, internalScheduler, logger, smearer,
|
| + createExpBackOffGenerator(timeoutMs, marshalledState.getBackoffState()), marshalledState);
|
| + }
|
| + }
|
| +
|
| + /** A task for acquiring tokens from the server. */
|
| + private class AcquireTokenTask extends TiclRecurringTask {
|
| + private static final String TASK_NAME = "AcquireToken";
|
| +
|
| + AcquireTokenTask() {
|
| + super(TASK_NAME, NO_DELAY, config.getNetworkTimeoutDelayMs(), true);
|
| + }
|
| +
|
| + AcquireTokenTask(RecurringTaskState marshalledState) {
|
| + super(TASK_NAME, config.getNetworkTimeoutDelayMs(), marshalledState);
|
| + }
|
| +
|
| + @Override
|
| + public boolean runTask() {
|
| + // If token is still not assigned (as expected), sends a request. Otherwise, ignore.
|
| + if (clientToken == null) {
|
| + // Allocate a nonce and send a message requesting a new token.
|
| + setNonce(generateNonce(random));
|
| + protocolHandler.sendInitializeMessage(applicationClientId, nonce, batchingTask, TASK_NAME);
|
| + return true; // Reschedule to check state, retry if necessary after timeout.
|
| + } else {
|
| + return false; // Don't reschedule.
|
| + }
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * A task that schedules heartbeats when the registration summary at the client is not
|
| + * in sync with the registration summary from the server.
|
| + */
|
| + private class RegSyncHeartbeatTask extends TiclRecurringTask {
|
| + private static final String TASK_NAME = "RegSyncHeartbeat";
|
| +
|
| + RegSyncHeartbeatTask() {
|
| + super(TASK_NAME, config.getNetworkTimeoutDelayMs(), config.getNetworkTimeoutDelayMs(), true);
|
| + }
|
| +
|
| + RegSyncHeartbeatTask(RecurringTaskState marshalledState) {
|
| + super(TASK_NAME, config.getNetworkTimeoutDelayMs(), marshalledState);
|
| + }
|
| +
|
| + @Override
|
| + public boolean runTask() {
|
| + if (!registrationManager.isStateInSyncWithServer()) {
|
| + // Simply send an info message to ensure syncing happens.
|
| + logger.info("Registration state not in sync with server: %s", registrationManager);
|
| + sendInfoMessageToServer(false, true /* request server summary */);
|
| + return true;
|
| + } else {
|
| + logger.info("Not sending message since state is now in sync");
|
| + return false;
|
| + }
|
| + }
|
| + }
|
| +
|
| + /** A task that writes the token to persistent storage. */
|
| + private class PersistentWriteTask extends TiclRecurringTask {
|
| + /*
|
| + * This class implements a "train" of events that attempt to reliably write state to
|
| + * storage. The train continues until runTask encounters a termination condition, in
|
| + * which the state currently in memory and the state currently in storage match.
|
| + */
|
| +
|
| + private static final String TASK_NAME = "PersistentWrite";
|
| +
|
| + /** The last client token that was written to to persistent state successfully. */
|
| + private final Box<PersistentTiclState> lastWrittenState =
|
| + Box.of(PersistentTiclState.DEFAULT_INSTANCE);
|
| +
|
| + PersistentWriteTask() {
|
| + super(TASK_NAME, NO_DELAY, config.getWriteRetryDelayMs(), true);
|
| + }
|
| +
|
| + PersistentWriteTask(RecurringTaskState marshalledState) {
|
| + super(TASK_NAME, config.getWriteRetryDelayMs(), marshalledState);
|
| + }
|
| +
|
| + @Override
|
| + public boolean runTask() {
|
| + if (clientToken == null) {
|
| + // We cannot write without a token. We must do this check before creating the
|
| + // PersistentTiclState because newPersistentTiclState cannot handle null tokens.
|
| + return false;
|
| + }
|
| +
|
| + // Compute the state that we will write if we decide to go ahead with the write.
|
| + final PersistentTiclState state =
|
| + PersistentTiclState.create(clientToken, lastMessageSendTimeMs);
|
| + byte[] serializedState = PersistenceUtils.serializeState(state, digestFn);
|
| +
|
| + // Decide whether or not to do the write. The decision varies depending on whether or
|
| + // not the channel supports offline delivery. If we decide not to do the write, then
|
| + // that means the in-memory and stored state match semantically, and the train stops.
|
| + if (config.getChannelSupportsOfflineDelivery()) {
|
| + // For offline delivery, we want the entire state to match, since we write the last
|
| + // send time for every message.
|
| + if (state.equals(lastWrittenState.get())) {
|
| + return false;
|
| + }
|
| + } else {
|
| + // If we do not support offline delivery, we avoid writing the state on each message, and
|
| + // we avoid checking the last-sent time (we check only the client token).
|
| + if (TypedUtil.<Bytes>equals(
|
| + state.getClientToken(), lastWrittenState.get().getClientToken())) {
|
| + return false;
|
| + }
|
| + }
|
| +
|
| + // We decided to do the write.
|
| + storage.writeKey(CLIENT_TOKEN_KEY, serializedState, new Callback<Status>() {
|
| + @Override
|
| + public void accept(Status status) {
|
| + logger.info("Write state completed: %s for %s", status, state);
|
| + Preconditions.checkState(resources.getInternalScheduler().isRunningOnThread());
|
| + if (status.isSuccess()) {
|
| + // Set lastWrittenToken to be the token that was written (NOT clientToken - which
|
| + // could have changed while the write was happening).
|
| + lastWrittenState.set(state);
|
| + } else {
|
| + statistics.recordError(ClientErrorType.PERSISTENT_WRITE_FAILURE);
|
| + }
|
| + }
|
| + });
|
| + return true; // Reschedule after timeout to make sure that write does happen.
|
| + }
|
| + }
|
| +
|
| + /** A task for sending heartbeats to the server. */
|
| + private class HeartbeatTask extends TiclRecurringTask {
|
| + private static final String TASK_NAME = "Heartbeat";
|
| +
|
| + /** Next time that the performance counters are sent to the server. */
|
| + private long nextPerformanceSendTimeMs;
|
| +
|
| + HeartbeatTask() {
|
| + super(TASK_NAME, config.getHeartbeatIntervalMs(), NO_DELAY, false);
|
| + }
|
| +
|
| + HeartbeatTask(RecurringTaskState marshalledState) {
|
| + super(TASK_NAME, marshalledState);
|
| + }
|
| +
|
| + @Override
|
| + public boolean runTask() {
|
| + // Send info message. If needed, send performance counters and reset the next performance
|
| + // counter send time.
|
| + logger.info("Sending heartbeat to server: %s", this);
|
| + boolean mustSendPerfCounters =
|
| + nextPerformanceSendTimeMs > internalScheduler.getCurrentTimeMs();
|
| + if (mustSendPerfCounters) {
|
| + this.nextPerformanceSendTimeMs = internalScheduler.getCurrentTimeMs() +
|
| + getSmearer().getSmearedDelay(config.getPerfCounterDelayMs());
|
| + }
|
| + sendInfoMessageToServer(mustSendPerfCounters, !registrationManager.isStateInSyncWithServer());
|
| + return true; // Reschedule.
|
| + }
|
| + }
|
| +
|
| + /** The task that is scheduled to send batched messages to the server (when needed). **/
|
| + static class BatchingTask extends RecurringTask {
|
| + /*
|
| + * This class is static and extends RecurringTask directly so that it can be instantiated
|
| + * independently in ProtocolHandlerTest.
|
| + */
|
| + private static final String TASK_NAME = "Batching";
|
| +
|
| + /** {@link ProtocolHandler} instance from which messages will be pulled. */
|
| + private final ProtocolHandler protocolHandler;
|
| +
|
| + /** Creates a new instance with default state. */
|
| + BatchingTask(ProtocolHandler protocolHandler, SystemResources resources, Smearer smearer,
|
| + int batchingDelayMs) {
|
| + super(TASK_NAME, resources.getInternalScheduler(), resources.getLogger(), smearer, null,
|
| + batchingDelayMs, NO_DELAY);
|
| + this.protocolHandler = protocolHandler;
|
| + }
|
| +
|
| + /** Creates a new instance with state from {@code marshalledState}. */
|
| + BatchingTask(ProtocolHandler protocolHandler, SystemResources resources, Smearer smearer,
|
| + RecurringTaskState marshalledState) {
|
| + super(TASK_NAME, resources.getInternalScheduler(), resources.getLogger(), smearer, null,
|
| + marshalledState);
|
| + this.protocolHandler = protocolHandler;
|
| + }
|
| +
|
| + @Override
|
| + public boolean runTask() {
|
| + protocolHandler.sendMessageToServer();
|
| + return false; // Don't reschedule.
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * A (slightly strange) recurring task that executes exactly once for the first heartbeat
|
| + * performed by a Ticl restarting from persistent state. The Android Ticl implementation
|
| + * requires that all work to be scheduled in the future occur in the form of a recurring task,
|
| + * hence this class.
|
| + */
|
| + private class InitialPersistentHeartbeatTask extends TiclRecurringTask {
|
| + private static final String TASK_NAME = "InitialPersistentHeartbeat";
|
| +
|
| + InitialPersistentHeartbeatTask(int delayMs) {
|
| + super(TASK_NAME, delayMs, NO_DELAY, false);
|
| + }
|
| +
|
| + @Override
|
| + public boolean runTask() {
|
| + sendInfoMessageToServer(false, true);
|
| + return false; // Don't reschedule.
|
| + }
|
| + }
|
| +
|
| + //
|
| + // End of nested classes.
|
| + //
|
| +
|
| + /** The single key used to write all the Ticl state. */
|
| + public static final String CLIENT_TOKEN_KEY = "ClientToken";
|
| +
|
| + /** Resources for the Ticl. */
|
| + private final SystemResources resources;
|
| +
|
| + /**
|
| + * Reference into the resources object for cleaner code. All Ticl code must execute on this
|
| + * scheduler.
|
| + */
|
| + private final Scheduler internalScheduler;
|
| +
|
| + /** Logger reference into the resources object for cleaner code. */
|
| + private final Logger logger;
|
| +
|
| + /** Storage for the Ticl persistent state. */
|
| + Storage storage;
|
| +
|
| + /** Application callback interface. */
|
| + final InvalidationListener listener;
|
| +
|
| + /** Configuration for this instance. */
|
| + private ClientConfigP config;
|
| +
|
| + /** Application identifier for this client. */
|
| + private final ApplicationClientIdP applicationClientId;
|
| +
|
| + /** Object maintaining the registration state for this client. */
|
| + private final RegistrationManager registrationManager;
|
| +
|
| + /** Object handling low-level wire format interactions. */
|
| + private final ProtocolHandler protocolHandler;
|
| +
|
| + /** The function for computing the registration and persistence state digests. */
|
| + private final DigestFunction digestFn = new ObjectIdDigestUtils.Sha1DigestFunction();
|
| +
|
| + /** The state of the Ticl whether it has started or not. */
|
| + private final RunState ticlState;
|
| +
|
| + /** Statistics objects to track number of sent messages, etc. */
|
| + final Statistics statistics;
|
| +
|
| + /** A smearer to make sure that delays are randomized a little bit. */
|
| + private final Smearer smearer;
|
| +
|
| + /** Current client token known from the server. */
|
| + private Bytes clientToken = null;
|
| +
|
| + // After the client starts, exactly one of nonce and clientToken is non-null.
|
| +
|
| + /** If not {@code null}, nonce for pending identifier request. */
|
| + private Bytes nonce = null;
|
| +
|
| + /** Whether we should send registrations to the server or not. */
|
| + private boolean shouldSendRegistrations;
|
| +
|
| + /** Whether the network is online. Assume so when we start. */
|
| + private boolean isOnline = true;
|
| +
|
| + /** A random number generator. */
|
| + private final Random random;
|
| +
|
| + /** Last time a message was sent to the server. */
|
| + private long lastMessageSendTimeMs = 0;
|
| +
|
| + /** A task for acquiring the token (if the client has no token). */
|
| + private AcquireTokenTask acquireTokenTask;
|
| +
|
| + /** Task for checking if reg summary is out of sync and then sending a heartbeat to the server. */
|
| + private RegSyncHeartbeatTask regSyncHeartbeatTask;
|
| +
|
| + /** Task for writing the state blob to persistent storage. */
|
| + private PersistentWriteTask persistentWriteTask;
|
| +
|
| + /** A task for periodic heartbeats. */
|
| + private HeartbeatTask heartbeatTask;
|
| +
|
| + /** Task to send all batched messages to the server. */
|
| + private BatchingTask batchingTask;
|
| +
|
| + /** Task to do the first heartbeat after a persistent restart. */
|
| + private InitialPersistentHeartbeatTask initialPersistentHeartbeatTask;
|
| +
|
| + /** A cache of already acked invalidations to avoid duplicate delivery. */
|
| + private final AckCache ackCache = new AckCache();
|
| +
|
| + /**
|
| + * Constructs a client.
|
| + *
|
| + * @param resources resources to use during execution
|
| + * @param random a random number generator
|
| + * @param clientType client type code
|
| + * @param clientName application identifier for the client
|
| + * @param config configuration for the client
|
| + * @param applicationName name of the application using the library (for debugging/monitoring)
|
| + * @param regManagerState marshalled registration manager state, if any
|
| + * @param protocolHandlerState marshalled protocol handler state, if any
|
| + * @param listener application callback
|
| + */
|
| + private InvalidationClientCore(final SystemResources resources, Random random, int clientType,
|
| + final byte[] clientName, ClientConfigP config, String applicationName,
|
| + RunStateP ticlRunState,
|
| + RegistrationManagerStateP regManagerState,
|
| + ProtocolHandlerState protocolHandlerState,
|
| + StatisticsState statisticsState,
|
| + InvalidationListener listener) {
|
| + this.resources = Preconditions.checkNotNull(resources);
|
| + this.random = random;
|
| + this.logger = Preconditions.checkNotNull(resources.getLogger());
|
| + this.internalScheduler = resources.getInternalScheduler();
|
| + this.storage = resources.getStorage();
|
| + this.config = config;
|
| + this.ticlState = (ticlRunState == null) ? new RunState() : new RunState(ticlRunState);
|
| + this.smearer = new Smearer(random, this.config.getSmearPercent());
|
| + this.applicationClientId = ApplicationClientIdP.create(clientType, new Bytes(clientName));
|
| + this.listener = listener;
|
| + this.statistics = (statisticsState != null)
|
| + ? Statistics.deserializeStatistics(resources.getLogger(), statisticsState.getCounter())
|
| + : new Statistics();
|
| + this.registrationManager = new RegistrationManager(logger, statistics, digestFn,
|
| + regManagerState);
|
| + this.protocolHandler = new ProtocolHandler(config.getProtocolHandlerConfig(), resources,
|
| + smearer, statistics, clientType, applicationName, this, protocolHandlerState);
|
| + }
|
| +
|
| + /**
|
| + * Constructs a client with default state.
|
| + *
|
| + * @param resources resources to use during execution
|
| + * @param random a random number generator
|
| + * @param clientType client type code
|
| + * @param clientName application identifier for the client
|
| + * @param config configuration for the client
|
| + * @param applicationName name of the application using the library (for debugging/monitoring)
|
| + * @param listener application callback
|
| + */
|
| + public InvalidationClientCore(final SystemResources resources, Random random, int clientType,
|
| + final byte[] clientName, ClientConfigP config, String applicationName,
|
| + InvalidationListener listener) {
|
| + this(resources, random, clientType, clientName, config, applicationName, null, null, null, null,
|
| + listener);
|
| + createSchedulingTasks(null);
|
| + registerWithNetwork(resources);
|
| + logger.info("Created client: %s", this);
|
| + }
|
| +
|
| + /**
|
| + * Constructs a client with state initialized from {@code marshalledState}.
|
| + *
|
| + * @param resources resources to use during execution
|
| + * @param random a random number generator
|
| + * @param clientType client type code
|
| + * @param clientName application identifier for the client
|
| + * @param config configuration for the client
|
| + * @param applicationName name of the application using the library (for debugging/monitoring)
|
| + * @param listener application callback
|
| + */
|
| + public InvalidationClientCore(final SystemResources resources, Random random, int clientType,
|
| + final byte[] clientName, ClientConfigP config, String applicationName,
|
| + InvalidationClientState marshalledState, InvalidationListener listener) {
|
| + this(resources, random, clientType, clientName, config, applicationName,
|
| + marshalledState.getRunState(), marshalledState.getRegistrationManagerState(),
|
| + marshalledState.getProtocolHandlerState(), marshalledState.getStatisticsState(), listener);
|
| + // Unmarshall.
|
| + if (marshalledState.hasClientToken()) {
|
| + clientToken = marshalledState.getClientToken();
|
| + }
|
| + if (marshalledState.hasNonce()) {
|
| + nonce = marshalledState.getNonce();
|
| + }
|
| + this.shouldSendRegistrations = marshalledState.getShouldSendRegistrations();
|
| + this.lastMessageSendTimeMs = marshalledState.getLastMessageSendTimeMs();
|
| + this.isOnline = marshalledState.getIsOnline();
|
| + createSchedulingTasks(marshalledState);
|
| +
|
| + // We register with the network after unmarshalling our isOnline value. This is because when
|
| + // we register with the network, it may give us a new value for isOnline. If we unmarshalled
|
| + // after registering, then we would clobber the new value with the old marshalled value, which
|
| + // is wrong.
|
| + registerWithNetwork(resources);
|
| + logger.info("Created client: %s", this);
|
| + }
|
| +
|
| + /**
|
| + * Registers handlers for received messages and network status changes with the network of
|
| + * {@code resources}.
|
| + */
|
| + private void registerWithNetwork(final SystemResources resources) {
|
| + resources.getNetwork().setListener(new NetworkChannel.NetworkListener() {
|
| + @Override
|
| + public void onMessageReceived(byte[] incomingMessage) {
|
| + InvalidationClientCore.this.handleIncomingMessage(incomingMessage);
|
| + }
|
| + @Override
|
| + public void onOnlineStatusChange(boolean isOnline) {
|
| + InvalidationClientCore.this.handleNetworkStatusChange(isOnline);
|
| + }
|
| + @Override
|
| + public void onAddressChange() {
|
| + // Send a message to the server. The header will include the new network address.
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + sendInfoMessageToServer(false, false);
|
| + }
|
| + });
|
| + }
|
| +
|
| + /** Returns a default config builder for the client. */
|
| + public static ClientConfigP createConfig() {
|
| + Version version =
|
| + Version.create(ClientConstants.CONFIG_MAJOR_VERSION, ClientConstants.CONFIG_MINOR_VERSION);
|
| + ProtocolHandlerConfigP protocolHandlerConfig = ProtocolHandler.createConfig();
|
| + ClientConfigP.Builder builder = new ClientConfigP.Builder(version, protocolHandlerConfig);
|
| + return builder.build();
|
| + }
|
| +
|
| + /** Returns a configuration builder with parameters set for unit tests. */
|
| + public static ClientConfigP createConfigForTest() {
|
| + Version version =
|
| + Version.create(ClientConstants.CONFIG_MAJOR_VERSION, ClientConstants.CONFIG_MINOR_VERSION);
|
| + ProtocolHandlerConfigP protocolHandlerConfig = ProtocolHandler.createConfigForTest();
|
| + ClientConfigP.Builder builder = new ClientConfigP.Builder(version, protocolHandlerConfig);
|
| + builder.networkTimeoutDelayMs = 2 * 1000;
|
| + builder.heartbeatIntervalMs = 5 * 1000;
|
| + builder.writeRetryDelayMs = 500;
|
| + return builder.build();
|
| + }
|
| +
|
| + /**
|
| + * Creates the tasks used by the Ticl for token acquisition, heartbeats, persistent writes and
|
| + * registration sync.
|
| + *
|
| + * @param marshalledState saved state of recurring tasks
|
| + */
|
| + private void createSchedulingTasks(InvalidationClientState marshalledState) {
|
| + if (marshalledState == null) {
|
| + this.acquireTokenTask = new AcquireTokenTask();
|
| + this.heartbeatTask = new HeartbeatTask();
|
| + this.regSyncHeartbeatTask = new RegSyncHeartbeatTask();
|
| + this.persistentWriteTask = new PersistentWriteTask();
|
| + this.batchingTask = new BatchingTask(protocolHandler, resources, smearer,
|
| + config.getProtocolHandlerConfig().getBatchingDelayMs());
|
| + } else {
|
| + this.acquireTokenTask = new AcquireTokenTask(marshalledState.getAcquireTokenTaskState());
|
| + this.heartbeatTask = new HeartbeatTask(marshalledState.getHeartbeatTaskState());
|
| + this.regSyncHeartbeatTask =
|
| + new RegSyncHeartbeatTask(marshalledState.getRegSyncHeartbeatTaskState());
|
| + this.persistentWriteTask =
|
| + new PersistentWriteTask(marshalledState.getPersistentWriteTaskState());
|
| + this.batchingTask = new BatchingTask(protocolHandler, resources, smearer,
|
| + marshalledState.getBatchingTaskState());
|
| + if (marshalledState.hasLastWrittenState()) {
|
| + persistentWriteTask.lastWrittenState.set(marshalledState.getLastWrittenState());
|
| + }
|
| + }
|
| + // The handling of new InitialPersistentHeartbeatTask is a little strange. We create one when
|
| + // the Ticl is first created so that it can be called by the scheduler if it had been scheduled
|
| + // in the past. Otherwise, when we are ready to schedule one ourselves, we create a new instance
|
| + // with the proper delay, then schedule it. We have to do this because we don't know what delay
|
| + // to use here, since we don't compute it until start().
|
| + this.initialPersistentHeartbeatTask = new InitialPersistentHeartbeatTask(0);
|
| + }
|
| +
|
| + /** Returns the configuration used by the client. */
|
| + protected ClientConfigP getConfig() {
|
| + return config;
|
| + }
|
| +
|
| + // Methods for TestableInvalidationClient.
|
| +
|
| + @Override
|
| +
|
| + public ClientConfigP getConfigForTest() {
|
| + return getConfig();
|
| + }
|
| +
|
| + @Override
|
| +
|
| + public byte[] getApplicationClientIdForTest() {
|
| + return applicationClientId.toByteArray();
|
| + }
|
| +
|
| + /** Returns the application client id of this client. */
|
| + protected ApplicationClientIdP getApplicationClientIdP() {
|
| + return applicationClientId;
|
| + }
|
| +
|
| + @Override
|
| +
|
| + public InvalidationListener getInvalidationListenerForTest() {
|
| + return (listener instanceof CheckingInvalidationListener) ?
|
| + ((CheckingInvalidationListener) this.listener).getDelegate() : this.listener;
|
| + }
|
| +
|
| +
|
| + public SystemResources getResourcesForTest() {
|
| + return resources;
|
| + }
|
| +
|
| + public SystemResources getResources() {
|
| + return resources;
|
| + }
|
| +
|
| + @Override
|
| +
|
| + public Statistics getStatisticsForTest() {
|
| + return statistics;
|
| + }
|
| +
|
| + Statistics getStatistics() {
|
| + return statistics;
|
| + }
|
| +
|
| + @Override
|
| +
|
| + public DigestFunction getDigestFunctionForTest() {
|
| + return this.digestFn;
|
| + }
|
| +
|
| + @Override
|
| +
|
| + public long getNextMessageSendTimeMsForTest() {
|
| + Preconditions.checkState(resources.getInternalScheduler().isRunningOnThread());
|
| + return protocolHandler.getNextMessageSendTimeMsForTest();
|
| + }
|
| +
|
| + @Override
|
| +
|
| + public RegistrationManagerState getRegistrationManagerStateCopyForTest() {
|
| + Preconditions.checkState(resources.getInternalScheduler().isRunningOnThread());
|
| + return registrationManager.getRegistrationManagerStateCopyForTest();
|
| + }
|
| +
|
| + @Override
|
| +
|
| + public void changeNetworkTimeoutDelayForTest(int networkTimeoutDelayMs) {
|
| + ClientConfigP.Builder builder = config.toBuilder();
|
| + builder.networkTimeoutDelayMs = networkTimeoutDelayMs;
|
| + config = builder.build();
|
| + createSchedulingTasks(null);
|
| + }
|
| +
|
| + @Override
|
| +
|
| + public void changeHeartbeatDelayForTest(int heartbeatDelayMs) {
|
| + ClientConfigP.Builder builder = config.toBuilder();
|
| + builder.heartbeatIntervalMs = heartbeatDelayMs;
|
| + config = builder.build();
|
| + createSchedulingTasks(null);
|
| + }
|
| +
|
| + @Override
|
| +
|
| + public void setDigestStoreForTest(DigestStore<ObjectIdP> digestStore) {
|
| + Preconditions.checkState(!resources.isStarted());
|
| + registrationManager.setDigestStoreForTest(digestStore);
|
| + }
|
| +
|
| + @Override
|
| +
|
| + public Bytes getClientTokenForTest() {
|
| + return getClientToken();
|
| + }
|
| +
|
| + @Override
|
| +
|
| + public String getClientTokenKeyForTest() {
|
| + return CLIENT_TOKEN_KEY;
|
| + }
|
| +
|
| + @Override
|
| + public boolean isStartedForTest() {
|
| + return isStarted();
|
| + }
|
| +
|
| + /**
|
| + * Returns whether the Ticl is started, i.e., whether it at some point had a session with the
|
| + * data center after being constructed.
|
| + */
|
| + protected boolean isStarted() {
|
| + return ticlState.isStarted();
|
| + }
|
| +
|
| + @Override
|
| + public void stopResources() {
|
| + resources.stop();
|
| + }
|
| +
|
| + @Override
|
| + public long getResourcesTimeMs() {
|
| + return resources.getInternalScheduler().getCurrentTimeMs();
|
| + }
|
| +
|
| + @Override
|
| + public Scheduler getInternalSchedulerForTest() {
|
| + return resources.getInternalScheduler();
|
| + }
|
| +
|
| + @Override
|
| + public Storage getStorage() {
|
| + return storage;
|
| + }
|
| +
|
| + @Override
|
| + public NetworkEndpointId getNetworkIdForTest() {
|
| + NetworkChannel network = resources.getNetwork();
|
| + if (!(network instanceof TestableNetworkChannel)) {
|
| + throw new UnsupportedOperationException(
|
| + "getNetworkIdForTest requires a TestableNetworkChannel, not: " + network.getClass());
|
| + }
|
| + return ((TestableNetworkChannel) network).getNetworkIdForTest();
|
| + }
|
| +
|
| + // End of methods for TestableInvalidationClient
|
| +
|
| + @Override // InvalidationClient
|
| + public void start() {
|
| + Preconditions.checkState(resources.isStarted(), "Resources must be started before starting " +
|
| + "the Ticl");
|
| + if (ticlState.isStarted()) {
|
| + logger.severe("Ignoring start call since already started: client = %s", this);
|
| + return;
|
| + }
|
| +
|
| + // Initialize the nonce so that we can maintain the invariant that exactly one of
|
| + // "nonce" and "clientToken" is non-null.
|
| + setNonce(generateNonce(random));
|
| +
|
| + logger.info("Starting with Java config: %s", config);
|
| + // Read the state blob and then schedule startInternal once the value is there.
|
| + scheduleStartAfterReadingStateBlob();
|
| + }
|
| +
|
| + /**
|
| + * Implementation of {@link #start} on the internal thread with the persistent
|
| + * {@code serializedState} if any. Starts the TICL protocol and makes the TICL ready to receive
|
| + * registrations, invalidations, etc.
|
| + */
|
| + private void startInternal(byte[] serializedState) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| +
|
| + // Initialize the session manager using the persisted client token.
|
| + PersistentTiclState persistentState =
|
| + (serializedState == null) ? null : PersistenceUtils.deserializeState(logger,
|
| + serializedState, digestFn);
|
| +
|
| + if ((serializedState != null) && (persistentState == null)) {
|
| + // In this case, we'll proceed as if we had no persistent state -- i.e., obtain a new client
|
| + // id from the server.
|
| + statistics.recordError(ClientErrorType.PERSISTENT_DESERIALIZATION_FAILURE);
|
| + logger.severe("Failed deserializing persistent state: %s",
|
| + Bytes.toLazyCompactString(serializedState));
|
| + }
|
| + if (persistentState != null) {
|
| + // If we have persistent state, use the previously-stored token and send a heartbeat to
|
| + // let the server know that we've restarted, since we may have been marked offline.
|
| +
|
| + // In the common case, the server will already have all of our
|
| + // registrations, but we won't know for sure until we've gotten its summary.
|
| + // We'll ask the application for all of its registrations, but to avoid
|
| + // making the registrar redo the work of performing registrations that
|
| + // probably already exist, we'll suppress sending them to the registrar.
|
| + logger.info("Restarting from persistent state: %s", persistentState.getClientToken());
|
| + setNonce(null);
|
| + setClientToken(persistentState.getClientToken());
|
| + shouldSendRegistrations = false;
|
| +
|
| + // Schedule an info message for the near future.
|
| + int initialHeartbeatDelayMs = computeInitialPersistentHeartbeatDelayMs(
|
| + config, resources, persistentState.getLastMessageSendTimeMs());
|
| + initialPersistentHeartbeatTask = new InitialPersistentHeartbeatTask(initialHeartbeatDelayMs);
|
| + initialPersistentHeartbeatTask.ensureScheduled("");
|
| +
|
| + // We need to ensure that heartbeats are sent, regardless of whether we start fresh or from
|
| + // persistent state. The line below ensures that they are scheduled in the persistent startup
|
| + // case. For the other case, the task is scheduled when we acquire a token.
|
| + heartbeatTask.ensureScheduled("Startup-after-persistence");
|
| + } else {
|
| + // If we had no persistent state or couldn't deserialize the state that we had, start fresh.
|
| + // Request a new client identifier.
|
| +
|
| + // The server can't possibly have our registrations, so whatever we get
|
| + // from the application we should send to the registrar.
|
| + logger.info("Starting with no previous state");
|
| + shouldSendRegistrations = true;
|
| + acquireToken("Startup");
|
| + }
|
| +
|
| + // listener.ready() is called when ticl has acquired a new token.
|
| + }
|
| +
|
| + /**
|
| + * Returns the delay for the initial heartbeat, given that the last message to the server was
|
| + * sent at {@code lastSendTimeMs}.
|
| + * @param config configuration object used by the client
|
| + * @param resources resources used by the client
|
| + */
|
| +
|
| + static int computeInitialPersistentHeartbeatDelayMs(ClientConfigP config,
|
| + SystemResources resources, long lastSendTimeMs) {
|
| + // There are five cases:
|
| + // 1. Channel does not support offline delivery. We delay a little bit to allow the
|
| + // application to reissue its registrations locally and avoid triggering registration
|
| + // sync with the data center due to a hash mismatch. This is the "minimum delay," and we
|
| + // never use a delay less than it.
|
| + //
|
| + // All other cases are for channels supporting offline delivery.
|
| + //
|
| + // 2. Last send time is in the future (something weird happened). Use the minimum delay.
|
| + // 3. We have been asleep for more than one heartbeat interval. Use the minimum delay.
|
| + // 4. We have been asleep for less than one heartbeat interval.
|
| + // (a). The time remaining to the end of the interval is less than the minimum delay.
|
| + // Use the minimum delay.
|
| + // (b). The time remaining to the end of the interval is more than the minimum delay.
|
| + // Use the remaining delay.
|
| + final long nowMs = resources.getInternalScheduler().getCurrentTimeMs();
|
| + final int initialHeartbeatDelayMs;
|
| + if (!config.getChannelSupportsOfflineDelivery()) {
|
| + // Case 1.
|
| + initialHeartbeatDelayMs = config.getInitialPersistentHeartbeatDelayMs();
|
| + } else {
|
| + // Offline delivery cases (2, 3, 4).
|
| + // The default of the last send time is zero, so even if it wasn't written in the persistent
|
| + // state, this logic is still correct.
|
| + if ((lastSendTimeMs > nowMs) || // Case 2.
|
| + ((lastSendTimeMs + config.getHeartbeatIntervalMs()) < nowMs)) { // Case 3.
|
| + // Either something strange happened and the last send time is in the future, or we
|
| + // have been asleep for more than one heartbeat interval. Send immediately.
|
| + initialHeartbeatDelayMs = config.getInitialPersistentHeartbeatDelayMs();
|
| + } else {
|
| + // Case 4.
|
| + // We have been asleep for less than one heartbeat interval. Send after it expires,
|
| + // but ensure we let the initial heartbeat interval elapse.
|
| + final long timeSinceLastMessageMs = nowMs - lastSendTimeMs;
|
| + final int remainingHeartbeatIntervalMs =
|
| + (int) (config.getHeartbeatIntervalMs() - timeSinceLastMessageMs);
|
| + initialHeartbeatDelayMs = Math.max(remainingHeartbeatIntervalMs,
|
| + config.getInitialPersistentHeartbeatDelayMs());
|
| + }
|
| + }
|
| + resources.getLogger().info("Computed heartbeat delay %s from: offline-delivery = %s, "
|
| + + "initial-persistent-delay = %s, heartbeat-interval = %s, nowMs = %s",
|
| + initialHeartbeatDelayMs, config.getChannelSupportsOfflineDelivery(),
|
| + config.getInitialPersistentHeartbeatDelayMs(), config.getHeartbeatIntervalMs(),
|
| + nowMs);
|
| + return initialHeartbeatDelayMs;
|
| + }
|
| +
|
| + @Override // InvalidationClient
|
| + public void stop() {
|
| + logger.warning("Ticl being stopped: %s", InvalidationClientCore.this);
|
| + if (ticlState.isStarted()) { // RunState is thread-safe.
|
| + ticlState.stop();
|
| + }
|
| + }
|
| +
|
| + @Override // InvalidationClient
|
| + public void register(ObjectId objectId) {
|
| + List<ObjectId> objectIds = new ArrayList<ObjectId>();
|
| + objectIds.add(objectId);
|
| + performRegisterOperations(objectIds, RegistrationP.OpType.REGISTER);
|
| + }
|
| +
|
| + @Override // InvalidationClient
|
| + public void unregister(ObjectId objectId) {
|
| + List<ObjectId> objectIds = new ArrayList<ObjectId>();
|
| + objectIds.add(objectId);
|
| + performRegisterOperations(objectIds, RegistrationP.OpType.UNREGISTER);
|
| + }
|
| +
|
| + @Override // InvalidationClient
|
| + public void register(Collection<ObjectId> objectIds) {
|
| + performRegisterOperations(objectIds, RegistrationP.OpType.REGISTER);
|
| + }
|
| +
|
| + @Override // InvalidationClient
|
| + public void unregister(Collection<ObjectId> objectIds) {
|
| + performRegisterOperations(objectIds, RegistrationP.OpType.UNREGISTER);
|
| + }
|
| +
|
| + /**
|
| + * Implementation of (un)registration.
|
| + *
|
| + * @param objectIds object ids on which to operate
|
| + * @param regOpType whether to register or unregister
|
| + */
|
| + private void performRegisterOperations(final Collection<ObjectId> objectIds,
|
| + final int regOpType) {
|
| + Preconditions.checkState(!objectIds.isEmpty(), "Must specify some object id");
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(),
|
| + "Not running on internal thread");
|
| +
|
| + if (ticlState.isStopped()) {
|
| + // The Ticl has been stopped. This might be some old registration op coming in. Just ignore
|
| + // instead of crashing.
|
| + logger.severe("Ticl stopped: register (%s) of %s ignored.", regOpType, objectIds);
|
| + return;
|
| + }
|
| + if (!ticlState.isStarted()) {
|
| + // We must be in the NOT_STARTED state, since we can't be in STOPPED or STARTED (since the
|
| + // previous if-check didn't succeeded, and isStarted uses a != STARTED test).
|
| + logger.severe(
|
| + "Ticl is not yet started; failing registration call; client = %s, objects = %s, op = %s",
|
| + this, objectIds, regOpType);
|
| + for (ObjectId objectId : objectIds) {
|
| + listener.informRegistrationFailure(this, objectId, true, "Client not yet ready");
|
| + }
|
| + return;
|
| + }
|
| +
|
| + List<ObjectIdP> objectIdProtos = new ArrayList<ObjectIdP>(objectIds.size());
|
| + for (ObjectId objectId : objectIds) {
|
| + Preconditions.checkNotNull(objectId, "Must specify object id");
|
| + ObjectIdP objectIdProto = ProtoWrapperConverter.convertToObjectIdProto(objectId);
|
| + IncomingOperationType opType = (regOpType == RegistrationP.OpType.REGISTER) ?
|
| + IncomingOperationType.REGISTRATION : IncomingOperationType.UNREGISTRATION;
|
| + statistics.recordIncomingOperation(opType);
|
| + logger.info("Register %s, %s", objectIdProto, regOpType);
|
| + objectIdProtos.add(objectIdProto);
|
| + }
|
| +
|
| + // Update the registration manager state, then have the protocol client send a message.
|
| + // performOperations returns only those elements of objectIdProtos that caused a state
|
| + // change (i.e., elements not present if regOpType == REGISTER or elements that were present
|
| + // if regOpType == UNREGISTER).
|
| + Collection<ObjectIdP> objectProtosToSend = registrationManager.performOperations(
|
| + objectIdProtos, regOpType);
|
| +
|
| + // Check whether we should suppress sending registrations because we don't
|
| + // yet know the server's summary.
|
| + if (shouldSendRegistrations && (!objectProtosToSend.isEmpty())) {
|
| + protocolHandler.sendRegistrations(objectProtosToSend, regOpType, batchingTask);
|
| + }
|
| + InvalidationClientCore.this.regSyncHeartbeatTask.ensureScheduled("performRegister");
|
| + }
|
| +
|
| + @Override // InvalidationClient
|
| + public void acknowledge(final AckHandle acknowledgeHandle) {
|
| + Preconditions.checkNotNull(acknowledgeHandle);
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(),
|
| + "Not running on internal thread");
|
| +
|
| + // Parse and validate the ack handle first.
|
| + AckHandleP ackHandle;
|
| + try {
|
| + ackHandle = AckHandleP.parseFrom(acknowledgeHandle.getHandleData());
|
| + } catch (ValidationException exception) {
|
| + logger.warning("Bad ack handle : %s",
|
| + Bytes.toLazyCompactString(acknowledgeHandle.getHandleData()));
|
| + statistics.recordError(ClientErrorType.ACKNOWLEDGE_HANDLE_FAILURE);
|
| + return;
|
| + }
|
| +
|
| + // Currently, only invalidations have non-trivial ack handle.
|
| + InvalidationP invalidation = ackHandle.getNullableInvalidation();
|
| + if (invalidation == null) {
|
| + logger.warning("Ack handle without invalidation : %s",
|
| + Bytes.toLazyCompactString(acknowledgeHandle.getHandleData()));
|
| + statistics.recordError(ClientErrorType.ACKNOWLEDGE_HANDLE_FAILURE);
|
| + return;
|
| + }
|
| +
|
| + // Don't send the payload back.
|
| + if (invalidation.hasPayload()) {
|
| + InvalidationP.Builder builder = invalidation.toBuilder();
|
| + builder.payload = null;
|
| + invalidation = builder.build();
|
| + }
|
| + statistics.recordIncomingOperation(IncomingOperationType.ACKNOWLEDGE);
|
| + protocolHandler.sendInvalidationAck(invalidation, batchingTask);
|
| +
|
| + // Record that the invalidation has been acknowledged to potentially avoid unnecessary delivery
|
| + // of earlier invalidations for the same object.
|
| + ackCache.recordAck(invalidation);
|
| + }
|
| +
|
| + //
|
| + // Protocol listener methods
|
| + //
|
| +
|
| + @Override
|
| + public Bytes getClientToken() {
|
| + Preconditions.checkState((clientToken == null) || (nonce == null));
|
| + return clientToken;
|
| + }
|
| +
|
| + @Override
|
| + public void handleMessageSent() {
|
| + // The ProtocolHandler just sent a message to the server. If the channel supports offline
|
| + // delivery (see the comment in the ClientConfigP), store this time to stable storage. This
|
| + // only needs to be a best-effort write; if it fails, then we will "forget" that we sent the
|
| + // message and heartbeat needlessly when next restarted. That is a performance/battery bug,
|
| + // not a correctness bug.
|
| + lastMessageSendTimeMs = getResourcesTimeMs();
|
| + if (config.getChannelSupportsOfflineDelivery()) {
|
| + // Write whether or not we have a token. The persistent write task is a no-op if there is
|
| + // no token. We only write if the channel supports offline delivery. We could do the write
|
| + // regardless, and may want to do so in the future, since it might simplify some of the
|
| + // Ticl implementation.
|
| + persistentWriteTask.ensureScheduled("sent-message");
|
| + }
|
| + }
|
| +
|
| + @Override
|
| + public RegistrationSummary getRegistrationSummary() {
|
| + return registrationManager.getRegistrationSummary();
|
| + }
|
| +
|
| + //
|
| + // Private methods and toString.
|
| + //
|
| +
|
| + void handleNetworkStatusChange(final boolean isOnline) {
|
| + // If we're back online and haven't sent a message to the server in a while, send a heartbeat to
|
| + // make sure the server knows we're online.
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + boolean wasOnline = this.isOnline;
|
| + this.isOnline = isOnline;
|
| + if (isOnline && !wasOnline && (internalScheduler.getCurrentTimeMs() >
|
| + lastMessageSendTimeMs + config.getOfflineHeartbeatThresholdMs())) {
|
| + logger.log(Level.INFO,
|
| + "Sending heartbeat after reconnection, previous send was %s ms ago",
|
| + internalScheduler.getCurrentTimeMs() - lastMessageSendTimeMs);
|
| + sendInfoMessageToServer(false, !registrationManager.isStateInSyncWithServer());
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * Handles an {@code incomingMessage} from the data center. If it is valid and addressed to
|
| + * this client, dispatches to methods to handle sub-parts of the message; if not, drops the
|
| + * message.
|
| + */
|
| + void handleIncomingMessage(byte[] incomingMessage) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + statistics.recordReceivedMessage(ReceivedMessageType.TOTAL);
|
| + ParsedMessage parsedMessage = protocolHandler.handleIncomingMessage(incomingMessage);
|
| + if (parsedMessage == null) {
|
| + // Invalid message.
|
| + return;
|
| + }
|
| +
|
| + // Ensure we have either a matching token or a matching nonce.
|
| + if (!validateToken(parsedMessage)) {
|
| + return;
|
| + }
|
| +
|
| + // Handle a token-control message, if present.
|
| + if (parsedMessage.tokenControlMessage != null) {
|
| + statistics.recordReceivedMessage(ReceivedMessageType.TOKEN_CONTROL);
|
| + handleTokenChanged(parsedMessage.header.token,
|
| + parsedMessage.tokenControlMessage.hasNewToken() ?
|
| + parsedMessage.tokenControlMessage.getNewToken() : null);
|
| + }
|
| +
|
| + // We might have lost our token or failed to acquire one. Ensure that we do not proceed in
|
| + // either case.
|
| + if (clientToken == null) {
|
| + return;
|
| + }
|
| +
|
| + // First, handle the message header.
|
| + handleIncomingHeader(parsedMessage.header);
|
| +
|
| + // Then, handle any work remaining in the message.
|
| + if (parsedMessage.invalidationMessage != null) {
|
| + statistics.recordReceivedMessage(ReceivedMessageType.INVALIDATION);
|
| + handleInvalidations(parsedMessage.invalidationMessage.getInvalidation());
|
| + }
|
| + if (parsedMessage.registrationStatusMessage != null) {
|
| + statistics.recordReceivedMessage(ReceivedMessageType.REGISTRATION_STATUS);
|
| + handleRegistrationStatus(parsedMessage.registrationStatusMessage.getRegistrationStatus());
|
| + }
|
| + if (parsedMessage.registrationSyncRequestMessage != null) {
|
| + statistics.recordReceivedMessage(ReceivedMessageType.REGISTRATION_SYNC_REQUEST);
|
| + handleRegistrationSyncRequest();
|
| + }
|
| + if (parsedMessage.infoRequestMessage != null) {
|
| + statistics.recordReceivedMessage(ReceivedMessageType.INFO_REQUEST);
|
| + handleInfoMessage(parsedMessage.infoRequestMessage.getInfoType());
|
| + }
|
| + if (parsedMessage.errorMessage != null) {
|
| + statistics.recordReceivedMessage(ReceivedMessageType.ERROR);
|
| + handleErrorMessage(parsedMessage.header, parsedMessage.errorMessage.getCode(),
|
| + parsedMessage.errorMessage.getDescription());
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * Handles a token-control message.
|
| + * @param headerToken token in the server message
|
| + * @param newToken the new token provided, or {@code null} if this is a destroy message.
|
| + */
|
| + private void handleTokenChanged(Bytes headerToken, final Bytes newToken) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| +
|
| + // The server is either supplying a new token in response to an InitializeMessage, spontaneously
|
| + // destroying a token we hold, or spontaneously upgrading a token we hold.
|
| +
|
| + if (newToken != null) {
|
| + // Note: headerToken cannot be null, so a null nonce or clientToken will always be non-equal.
|
| + boolean headerTokenMatchesNonce = TypedUtil.<Bytes>equals(headerToken, nonce);
|
| + boolean headerTokenMatchesExistingToken = TypedUtil.<Bytes>equals(headerToken, clientToken);
|
| + boolean shouldAcceptToken = headerTokenMatchesNonce || headerTokenMatchesExistingToken;
|
| + if (!shouldAcceptToken) {
|
| + logger.info("Ignoring new token; %s does not match nonce = %s or existing token = %s",
|
| + newToken, nonce, clientToken);
|
| + return;
|
| + }
|
| + logger.info("New token being assigned at client: %s, Old = %s", newToken, clientToken);
|
| +
|
| + // Start the regular heartbeats now.
|
| + heartbeatTask.ensureScheduled("Heartbeat-after-new-token");
|
| + setNonce(null);
|
| + setClientToken(newToken);
|
| + persistentWriteTask.ensureScheduled("Write-after-new-token");
|
| + } else {
|
| + logger.info("Destroying existing token: %s", clientToken);
|
| + acquireToken("Destroy");
|
| + }
|
| + }
|
| +
|
| + /** Handles a server {@code header}. */
|
| + private void handleIncomingHeader(ServerMessageHeader header) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + if (nonce != null) {
|
| + throw new IllegalStateException(
|
| + "Cannot process server header with non-null nonce (have " + nonce + "): " + header);
|
| + }
|
| + if (header.registrationSummary != null) {
|
| + // We've received a summary from the server, so if we were suppressing registrations, we
|
| + // should now allow them to go to the registrar.
|
| + shouldSendRegistrations = true;
|
| +
|
| + // Pass the registration summary to the registration manager. If we are now in agreement
|
| + // with the server and we had any pending operations, we can tell the listener that those
|
| + // operations have succeeded.
|
| + Set<RegistrationP> upcalls =
|
| + registrationManager.informServerRegistrationSummary(header.registrationSummary);
|
| + logger.fine("Received new server registration summary (%s); will make %s upcalls",
|
| + header.registrationSummary, upcalls.size());
|
| + for (RegistrationP registration : upcalls) {
|
| + ObjectId objectId =
|
| + ProtoWrapperConverter.convertFromObjectIdProto(registration.getObjectId());
|
| + RegistrationState regState = convertOpTypeToRegState(registration.getOpType());
|
| + listener.informRegistrationStatus(this, objectId, regState);
|
| + }
|
| + }
|
| + }
|
| +
|
| + /** Handles incoming {@code invalidations}. */
|
| + private void handleInvalidations(Collection<InvalidationP> invalidations) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| +
|
| + for (InvalidationP invalidation : invalidations) {
|
| + AckHandle ackHandle = AckHandle.newInstance(AckHandleP.create(invalidation).toByteArray());
|
| + if (ackCache.isAcked(invalidation)) {
|
| + // If the ack cache indicates that the client has already acked a restarted invalidation
|
| + // with an equal or greater version, then the TICL can simply acknowledge it immediately
|
| + // rather than delivering it to the listener.
|
| + logger.info("Stale invalidation {0}, not delivering", invalidation);
|
| + acknowledge(ackHandle);
|
| + statistics.recordReceivedMessage(ReceivedMessageType.STALE_INVALIDATION);
|
| + } else if (CommonProtos.isAllObjectId(invalidation.getObjectId())) {
|
| + logger.info("Issuing invalidate all");
|
| + listener.invalidateAll(InvalidationClientCore.this, ackHandle);
|
| + } else {
|
| + // Regular object. Could be unknown version or not.
|
| + Invalidation inv = ProtoWrapperConverter.convertFromInvalidationProto(invalidation);
|
| +
|
| + boolean isSuppressed = invalidation.getIsTrickleRestart();
|
| + logger.info("Issuing invalidate (known-version = %s, is-trickle-restart = %s): %s",
|
| + invalidation.getIsKnownVersion(), isSuppressed, inv);
|
| +
|
| + // Issue invalidate if the invalidation had a known version AND either no suppression has
|
| + // occurred or the client allows suppression.
|
| + if (invalidation.getIsKnownVersion() &&
|
| + (!isSuppressed || InvalidationClientCore.this.config.getAllowSuppression())) {
|
| + listener.invalidate(InvalidationClientCore.this, inv, ackHandle);
|
| + } else {
|
| + // Otherwise issue invalidateUnknownVersion.
|
| + listener.invalidateUnknownVersion(InvalidationClientCore.this, inv.getObjectId(),
|
| + ackHandle);
|
| + }
|
| + }
|
| + }
|
| + }
|
| +
|
| + /** Handles incoming registration statuses. */
|
| + private void handleRegistrationStatus(List<RegistrationStatus> regStatusList) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + List<Boolean> localProcessingStatuses =
|
| + registrationManager.handleRegistrationStatus(regStatusList);
|
| + Preconditions.checkState(localProcessingStatuses.size() == regStatusList.size(),
|
| + "Not all registration statuses were processed");
|
| +
|
| + // Inform app about the success or failure of each registration based
|
| + // on what the registration manager has indicated.
|
| + for (int i = 0; i < regStatusList.size(); ++i) {
|
| + RegistrationStatus regStatus = regStatusList.get(i);
|
| + boolean wasSuccess = localProcessingStatuses.get(i);
|
| + logger.fine("Process reg status: %s", regStatus);
|
| +
|
| + ObjectId objectId = ProtoWrapperConverter.convertFromObjectIdProto(
|
| + regStatus.getRegistration().getObjectId());
|
| + if (wasSuccess) {
|
| + // Server operation was both successful and agreed with what the client wanted.
|
| + int regOpType = regStatus.getRegistration().getOpType();
|
| + InvalidationListener.RegistrationState regState = convertOpTypeToRegState(regOpType);
|
| + listener.informRegistrationStatus(InvalidationClientCore.this, objectId, regState);
|
| + } else {
|
| + // Server operation either failed or disagreed with client's intent (e.g., successful
|
| + // unregister, but the client wanted a registration).
|
| + String description = CommonProtos.isSuccess(regStatus.getStatus())
|
| + ? "Registration discrepancy detected" : regStatus.getStatus().getDescription();
|
| + boolean isPermanent = CommonProtos.isPermanentFailure(regStatus.getStatus());
|
| + listener.informRegistrationFailure(InvalidationClientCore.this, objectId, !isPermanent,
|
| + description);
|
| + }
|
| + }
|
| + }
|
| +
|
| + /** Handles a registration sync request. */
|
| + private void handleRegistrationSyncRequest() {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + // Send all the registrations in the reg sync message.
|
| + // Generate a single subtree for all the registrations.
|
| + RegistrationSubtree subtree =
|
| + registrationManager.getRegistrations(Bytes.EMPTY_BYTES.getByteArray(), 0);
|
| + protocolHandler.sendRegistrationSyncSubtree(subtree, batchingTask);
|
| + }
|
| +
|
| + /** Handles an info message request. */
|
| + private void handleInfoMessage(Collection<Integer> infoTypes) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + boolean mustSendPerformanceCounters = false;
|
| + for (int infoType : infoTypes) {
|
| + mustSendPerformanceCounters = (infoType == InfoType.GET_PERFORMANCE_COUNTERS);
|
| + if (mustSendPerformanceCounters) {
|
| + break;
|
| + }
|
| + }
|
| + sendInfoMessageToServer(mustSendPerformanceCounters,
|
| + !registrationManager.isStateInSyncWithServer());
|
| + }
|
| +
|
| + /** Handles an error message. */
|
| + private void handleErrorMessage(ServerMessageHeader header, int code, String description) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| +
|
| + // If it is an auth failure, we shut down the ticl.
|
| + logger.severe("Received error message: %s, %s, %s", header, code, description);
|
| +
|
| + // Translate the code to error reason.
|
| + int reason;
|
| + switch (code) {
|
| + case ErrorMessage.Code.AUTH_FAILURE:
|
| + reason = ErrorInfo.ErrorReason.AUTH_FAILURE;
|
| + break;
|
| + case ErrorMessage.Code.UNKNOWN_FAILURE:
|
| + default:
|
| + reason = ErrorInfo.ErrorReason.UNKNOWN_FAILURE;
|
| + break;
|
| + }
|
| +
|
| + // Issue an informError to the application.
|
| + ErrorInfo errorInfo = ErrorInfo.newInstance(reason, false, description, null);
|
| + listener.informError(this, errorInfo);
|
| +
|
| + // If this is an auth failure, remove registrations and stop the Ticl. Otherwise do nothing.
|
| + if (code != ErrorMessage.Code.AUTH_FAILURE) {
|
| + return;
|
| + }
|
| +
|
| + // If there are any registrations, remove them and issue registration failure.
|
| + Collection<ObjectIdP> desiredRegistrations = registrationManager.removeRegisteredObjects();
|
| + logger.warning("Issuing failure for %s objects", desiredRegistrations.size());
|
| + for (ObjectIdP objectId : desiredRegistrations) {
|
| + listener.informRegistrationFailure(this,
|
| + ProtoWrapperConverter.convertFromObjectIdProto(objectId), false,
|
| + "Auth error: " + description);
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * Returns whether the token in the header of {@code parsedMessage} matches either the
|
| + * client token or nonce of this Ticl (depending on which is non-{@code null}).
|
| + */
|
| + private boolean validateToken(ParsedMessage parsedMessage) {
|
| + if (clientToken != null) {
|
| + // Client token case.
|
| + if (!TypedUtil.<Bytes>equals(clientToken, parsedMessage.header.token)) {
|
| + logger.info("Incoming message has bad token: server = %s, client = %s",
|
| + parsedMessage.header.token, clientToken);
|
| + statistics.recordError(ClientErrorType.TOKEN_MISMATCH);
|
| + return false;
|
| + }
|
| + return true;
|
| + } else if (nonce != null) {
|
| + // Nonce case.
|
| + if (!TypedUtil.<Bytes>equals(nonce, parsedMessage.header.token)) {
|
| + statistics.recordError(ClientErrorType.NONCE_MISMATCH);
|
| + logger.info("Rejecting server message with mismatched nonce: Client = %s, Server = %s",
|
| + nonce, parsedMessage.header.token);
|
| + return false;
|
| + } else {
|
| + logger.info("Accepting server message with matching nonce: %s", nonce);
|
| + return true;
|
| + }
|
| + }
|
| + // Neither token nor nonce; ignore message.
|
| + logger.warning("Neither token nor nonce was set in validateToken: %s, %s", clientToken, nonce);
|
| + return false;
|
| + }
|
| +
|
| + /**
|
| + * Requests a new client identifier from the server.
|
| + * <p>
|
| + * REQUIRES: no token currently be held.
|
| + *
|
| + * @param debugString information to identify the caller
|
| + */
|
| + private void acquireToken(final String debugString) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| +
|
| + // Clear the current token and schedule the token acquisition.
|
| + setClientToken(null);
|
| + acquireTokenTask.ensureScheduled(debugString);
|
| + }
|
| +
|
| + /**
|
| + * Sends an info message to the server. If {@code mustSendPerformanceCounters} is true,
|
| + * the performance counters are sent regardless of when they were sent earlier.
|
| + */
|
| + private void sendInfoMessageToServer(boolean mustSendPerformanceCounters,
|
| + boolean requestServerSummary) {
|
| + logger.info("Sending info message to server; request server summary = %s",
|
| + requestServerSummary);
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| +
|
| + List<SimplePair<String, Integer>> performanceCounters =
|
| + new ArrayList<SimplePair<String, Integer>>();
|
| + ClientConfigP configToSend = null;
|
| + if (mustSendPerformanceCounters) {
|
| + statistics.getNonZeroStatistics(performanceCounters);
|
| + configToSend = config;
|
| + }
|
| + protocolHandler.sendInfoMessage(performanceCounters, configToSend, requestServerSummary,
|
| + batchingTask);
|
| + }
|
| +
|
| + /** Reads the Ticl state from persistent storage (if any) and calls {@code startInternal}. */
|
| + private void scheduleStartAfterReadingStateBlob() {
|
| + storage.readKey(CLIENT_TOKEN_KEY, new Callback<SimplePair<Status, byte[]>>() {
|
| + @Override
|
| + public void accept(final SimplePair<Status, byte[]> readResult) {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on internal thread");
|
| + final byte[] serializedState = readResult.getFirst().isSuccess() ?
|
| + readResult.getSecond() : null;
|
| + // Call start now.
|
| + if (!readResult.getFirst().isSuccess()) {
|
| + statistics.recordError(ClientErrorType.PERSISTENT_READ_FAILURE);
|
| + logger.warning("Could not read state blob: %s", readResult.getFirst().getMessage());
|
| + }
|
| + startInternal(serializedState);
|
| + }
|
| + });
|
| + }
|
| +
|
| + /**
|
| + * Converts an operation type {@code regOpType} to a
|
| + * {@code InvalidationListener.RegistrationState}.
|
| + */
|
| + private static InvalidationListener.RegistrationState convertOpTypeToRegState(int regOpType) {
|
| + InvalidationListener.RegistrationState regState =
|
| + regOpType == RegistrationP.OpType.REGISTER ?
|
| + InvalidationListener.RegistrationState.REGISTERED :
|
| + InvalidationListener.RegistrationState.UNREGISTERED;
|
| + return regState;
|
| + }
|
| +
|
| + /**
|
| + * Sets the nonce to {@code newNonce}.
|
| + * <p>
|
| + * REQUIRES: {@code newNonce} be null or {@link #clientToken} be null.
|
| + * The goal is to ensure that a nonce is never set unless there is no
|
| + * client token, unless the nonce is being cleared.
|
| + */
|
| + private void setNonce(Bytes newNonce) {
|
| + if ((newNonce != null) && (clientToken != null)) {
|
| + throw new IllegalStateException("Tried to set nonce with existing token " + clientToken);
|
| + }
|
| + this.nonce = newNonce;
|
| + }
|
| +
|
| + /**
|
| + * Returns a randomly generated nonce. Visible for testing only.
|
| + */
|
| +
|
| + static Bytes generateNonce(Random random) {
|
| + // Generate 8 random bytes.
|
| + byte[] randomBytes = new byte[8];
|
| + random.nextBytes(randomBytes);
|
| + return new Bytes(randomBytes);
|
| + }
|
| +
|
| + /**
|
| + * Sets the clientToken to {@code newClientToken}.
|
| + * <p>
|
| + * REQUIRES: {@code newClientToken} be null or {@link #nonce} be null.
|
| + * The goal is to ensure that a token is never set unless there is no
|
| + * nonce, unless the token is being cleared.
|
| + */
|
| + private void setClientToken(Bytes newClientToken) {
|
| + if ((newClientToken != null) && (nonce != null)) {
|
| + throw new IllegalStateException("Tried to set token with existing nonce " + nonce);
|
| + }
|
| +
|
| + // If the ticl is in the process of being started and we are getting a new token (either from
|
| + // persistence or from the server, start the ticl and inform the application.
|
| + boolean finishStartingTicl = !ticlState.isStarted() &&
|
| + (clientToken == null) && (newClientToken != null);
|
| + this.clientToken = newClientToken;
|
| +
|
| + if (finishStartingTicl) {
|
| + finishStartingTiclAndInformListener();
|
| + }
|
| + }
|
| +
|
| + /** Start the ticl and inform the listener that it is ready. */
|
| + private void finishStartingTiclAndInformListener() {
|
| + Preconditions.checkState(!ticlState.isStarted());
|
| + ticlState.start();
|
| + listener.ready(this);
|
| +
|
| + // We are not currently persisting our registration digest, so regardless of whether or not
|
| + // we are restarting from persistent state, we need to query the application for all of
|
| + // its registrations.
|
| + listener.reissueRegistrations(InvalidationClientCore.this, RegistrationManager.EMPTY_PREFIX, 0);
|
| + logger.info("Ticl started: %s", this);
|
| + }
|
| +
|
| + /**
|
| + * Returns an exponential backoff generator with {@code initialDelayMs} and other state as
|
| + * given in {@code marshalledState}.
|
| + */
|
| + private TiclExponentialBackoffDelayGenerator createExpBackOffGenerator(int initialDelayMs,
|
| + ExponentialBackoffState marshalledState) {
|
| + if (marshalledState != null) {
|
| + return new TiclExponentialBackoffDelayGenerator(random, initialDelayMs,
|
| + config.getMaxExponentialBackoffFactor(), marshalledState);
|
| + } else {
|
| + return new TiclExponentialBackoffDelayGenerator(random, initialDelayMs,
|
| + config.getMaxExponentialBackoffFactor());
|
| + }
|
| + }
|
| +
|
| + /** Returns a map from recurring task name to the runnable for that recurring task. */
|
| + protected Map<String, Runnable> getRecurringTasks() {
|
| + final int numPersistentTasks = 6;
|
| + HashMap<String, Runnable> tasks = new HashMap<String, Runnable>(numPersistentTasks);
|
| + tasks.put(AcquireTokenTask.TASK_NAME, acquireTokenTask.getRunnable());
|
| + tasks.put(RegSyncHeartbeatTask.TASK_NAME, regSyncHeartbeatTask.getRunnable());
|
| + tasks.put(PersistentWriteTask.TASK_NAME, persistentWriteTask.getRunnable());
|
| + tasks.put(HeartbeatTask.TASK_NAME, heartbeatTask.getRunnable());
|
| + tasks.put(BatchingTask.TASK_NAME, batchingTask.getRunnable());
|
| + tasks.put(InitialPersistentHeartbeatTask.TASK_NAME,
|
| + initialPersistentHeartbeatTask.getRunnable());
|
| + return tasks;
|
| + }
|
| +
|
| + @Override
|
| + public void toCompactString(TextBuilder builder) {
|
| + builder.append("Client: ").append(applicationClientId).append(", ")
|
| + .append(clientToken).append(", ").append(ticlState);
|
| + }
|
| +
|
| + @Override
|
| + public InvalidationClientState marshal() {
|
| + Preconditions.checkState(internalScheduler.isRunningOnThread(),
|
| + "Not running on internal thread");
|
| + InvalidationClientState.Builder builder = new InvalidationClientState.Builder();
|
| + builder.runState = ticlState.marshal();
|
| + builder.clientToken = clientToken;
|
| + builder.nonce = nonce;
|
| + builder.shouldSendRegistrations = shouldSendRegistrations;
|
| + builder.lastMessageSendTimeMs = lastMessageSendTimeMs;
|
| + builder.isOnline = isOnline;
|
| + builder.protocolHandlerState = protocolHandler.marshal();
|
| + builder.registrationManagerState = registrationManager.marshal();
|
| + builder.acquireTokenTaskState = acquireTokenTask.marshal();
|
| + builder.regSyncHeartbeatTaskState = regSyncHeartbeatTask.marshal();
|
| + builder.persistentWriteTaskState = persistentWriteTask.marshal();
|
| + builder.heartbeatTaskState = heartbeatTask.marshal();
|
| + builder.batchingTaskState = batchingTask.marshal();
|
| + builder.lastWrittenState = persistentWriteTask.lastWrittenState.get();
|
| + builder.statisticsState = statistics.marshal();
|
| + return builder.build();
|
| + }
|
| +}
|
|
|