| OLD | NEW |
| (Empty) | |
| 1 /* |
| 2 * Copyright 2011 Google Inc. |
| 3 * |
| 4 * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 * you may not use this file except in compliance with the License. |
| 6 * You may obtain a copy of the License at |
| 7 * |
| 8 * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 * |
| 10 * Unless required by applicable law or agreed to in writing, software |
| 11 * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 * See the License for the specific language governing permissions and |
| 14 * limitations under the License. |
| 15 */ |
| 16 |
| 17 package com.google.ipc.invalidation.ticl; |
| 18 |
| 19 import com.google.ipc.invalidation.common.DigestFunction; |
| 20 import com.google.ipc.invalidation.external.client.SystemResources.Logger; |
| 21 import com.google.ipc.invalidation.ticl.Statistics.ClientErrorType; |
| 22 import com.google.ipc.invalidation.ticl.TestableInvalidationClient.RegistrationM
anagerState; |
| 23 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ObjectIdP; |
| 24 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP; |
| 25 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP.OpTyp
e; |
| 26 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationStatus; |
| 27 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSubtree
; |
| 28 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSummary
; |
| 29 import com.google.ipc.invalidation.ticl.proto.CommonProtos; |
| 30 import com.google.ipc.invalidation.ticl.proto.JavaClient.RegistrationManagerStat
eP; |
| 31 import com.google.ipc.invalidation.util.Bytes; |
| 32 import com.google.ipc.invalidation.util.InternalBase; |
| 33 import com.google.ipc.invalidation.util.Marshallable; |
| 34 import com.google.ipc.invalidation.util.TextBuilder; |
| 35 import com.google.ipc.invalidation.util.TypedUtil; |
| 36 |
| 37 import java.util.ArrayList; |
| 38 import java.util.Collection; |
| 39 import java.util.Collections; |
| 40 import java.util.HashMap; |
| 41 import java.util.HashSet; |
| 42 import java.util.List; |
| 43 import java.util.Map; |
| 44 import java.util.Set; |
| 45 |
| 46 |
| 47 /** |
| 48 * Object to track desired client registrations. This class belongs to caller (e
.g., |
| 49 * InvalidationClientImpl) and is not thread-safe - the caller has to use this c
lass in a |
| 50 * thread-safe manner. |
| 51 * |
| 52 */ |
| 53 class RegistrationManager extends InternalBase implements Marshallable<Registrat
ionManagerStateP> { |
| 54 |
| 55 /** Prefix used to request all registrations. */ |
| 56 static final byte[] EMPTY_PREFIX = new byte[]{}; |
| 57 |
| 58 /** The set of regisrations that the application has requested for. */ |
| 59 private DigestStore<ObjectIdP> desiredRegistrations; |
| 60 |
| 61 /** Statistics objects to track number of sent messages, etc. */ |
| 62 private final Statistics statistics; |
| 63 |
| 64 /** Latest known server registration state summary. */ |
| 65 private RegistrationSummary lastKnownServerSummary; |
| 66 |
| 67 /** |
| 68 * Map of object ids and operation types for which we have not yet issued any
registration-status |
| 69 * upcall to the listener. We need this so that we can synthesize success upca
lls if registration |
| 70 * sync, rather than a server message, communicates to us that we have a succe
ssful |
| 71 * (un)registration. |
| 72 * <p> |
| 73 * This is a map from object id to type, rather than a set of {@code Registrat
ionP}, because |
| 74 * a set of {@code RegistrationP} would assume that we always get a response f
or every operation |
| 75 * we issue, which isn't necessarily true (i.e., the server might send back an
unregistration |
| 76 * status in response to a registration request). |
| 77 */ |
| 78 private final Map<ObjectIdP, Integer> pendingOperations = new HashMap<ObjectId
P, Integer>(); |
| 79 |
| 80 private final Logger logger; |
| 81 |
| 82 public RegistrationManager(Logger logger, Statistics statistics, DigestFunctio
n digestFn, |
| 83 RegistrationManagerStateP registrationManagerState) { |
| 84 this.logger = logger; |
| 85 this.statistics = statistics; |
| 86 this.desiredRegistrations = new SimpleRegistrationStore(digestFn); |
| 87 |
| 88 if (registrationManagerState == null) { |
| 89 // Initialize the server summary with a 0 size and the digest correspondin
g |
| 90 // to it. Using defaultInstance would wrong since the server digest will |
| 91 // not match unnecessarily and result in an info message being sent. |
| 92 this.lastKnownServerSummary = getRegistrationSummary(); |
| 93 } else { |
| 94 this.lastKnownServerSummary = registrationManagerState.getNullableLastKnow
nServerSummary(); |
| 95 if (this.lastKnownServerSummary == null) { |
| 96 // If no server summary is set, use a default with size 0. |
| 97 this.lastKnownServerSummary = getRegistrationSummary(); |
| 98 } |
| 99 desiredRegistrations.add(registrationManagerState.getRegistrations()); |
| 100 for (RegistrationP regOp : registrationManagerState.getPendingOperations()
) { |
| 101 pendingOperations.put(regOp.getObjectId(), regOp.getOpType()); |
| 102 } |
| 103 } |
| 104 } |
| 105 |
| 106 /** |
| 107 * Returns a copy of the registration manager's state |
| 108 * <p> |
| 109 * Direct test code MUST not call this method on a random thread. It must be c
alled on the |
| 110 * InvalidationClientImpl's internal thread. |
| 111 */ |
| 112 |
| 113 RegistrationManagerState getRegistrationManagerStateCopyForTest() { |
| 114 List<ObjectIdP> registeredObjects = new ArrayList<ObjectIdP>(); |
| 115 registeredObjects.addAll(desiredRegistrations.getElements(EMPTY_PREFIX, 0)); |
| 116 return new RegistrationManagerState(getRegistrationSummary(), lastKnownServe
rSummary, |
| 117 registeredObjects); |
| 118 } |
| 119 |
| 120 /** |
| 121 * Sets the digest store to be {@code digestStore} for testing purposes. |
| 122 * <p> |
| 123 * REQUIRES: This method is called before the Ticl has done any operations on
this object. |
| 124 */ |
| 125 |
| 126 void setDigestStoreForTest(DigestStore<ObjectIdP> digestStore) { |
| 127 this.desiredRegistrations = digestStore; |
| 128 this.lastKnownServerSummary = getRegistrationSummary(); |
| 129 } |
| 130 |
| 131 /** Perform registration/unregistation for all objects in {@code objectIds}. *
/ |
| 132 Collection<ObjectIdP> performOperations(Collection<ObjectIdP> objectIds, int r
egOpType) { |
| 133 // Record that we have pending operations on the objects. |
| 134 for (ObjectIdP objectId : objectIds) { |
| 135 pendingOperations.put(objectId, regOpType); |
| 136 } |
| 137 // Update the digest appropriately. |
| 138 if (regOpType == RegistrationP.OpType.REGISTER) { |
| 139 return desiredRegistrations.add(objectIds); |
| 140 } else { |
| 141 return desiredRegistrations.remove(objectIds); |
| 142 } |
| 143 } |
| 144 |
| 145 /** |
| 146 * Returns a registration subtree for registrations where the digest of the ob
ject id begins with |
| 147 * the prefix {@code digestPrefix} of {@code prefixLen} bits. This method may
also return objects |
| 148 * whose digest prefix does not match {@code digestPrefix}. |
| 149 */ |
| 150 RegistrationSubtree getRegistrations(byte[] digestPrefix, int prefixLen) { |
| 151 return RegistrationSubtree.create(desiredRegistrations.getElements(digestPre
fix, prefixLen)); |
| 152 } |
| 153 |
| 154 /** |
| 155 * Handles registration operation statuses from the server. Returns a list of
booleans, one per |
| 156 * registration status, that indicates whether the registration operation was
both successful and |
| 157 * agreed with the desired client state (i.e., for each registration status, |
| 158 * (status.optype == register) == desiredRegistrations.contains(status.objecti
d)). |
| 159 * <p> |
| 160 * REQUIRES: the caller subsequently make an informRegistrationStatus or infor
mRegistrationFailure |
| 161 * upcall on the listener for each registration in {@code registrationStatuses
}. |
| 162 */ |
| 163 List<Boolean> handleRegistrationStatus(List<RegistrationStatus> registrationSt
atuses) { |
| 164 // Local-processing result code for each element of registrationStatuses. |
| 165 List<Boolean> localStatuses = new ArrayList<Boolean>(registrationStatuses.si
ze()); |
| 166 for (RegistrationStatus registrationStatus : registrationStatuses) { |
| 167 ObjectIdP objectIdProto = registrationStatus.getRegistration().getObjectId
(); |
| 168 |
| 169 // The object is no longer pending, since we have received a server status
for it, so |
| 170 // remove it from the pendingOperations map. (It may or may not have exist
ed in the map, |
| 171 // since we can receive spontaneous status messages from the server.) |
| 172 TypedUtil.remove(pendingOperations, objectIdProto); |
| 173 |
| 174 // We start off with the local-processing set as success, then potentially
fail. |
| 175 boolean isSuccess = true; |
| 176 |
| 177 // if the server operation succeeded, then local processing fails on "inco
mpatibility" as |
| 178 // defined above. |
| 179 if (CommonProtos.isSuccess(registrationStatus.getStatus())) { |
| 180 boolean appWantsRegistration = desiredRegistrations.contains(objectIdPro
to); |
| 181 boolean isOpRegistration = |
| 182 registrationStatus.getRegistration().getOpType() == RegistrationP.Op
Type.REGISTER; |
| 183 boolean discrepancyExists = isOpRegistration ^ appWantsRegistration; |
| 184 if (discrepancyExists) { |
| 185 // Remove the registration and set isSuccess to false, which will caus
e the caller to |
| 186 // issue registration-failure to the application. |
| 187 desiredRegistrations.remove(objectIdProto); |
| 188 statistics.recordError(ClientErrorType.REGISTRATION_DISCREPANCY); |
| 189 logger.info("Ticl discrepancy detected: registered = %s, requested = %
s. " + |
| 190 "Removing %s from requested", |
| 191 isOpRegistration, appWantsRegistration, objectIdProto); |
| 192 isSuccess = false; |
| 193 } |
| 194 } else { |
| 195 // If the server operation failed, then also local processing fails. |
| 196 desiredRegistrations.remove(objectIdProto); |
| 197 logger.fine("Removing %s from committed", objectIdProto); |
| 198 isSuccess = false; |
| 199 } |
| 200 localStatuses.add(isSuccess); |
| 201 } |
| 202 return localStatuses; |
| 203 } |
| 204 |
| 205 /** |
| 206 * Removes all desired registrations and pending operations. Returns all objec
t ids |
| 207 * that were affected. |
| 208 * <p> |
| 209 * REQUIRES: the caller issue a permanent failure upcall to the listener for a
ll returned object |
| 210 * ids. |
| 211 */ |
| 212 Collection<ObjectIdP> removeRegisteredObjects() { |
| 213 int numObjects = desiredRegistrations.size() + pendingOperations.size(); |
| 214 Set<ObjectIdP> failureCalls = new HashSet<ObjectIdP>(numObjects); |
| 215 failureCalls.addAll(desiredRegistrations.removeAll()); |
| 216 failureCalls.addAll(pendingOperations.keySet()); |
| 217 pendingOperations.clear(); |
| 218 return failureCalls; |
| 219 } |
| 220 |
| 221 // |
| 222 // Digest-related methods |
| 223 // |
| 224 |
| 225 /** Returns a summary of the desired registrations. */ |
| 226 RegistrationSummary getRegistrationSummary() { |
| 227 return RegistrationSummary.create(desiredRegistrations.size(), |
| 228 new Bytes(desiredRegistrations.getDigest())); |
| 229 } |
| 230 |
| 231 /** |
| 232 * Informs the manager of a new registration state summary from the server. |
| 233 * Returns a possibly-empty map of <object-id, reg-op-type>. For each entry in
the map, |
| 234 * the caller should make an inform-registration-status upcall on the listener
. |
| 235 */ |
| 236 Set<RegistrationP> informServerRegistrationSummary( |
| 237 RegistrationSummary regSummary) { |
| 238 if (regSummary != null) { |
| 239 this.lastKnownServerSummary = regSummary; |
| 240 } |
| 241 if (isStateInSyncWithServer()) { |
| 242 // If we are now in sync with the server, then the caller should make info
rm-reg-status |
| 243 // upcalls for all operations that we had pending, if any; they are also n
o longer pending. |
| 244 Set<RegistrationP> upcallsToMake = new HashSet<RegistrationP>(pendingOpera
tions.size()); |
| 245 for (Map.Entry<ObjectIdP, Integer> entry : pendingOperations.entrySet()) { |
| 246 ObjectIdP objectId = entry.getKey(); |
| 247 boolean isReg = entry.getValue() == OpType.REGISTER; |
| 248 upcallsToMake.add(CommonProtos.newRegistrationP(objectId, isReg)); |
| 249 } |
| 250 pendingOperations.clear(); |
| 251 return upcallsToMake; |
| 252 } else { |
| 253 // If we are not in sync with the server, then the caller should make no u
pcalls. |
| 254 return Collections.emptySet(); |
| 255 } |
| 256 } |
| 257 |
| 258 /** |
| 259 * Returns whether the local registration state and server state agree, based
on the last |
| 260 * received server summary (from {@link #informServerRegistrationSummary}). |
| 261 */ |
| 262 boolean isStateInSyncWithServer() { |
| 263 return TypedUtil.<RegistrationSummary>equals(lastKnownServerSummary, getRegi
strationSummary()); |
| 264 } |
| 265 |
| 266 @Override |
| 267 public void toCompactString(TextBuilder builder) { |
| 268 builder.appendFormat("Last known digest: %s, Requested regs: %s", lastKnownS
erverSummary, |
| 269 desiredRegistrations); |
| 270 } |
| 271 |
| 272 @Override |
| 273 public RegistrationManagerStateP marshal() { |
| 274 List<ObjectIdP> desiredRegistrations = |
| 275 new ArrayList<ObjectIdP>(this.desiredRegistrations.getElements(EMPTY_PRE
FIX, 0)); |
| 276 List<RegistrationP> pendingOperations = |
| 277 new ArrayList<RegistrationP>(this.pendingOperations.size()); |
| 278 for (Map.Entry<ObjectIdP, Integer> entry : this.pendingOperations.entrySet()
) { |
| 279 pendingOperations.add(RegistrationP.create(entry.getKey(), entry.getValue(
))); |
| 280 } |
| 281 return RegistrationManagerStateP.create(desiredRegistrations, lastKnownServe
rSummary, |
| 282 pendingOperations); |
| 283 } |
| 284 } |
| OLD | NEW |