| Index: third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/RegistrationManager.java
|
| diff --git a/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/RegistrationManager.java b/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/RegistrationManager.java
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..e985295d401888cfe191653e3e9d8fc38e780725
|
| --- /dev/null
|
| +++ b/third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/RegistrationManager.java
|
| @@ -0,0 +1,284 @@
|
| +/*
|
| + * Copyright 2011 Google Inc.
|
| + *
|
| + * Licensed under the Apache License, Version 2.0 (the "License");
|
| + * you may not use this file except in compliance with the License.
|
| + * You may obtain a copy of the License at
|
| + *
|
| + * http://www.apache.org/licenses/LICENSE-2.0
|
| + *
|
| + * Unless required by applicable law or agreed to in writing, software
|
| + * distributed under the License is distributed on an "AS IS" BASIS,
|
| + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| + * See the License for the specific language governing permissions and
|
| + * limitations under the License.
|
| + */
|
| +
|
| +package com.google.ipc.invalidation.ticl;
|
| +
|
| +import com.google.ipc.invalidation.common.DigestFunction;
|
| +import com.google.ipc.invalidation.external.client.SystemResources.Logger;
|
| +import com.google.ipc.invalidation.ticl.Statistics.ClientErrorType;
|
| +import com.google.ipc.invalidation.ticl.TestableInvalidationClient.RegistrationManagerState;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ObjectIdP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP.OpType;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationStatus;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSubtree;
|
| +import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSummary;
|
| +import com.google.ipc.invalidation.ticl.proto.CommonProtos;
|
| +import com.google.ipc.invalidation.ticl.proto.JavaClient.RegistrationManagerStateP;
|
| +import com.google.ipc.invalidation.util.Bytes;
|
| +import com.google.ipc.invalidation.util.InternalBase;
|
| +import com.google.ipc.invalidation.util.Marshallable;
|
| +import com.google.ipc.invalidation.util.TextBuilder;
|
| +import com.google.ipc.invalidation.util.TypedUtil;
|
| +
|
| +import java.util.ArrayList;
|
| +import java.util.Collection;
|
| +import java.util.Collections;
|
| +import java.util.HashMap;
|
| +import java.util.HashSet;
|
| +import java.util.List;
|
| +import java.util.Map;
|
| +import java.util.Set;
|
| +
|
| +
|
| +/**
|
| + * Object to track desired client registrations. This class belongs to caller (e.g.,
|
| + * InvalidationClientImpl) and is not thread-safe - the caller has to use this class in a
|
| + * thread-safe manner.
|
| + *
|
| + */
|
| +class RegistrationManager extends InternalBase implements Marshallable<RegistrationManagerStateP> {
|
| +
|
| + /** Prefix used to request all registrations. */
|
| + static final byte[] EMPTY_PREFIX = new byte[]{};
|
| +
|
| + /** The set of regisrations that the application has requested for. */
|
| + private DigestStore<ObjectIdP> desiredRegistrations;
|
| +
|
| + /** Statistics objects to track number of sent messages, etc. */
|
| + private final Statistics statistics;
|
| +
|
| + /** Latest known server registration state summary. */
|
| + private RegistrationSummary lastKnownServerSummary;
|
| +
|
| + /**
|
| + * Map of object ids and operation types for which we have not yet issued any registration-status
|
| + * upcall to the listener. We need this so that we can synthesize success upcalls if registration
|
| + * sync, rather than a server message, communicates to us that we have a successful
|
| + * (un)registration.
|
| + * <p>
|
| + * This is a map from object id to type, rather than a set of {@code RegistrationP}, because
|
| + * a set of {@code RegistrationP} would assume that we always get a response for every operation
|
| + * we issue, which isn't necessarily true (i.e., the server might send back an unregistration
|
| + * status in response to a registration request).
|
| + */
|
| + private final Map<ObjectIdP, Integer> pendingOperations = new HashMap<ObjectIdP, Integer>();
|
| +
|
| + private final Logger logger;
|
| +
|
| + public RegistrationManager(Logger logger, Statistics statistics, DigestFunction digestFn,
|
| + RegistrationManagerStateP registrationManagerState) {
|
| + this.logger = logger;
|
| + this.statistics = statistics;
|
| + this.desiredRegistrations = new SimpleRegistrationStore(digestFn);
|
| +
|
| + if (registrationManagerState == null) {
|
| + // Initialize the server summary with a 0 size and the digest corresponding
|
| + // to it. Using defaultInstance would wrong since the server digest will
|
| + // not match unnecessarily and result in an info message being sent.
|
| + this.lastKnownServerSummary = getRegistrationSummary();
|
| + } else {
|
| + this.lastKnownServerSummary = registrationManagerState.getNullableLastKnownServerSummary();
|
| + if (this.lastKnownServerSummary == null) {
|
| + // If no server summary is set, use a default with size 0.
|
| + this.lastKnownServerSummary = getRegistrationSummary();
|
| + }
|
| + desiredRegistrations.add(registrationManagerState.getRegistrations());
|
| + for (RegistrationP regOp : registrationManagerState.getPendingOperations()) {
|
| + pendingOperations.put(regOp.getObjectId(), regOp.getOpType());
|
| + }
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * Returns a copy of the registration manager's state
|
| + * <p>
|
| + * Direct test code MUST not call this method on a random thread. It must be called on the
|
| + * InvalidationClientImpl's internal thread.
|
| + */
|
| +
|
| + RegistrationManagerState getRegistrationManagerStateCopyForTest() {
|
| + List<ObjectIdP> registeredObjects = new ArrayList<ObjectIdP>();
|
| + registeredObjects.addAll(desiredRegistrations.getElements(EMPTY_PREFIX, 0));
|
| + return new RegistrationManagerState(getRegistrationSummary(), lastKnownServerSummary,
|
| + registeredObjects);
|
| + }
|
| +
|
| + /**
|
| + * Sets the digest store to be {@code digestStore} for testing purposes.
|
| + * <p>
|
| + * REQUIRES: This method is called before the Ticl has done any operations on this object.
|
| + */
|
| +
|
| + void setDigestStoreForTest(DigestStore<ObjectIdP> digestStore) {
|
| + this.desiredRegistrations = digestStore;
|
| + this.lastKnownServerSummary = getRegistrationSummary();
|
| + }
|
| +
|
| + /** Perform registration/unregistation for all objects in {@code objectIds}. */
|
| + Collection<ObjectIdP> performOperations(Collection<ObjectIdP> objectIds, int regOpType) {
|
| + // Record that we have pending operations on the objects.
|
| + for (ObjectIdP objectId : objectIds) {
|
| + pendingOperations.put(objectId, regOpType);
|
| + }
|
| + // Update the digest appropriately.
|
| + if (regOpType == RegistrationP.OpType.REGISTER) {
|
| + return desiredRegistrations.add(objectIds);
|
| + } else {
|
| + return desiredRegistrations.remove(objectIds);
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * Returns a registration subtree for registrations where the digest of the object id begins with
|
| + * the prefix {@code digestPrefix} of {@code prefixLen} bits. This method may also return objects
|
| + * whose digest prefix does not match {@code digestPrefix}.
|
| + */
|
| + RegistrationSubtree getRegistrations(byte[] digestPrefix, int prefixLen) {
|
| + return RegistrationSubtree.create(desiredRegistrations.getElements(digestPrefix, prefixLen));
|
| + }
|
| +
|
| + /**
|
| + * Handles registration operation statuses from the server. Returns a list of booleans, one per
|
| + * registration status, that indicates whether the registration operation was both successful and
|
| + * agreed with the desired client state (i.e., for each registration status,
|
| + * (status.optype == register) == desiredRegistrations.contains(status.objectid)).
|
| + * <p>
|
| + * REQUIRES: the caller subsequently make an informRegistrationStatus or informRegistrationFailure
|
| + * upcall on the listener for each registration in {@code registrationStatuses}.
|
| + */
|
| + List<Boolean> handleRegistrationStatus(List<RegistrationStatus> registrationStatuses) {
|
| + // Local-processing result code for each element of registrationStatuses.
|
| + List<Boolean> localStatuses = new ArrayList<Boolean>(registrationStatuses.size());
|
| + for (RegistrationStatus registrationStatus : registrationStatuses) {
|
| + ObjectIdP objectIdProto = registrationStatus.getRegistration().getObjectId();
|
| +
|
| + // The object is no longer pending, since we have received a server status for it, so
|
| + // remove it from the pendingOperations map. (It may or may not have existed in the map,
|
| + // since we can receive spontaneous status messages from the server.)
|
| + TypedUtil.remove(pendingOperations, objectIdProto);
|
| +
|
| + // We start off with the local-processing set as success, then potentially fail.
|
| + boolean isSuccess = true;
|
| +
|
| + // if the server operation succeeded, then local processing fails on "incompatibility" as
|
| + // defined above.
|
| + if (CommonProtos.isSuccess(registrationStatus.getStatus())) {
|
| + boolean appWantsRegistration = desiredRegistrations.contains(objectIdProto);
|
| + boolean isOpRegistration =
|
| + registrationStatus.getRegistration().getOpType() == RegistrationP.OpType.REGISTER;
|
| + boolean discrepancyExists = isOpRegistration ^ appWantsRegistration;
|
| + if (discrepancyExists) {
|
| + // Remove the registration and set isSuccess to false, which will cause the caller to
|
| + // issue registration-failure to the application.
|
| + desiredRegistrations.remove(objectIdProto);
|
| + statistics.recordError(ClientErrorType.REGISTRATION_DISCREPANCY);
|
| + logger.info("Ticl discrepancy detected: registered = %s, requested = %s. " +
|
| + "Removing %s from requested",
|
| + isOpRegistration, appWantsRegistration, objectIdProto);
|
| + isSuccess = false;
|
| + }
|
| + } else {
|
| + // If the server operation failed, then also local processing fails.
|
| + desiredRegistrations.remove(objectIdProto);
|
| + logger.fine("Removing %s from committed", objectIdProto);
|
| + isSuccess = false;
|
| + }
|
| + localStatuses.add(isSuccess);
|
| + }
|
| + return localStatuses;
|
| + }
|
| +
|
| + /**
|
| + * Removes all desired registrations and pending operations. Returns all object ids
|
| + * that were affected.
|
| + * <p>
|
| + * REQUIRES: the caller issue a permanent failure upcall to the listener for all returned object
|
| + * ids.
|
| + */
|
| + Collection<ObjectIdP> removeRegisteredObjects() {
|
| + int numObjects = desiredRegistrations.size() + pendingOperations.size();
|
| + Set<ObjectIdP> failureCalls = new HashSet<ObjectIdP>(numObjects);
|
| + failureCalls.addAll(desiredRegistrations.removeAll());
|
| + failureCalls.addAll(pendingOperations.keySet());
|
| + pendingOperations.clear();
|
| + return failureCalls;
|
| + }
|
| +
|
| + //
|
| + // Digest-related methods
|
| + //
|
| +
|
| + /** Returns a summary of the desired registrations. */
|
| + RegistrationSummary getRegistrationSummary() {
|
| + return RegistrationSummary.create(desiredRegistrations.size(),
|
| + new Bytes(desiredRegistrations.getDigest()));
|
| + }
|
| +
|
| + /**
|
| + * Informs the manager of a new registration state summary from the server.
|
| + * Returns a possibly-empty map of <object-id, reg-op-type>. For each entry in the map,
|
| + * the caller should make an inform-registration-status upcall on the listener.
|
| + */
|
| + Set<RegistrationP> informServerRegistrationSummary(
|
| + RegistrationSummary regSummary) {
|
| + if (regSummary != null) {
|
| + this.lastKnownServerSummary = regSummary;
|
| + }
|
| + if (isStateInSyncWithServer()) {
|
| + // If we are now in sync with the server, then the caller should make inform-reg-status
|
| + // upcalls for all operations that we had pending, if any; they are also no longer pending.
|
| + Set<RegistrationP> upcallsToMake = new HashSet<RegistrationP>(pendingOperations.size());
|
| + for (Map.Entry<ObjectIdP, Integer> entry : pendingOperations.entrySet()) {
|
| + ObjectIdP objectId = entry.getKey();
|
| + boolean isReg = entry.getValue() == OpType.REGISTER;
|
| + upcallsToMake.add(CommonProtos.newRegistrationP(objectId, isReg));
|
| + }
|
| + pendingOperations.clear();
|
| + return upcallsToMake;
|
| + } else {
|
| + // If we are not in sync with the server, then the caller should make no upcalls.
|
| + return Collections.emptySet();
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * Returns whether the local registration state and server state agree, based on the last
|
| + * received server summary (from {@link #informServerRegistrationSummary}).
|
| + */
|
| + boolean isStateInSyncWithServer() {
|
| + return TypedUtil.<RegistrationSummary>equals(lastKnownServerSummary, getRegistrationSummary());
|
| + }
|
| +
|
| + @Override
|
| + public void toCompactString(TextBuilder builder) {
|
| + builder.appendFormat("Last known digest: %s, Requested regs: %s", lastKnownServerSummary,
|
| + desiredRegistrations);
|
| + }
|
| +
|
| + @Override
|
| + public RegistrationManagerStateP marshal() {
|
| + List<ObjectIdP> desiredRegistrations =
|
| + new ArrayList<ObjectIdP>(this.desiredRegistrations.getElements(EMPTY_PREFIX, 0));
|
| + List<RegistrationP> pendingOperations =
|
| + new ArrayList<RegistrationP>(this.pendingOperations.size());
|
| + for (Map.Entry<ObjectIdP, Integer> entry : this.pendingOperations.entrySet()) {
|
| + pendingOperations.add(RegistrationP.create(entry.getKey(), entry.getValue()));
|
| + }
|
| + return RegistrationManagerStateP.create(desiredRegistrations, lastKnownServerSummary,
|
| + pendingOperations);
|
| + }
|
| +}
|
|
|