| OLD | NEW |
| (Empty) | |
| 1 /* |
| 2 * Copyright 2011 Google Inc. |
| 3 * |
| 4 * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 * you may not use this file except in compliance with the License. |
| 6 * You may obtain a copy of the License at |
| 7 * |
| 8 * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 * |
| 10 * Unless required by applicable law or agreed to in writing, software |
| 11 * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 * See the License for the specific language governing permissions and |
| 14 * limitations under the License. |
| 15 */ |
| 16 |
| 17 package com.google.ipc.invalidation.ticl; |
| 18 |
| 19 import com.google.ipc.invalidation.external.client.SystemResources; |
| 20 import com.google.ipc.invalidation.external.client.SystemResources.Logger; |
| 21 import com.google.ipc.invalidation.external.client.SystemResources.NetworkChanne
l; |
| 22 import com.google.ipc.invalidation.external.client.SystemResources.Scheduler; |
| 23 import com.google.ipc.invalidation.external.client.types.SimplePair; |
| 24 import com.google.ipc.invalidation.ticl.InvalidationClientCore.BatchingTask; |
| 25 import com.google.ipc.invalidation.ticl.Statistics.ClientErrorType; |
| 26 import com.google.ipc.invalidation.ticl.Statistics.ReceivedMessageType; |
| 27 import com.google.ipc.invalidation.ticl.Statistics.SentMessageType; |
| 28 import com.google.ipc.invalidation.ticl.proto.ClientConstants; |
| 29 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ApplicationClientId
P; |
| 30 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientConfigP; |
| 31 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientHeader; |
| 32 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientToServerMessa
ge; |
| 33 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientVersion; |
| 34 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ConfigChangeMessage
; |
| 35 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ErrorMessage; |
| 36 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoMessage; |
| 37 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoRequestMessage; |
| 38 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InitializeMessage; |
| 39 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InitializeMessage.D
igestSerializationType; |
| 40 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationMessage
; |
| 41 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationP; |
| 42 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ObjectIdP; |
| 43 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.PropertyRecord; |
| 44 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ProtocolHandlerConf
igP; |
| 45 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RateLimitP; |
| 46 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationMessage
; |
| 47 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP; |
| 48 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP.OpTyp
e; |
| 49 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationStatusM
essage; |
| 50 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSubtree
; |
| 51 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSummary
; |
| 52 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSyncMes
sage; |
| 53 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSyncReq
uestMessage; |
| 54 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ServerHeader; |
| 55 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ServerToClientMessa
ge; |
| 56 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.TokenControlMessage
; |
| 57 import com.google.ipc.invalidation.ticl.proto.CommonProtos; |
| 58 import com.google.ipc.invalidation.ticl.proto.JavaClient.BatcherState; |
| 59 import com.google.ipc.invalidation.ticl.proto.JavaClient.ProtocolHandlerState; |
| 60 import com.google.ipc.invalidation.util.Bytes; |
| 61 import com.google.ipc.invalidation.util.InternalBase; |
| 62 import com.google.ipc.invalidation.util.Marshallable; |
| 63 import com.google.ipc.invalidation.util.Preconditions; |
| 64 import com.google.ipc.invalidation.util.ProtoWrapper; |
| 65 import com.google.ipc.invalidation.util.ProtoWrapper.ValidationException; |
| 66 import com.google.ipc.invalidation.util.Smearer; |
| 67 import com.google.ipc.invalidation.util.TextBuilder; |
| 68 |
| 69 import java.util.ArrayList; |
| 70 import java.util.Collection; |
| 71 import java.util.HashMap; |
| 72 import java.util.HashSet; |
| 73 import java.util.List; |
| 74 import java.util.Map; |
| 75 import java.util.Set; |
| 76 |
| 77 |
| 78 /** |
| 79 * A layer for interacting with low-level protocol messages. Parses messages fr
om the server and |
| 80 * calls appropriate functions on the {@code ProtocolListener} to handle various
types of message |
| 81 * content. Also buffers message data from the client and constructs and sends
messages to the |
| 82 * server. |
| 83 * <p> |
| 84 * This class implements {@link Marshallable}, so its state can be written to a
protocol buffer, |
| 85 * and instances can be restored from such protocol buffers. Additionally, the n
ested class |
| 86 * {@link Batcher} also implements {@code Marshallable} for the same reason. |
| 87 * <p> |
| 88 * Note that while we talk about "marshalling," in this context we mean marshall
ing to protocol |
| 89 * buffers, not raw bytes. |
| 90 * |
| 91 */ |
| 92 class ProtocolHandler implements Marshallable<ProtocolHandlerState> { |
| 93 /** Class that batches messages to the server. */ |
| 94 private static class Batcher implements Marshallable<BatcherState> { |
| 95 /** Statistics to be updated when messages are created. */ |
| 96 private final Statistics statistics; |
| 97 |
| 98 /** Resources used for logging and thread assertions. */ |
| 99 private final SystemResources resources; |
| 100 |
| 101 /** Set of pending registrations stored as a map for overriding later operat
ions. */ |
| 102 private final Map<ObjectIdP, Integer> pendingRegistrations = new HashMap<Obj
ectIdP, Integer>(); |
| 103 |
| 104 /** Set of pending invalidation acks. */ |
| 105 private final Set<InvalidationP> pendingAckedInvalidations = new HashSet<Inv
alidationP>(); |
| 106 |
| 107 /** Set of pending registration sub trees for registration sync. */ |
| 108 private final Set<RegistrationSubtree> pendingRegSubtrees = new HashSet<Regi
strationSubtree>(); |
| 109 |
| 110 /** Pending initialization message to send to the server, if any. */ |
| 111 private InitializeMessage pendingInitializeMessage = null; |
| 112 |
| 113 /** Pending info message to send to the server, if any. */ |
| 114 private InfoMessage pendingInfoMessage = null; |
| 115 |
| 116 /** Creates a batcher. */ |
| 117 Batcher(SystemResources resources, Statistics statistics) { |
| 118 this.resources = resources; |
| 119 this.statistics = statistics; |
| 120 } |
| 121 |
| 122 /** Creates a batcher from {@code marshalledState}. */ |
| 123 Batcher(SystemResources resources, Statistics statistics, BatcherState marsh
alledState) { |
| 124 this(resources, statistics); |
| 125 for (ObjectIdP registration : marshalledState.getRegistration()) { |
| 126 pendingRegistrations.put(registration, RegistrationP.OpType.REGISTER); |
| 127 } |
| 128 for (ObjectIdP unregistration : marshalledState.getUnregistration()) { |
| 129 pendingRegistrations.put(unregistration, RegistrationP.OpType.UNREGISTER
); |
| 130 } |
| 131 for (InvalidationP ack : marshalledState.getAcknowledgement()) { |
| 132 pendingAckedInvalidations.add(ack); |
| 133 } |
| 134 for (RegistrationSubtree subtree : marshalledState.getRegistrationSubtree(
)) { |
| 135 pendingRegSubtrees.add(subtree); |
| 136 } |
| 137 pendingInitializeMessage = marshalledState.getNullableInitializeMessage(); |
| 138 if (marshalledState.hasInfoMessage()) { |
| 139 pendingInfoMessage = marshalledState.getInfoMessage(); |
| 140 } |
| 141 } |
| 142 |
| 143 /** Sets the initialize message to be sent. */ |
| 144 void setInitializeMessage(InitializeMessage msg) { |
| 145 pendingInitializeMessage = msg; |
| 146 } |
| 147 |
| 148 /** Sets the info message to be sent. */ |
| 149 void setInfoMessage(InfoMessage msg) { |
| 150 pendingInfoMessage = msg; |
| 151 } |
| 152 |
| 153 /** Adds a registration on {@code oid} of {@code opType} to the registration
s to be sent. */ |
| 154 void addRegistration(ObjectIdP oid, Integer opType) { |
| 155 pendingRegistrations.put(oid, opType); |
| 156 } |
| 157 |
| 158 /** Adds {@code ack} to the set of acknowledgements to be sent. */ |
| 159 void addAck(InvalidationP ack) { |
| 160 pendingAckedInvalidations.add(ack); |
| 161 } |
| 162 |
| 163 /** Adds {@code subtree} to the set of registration subtrees to be sent. */ |
| 164 void addRegSubtree(RegistrationSubtree subtree) { |
| 165 pendingRegSubtrees.add(subtree); |
| 166 } |
| 167 |
| 168 /** |
| 169 * Returns a builder for a {@link ClientToServerMessage} to be sent to the s
erver. Crucially, |
| 170 * the builder does <b>NOT</b> include the message header. |
| 171 * @param hasClientToken whether the client currently holds a token |
| 172 */ |
| 173 ClientToServerMessage toMessage(final ClientHeader header, boolean hasClient
Token) { |
| 174 final InitializeMessage initializeMessage; |
| 175 final RegistrationMessage registrationMessage; |
| 176 final RegistrationSyncMessage registrationSyncMessage; |
| 177 final InvalidationMessage invalidationAckMessage; |
| 178 final InfoMessage infoMessage; |
| 179 |
| 180 if (pendingInitializeMessage != null) { |
| 181 statistics.recordSentMessage(SentMessageType.INITIALIZE); |
| 182 initializeMessage = pendingInitializeMessage; |
| 183 pendingInitializeMessage = null; |
| 184 } else { |
| 185 initializeMessage = null; |
| 186 } |
| 187 |
| 188 // Note: Even if an initialize message is being sent, we can send addition
al |
| 189 // messages such as regisration messages, etc to the server. But if there
is no token |
| 190 // and an initialize message is not being sent, we cannot send any other m
essage. |
| 191 |
| 192 if (!hasClientToken && (initializeMessage == null)) { |
| 193 // Cannot send any message |
| 194 resources.getLogger().warning( |
| 195 "Cannot send message since no token and no initialize msg"); |
| 196 statistics.recordError(ClientErrorType.TOKEN_MISSING_FAILURE); |
| 197 return null; |
| 198 } |
| 199 |
| 200 // Check for pending batched operations and add to message builder if need
ed. |
| 201 |
| 202 // Add reg, acks, reg subtrees - clear them after adding. |
| 203 if (!pendingAckedInvalidations.isEmpty()) { |
| 204 invalidationAckMessage = createInvalidationAckMessage(); |
| 205 statistics.recordSentMessage(SentMessageType.INVALIDATION_ACK); |
| 206 } else { |
| 207 invalidationAckMessage = null; |
| 208 } |
| 209 |
| 210 // Check regs. |
| 211 if (!pendingRegistrations.isEmpty()) { |
| 212 registrationMessage = createRegistrationMessage(); |
| 213 statistics.recordSentMessage(SentMessageType.REGISTRATION); |
| 214 } else { |
| 215 registrationMessage = null; |
| 216 } |
| 217 |
| 218 // Check reg substrees. |
| 219 if (!pendingRegSubtrees.isEmpty()) { |
| 220 // If there are multiple pending reg subtrees, only one is sent. |
| 221 ArrayList<RegistrationSubtree> regSubtrees = new ArrayList<RegistrationS
ubtree>(1); |
| 222 regSubtrees.add(pendingRegSubtrees.iterator().next()); |
| 223 registrationSyncMessage = RegistrationSyncMessage.create(regSubtrees); |
| 224 pendingRegSubtrees.clear(); |
| 225 statistics.recordSentMessage(SentMessageType.REGISTRATION_SYNC); |
| 226 } else { |
| 227 registrationSyncMessage = null; |
| 228 } |
| 229 |
| 230 // Check if an info message has to be sent. |
| 231 if (pendingInfoMessage != null) { |
| 232 statistics.recordSentMessage(SentMessageType.INFO); |
| 233 infoMessage = pendingInfoMessage; |
| 234 pendingInfoMessage = null; |
| 235 } else { |
| 236 infoMessage = null; |
| 237 } |
| 238 |
| 239 return ClientToServerMessage.create(header, initializeMessage, registratio
nMessage, |
| 240 registrationSyncMessage, invalidationAckMessage, infoMessage); |
| 241 } |
| 242 |
| 243 /** |
| 244 * Creates a registration message based on registrations from {@code pending
Registrations} |
| 245 * and returns it. |
| 246 * <p> |
| 247 * REQUIRES: pendingRegistrations.size() > 0 |
| 248 */ |
| 249 private RegistrationMessage createRegistrationMessage() { |
| 250 Preconditions.checkState(!pendingRegistrations.isEmpty()); |
| 251 |
| 252 // Run through the pendingRegistrations map. |
| 253 List<RegistrationP> pendingRegistrations = |
| 254 new ArrayList<RegistrationP>(this.pendingRegistrations.size()); |
| 255 for (Map.Entry<ObjectIdP, Integer> entry : this.pendingRegistrations.entry
Set()) { |
| 256 pendingRegistrations.add(RegistrationP.create(entry.getKey(), entry.getV
alue())); |
| 257 } |
| 258 this.pendingRegistrations.clear(); |
| 259 return RegistrationMessage.create(pendingRegistrations); |
| 260 } |
| 261 |
| 262 /** |
| 263 * Creates an invalidation ack message based on acks from {@code pendingAcke
dInvalidations} and |
| 264 * returns it. |
| 265 * <p> |
| 266 * REQUIRES: pendingAckedInvalidations.size() > 0 |
| 267 */ |
| 268 private InvalidationMessage createInvalidationAckMessage() { |
| 269 Preconditions.checkState(!pendingAckedInvalidations.isEmpty()); |
| 270 InvalidationMessage ackMessage = |
| 271 InvalidationMessage.create(new ArrayList<InvalidationP>(pendingAckedIn
validations)); |
| 272 pendingAckedInvalidations.clear(); |
| 273 return ackMessage; |
| 274 } |
| 275 |
| 276 @Override |
| 277 public BatcherState marshal() { |
| 278 // Marshall (un)registrations. |
| 279 ArrayList<ObjectIdP> registrations = new ArrayList<ObjectIdP>(pendingRegis
trations.size()); |
| 280 ArrayList<ObjectIdP> unregistrations = new ArrayList<ObjectIdP>(pendingReg
istrations.size()); |
| 281 for (Map.Entry<ObjectIdP, Integer> entry : pendingRegistrations.entrySet()
) { |
| 282 Integer opType = entry.getValue(); |
| 283 ObjectIdP oid = entry.getKey(); |
| 284 new ArrayList<ObjectIdP>(pendingRegistrations.size()); |
| 285 switch (opType) { |
| 286 case OpType.REGISTER: |
| 287 registrations.add(oid); |
| 288 break; |
| 289 case OpType.UNREGISTER: |
| 290 unregistrations.add(oid); |
| 291 break; |
| 292 default: |
| 293 throw new IllegalArgumentException(opType.toString()); |
| 294 } |
| 295 } |
| 296 return BatcherState.create(registrations, unregistrations, pendingAckedInv
alidations, |
| 297 pendingRegSubtrees, pendingInitializeMessage, pendingInfoMessage); |
| 298 } |
| 299 } |
| 300 |
| 301 /** Representation of a message header for use in a server message. */ |
| 302 static class ServerMessageHeader extends InternalBase { |
| 303 /** |
| 304 * Constructs an instance. |
| 305 * |
| 306 * @param token server-sent token |
| 307 * @param registrationSummary summary over server registration state |
| 308 */ |
| 309 ServerMessageHeader(Bytes token, RegistrationSummary registrationSummary) { |
| 310 this.token = token; |
| 311 this.registrationSummary = registrationSummary; |
| 312 } |
| 313 |
| 314 /** Server-sent token. */ |
| 315 Bytes token; |
| 316 |
| 317 /** Summary of the client's registration state at the server. */ |
| 318 RegistrationSummary registrationSummary; |
| 319 |
| 320 @Override |
| 321 public void toCompactString(TextBuilder builder) { |
| 322 builder.appendFormat("Token: %s, Summary: %s", token, registrationSummary)
; |
| 323 } |
| 324 } |
| 325 |
| 326 /** |
| 327 * Representation of a message receiver for the server. Such a message is guar
anteed to be |
| 328 * valid, but the session token is <b>not</b> checked. |
| 329 */ |
| 330 static class ParsedMessage { |
| 331 /* |
| 332 * Each of these fields corresponds directly to a field in the ServerToClien
tMessage protobuf. |
| 333 * It is non-null iff the corresponding hasYYY method in the protobuf would
return true. |
| 334 */ |
| 335 final ServerMessageHeader header; |
| 336 final TokenControlMessage tokenControlMessage; |
| 337 final InvalidationMessage invalidationMessage; |
| 338 final RegistrationStatusMessage registrationStatusMessage; |
| 339 final RegistrationSyncRequestMessage registrationSyncRequestMessage; |
| 340 final ConfigChangeMessage configChangeMessage; |
| 341 final InfoRequestMessage infoRequestMessage; |
| 342 final ErrorMessage errorMessage; |
| 343 |
| 344 /** Constructs an instance from a {@code rawMessage}. */ |
| 345 ParsedMessage(ServerToClientMessage rawMessage) { |
| 346 // For each field, assign it to the corresponding protobuf field if presen
t, else null. |
| 347 ServerHeader messageHeader = rawMessage.getHeader(); |
| 348 header = new ServerMessageHeader(messageHeader.getClientToken(), |
| 349 messageHeader.getNullableRegistrationSummary()); |
| 350 tokenControlMessage = |
| 351 rawMessage.hasTokenControlMessage() ? rawMessage.getTokenControlMessag
e() : null; |
| 352 invalidationMessage = rawMessage.getNullableInvalidationMessage(); |
| 353 registrationStatusMessage = rawMessage.getNullableRegistrationStatusMessag
e(); |
| 354 registrationSyncRequestMessage = rawMessage.hasRegistrationSyncRequestMess
age() |
| 355 ? rawMessage.getRegistrationSyncRequestMessage() : null; |
| 356 configChangeMessage = |
| 357 rawMessage.hasConfigChangeMessage() ? rawMessage.getConfigChangeMessag
e() : null; |
| 358 infoRequestMessage = rawMessage.getNullableInfoRequestMessage(); |
| 359 errorMessage = rawMessage.getNullableErrorMessage(); |
| 360 } |
| 361 } |
| 362 |
| 363 /** |
| 364 * Listener for protocol events. The handler guarantees that the call will be
made on the internal |
| 365 * thread that the SystemResources provides. |
| 366 */ |
| 367 interface ProtocolListener { |
| 368 /** Records that a message was sent to the server at the current time. */ |
| 369 void handleMessageSent(); |
| 370 |
| 371 /** Returns a summary of the current desired registrations. */ |
| 372 RegistrationSummary getRegistrationSummary(); |
| 373 |
| 374 /** Returns the current server-assigned client token, if any. */ |
| 375 Bytes getClientToken(); |
| 376 } |
| 377 |
| 378 /** Information about the client, e.g., application name, OS, etc. */ |
| 379 private final ClientVersion clientVersion; |
| 380 |
| 381 /** A logger. */ |
| 382 private final Logger logger; |
| 383 |
| 384 /** Scheduler for the client's internal processing. */ |
| 385 private final Scheduler internalScheduler; |
| 386 |
| 387 /** Network channel for sending and receiving messages to and from the server.
*/ |
| 388 private final NetworkChannel network; |
| 389 |
| 390 /** The protocol listener. */ |
| 391 private final ProtocolListener listener; |
| 392 |
| 393 /** Batches messages to the server. */ |
| 394 private final Batcher batcher; |
| 395 |
| 396 /** A debug message id that is added to every message to the server. */ |
| 397 private int messageId = 1; |
| 398 |
| 399 // State specific to a client. If we want to support multiple clients, this co
uld |
| 400 // be in a map or could be eliminated (e.g., no batching). |
| 401 |
| 402 /** The last known time from the server. */ |
| 403 private long lastKnownServerTimeMs = 0; |
| 404 |
| 405 /** |
| 406 * The next time before which a message cannot be sent to the server. If this
is less than current |
| 407 * time, a message can be sent at any time. |
| 408 */ |
| 409 private long nextMessageSendTimeMs = 0; |
| 410 |
| 411 /** Statistics objects to track number of sent messages, etc. */ |
| 412 private final Statistics statistics; |
| 413 |
| 414 /** Client type for inclusion in headers. */ |
| 415 private final int clientType; |
| 416 |
| 417 /** |
| 418 * Creates an instance. |
| 419 * |
| 420 * @param config configuration for the client |
| 421 * @param resources resources to use |
| 422 * @param smearer a smearer to randomize delays |
| 423 * @param statistics track information about messages sent/received, etc |
| 424 * @param applicationName name of the application using the library (for debug
ging/monitoring) |
| 425 * @param listener callback for protocol events |
| 426 */ |
| 427 ProtocolHandler(ProtocolHandlerConfigP config, final SystemResources resources
, |
| 428 Smearer smearer, Statistics statistics, int clientType, String application
Name, |
| 429 ProtocolListener listener, ProtocolHandlerState marshalledState) { |
| 430 this.logger = resources.getLogger(); |
| 431 this.statistics = statistics; |
| 432 this.internalScheduler = resources.getInternalScheduler(); |
| 433 this.network = resources.getNetwork(); |
| 434 this.listener = listener; |
| 435 this.clientVersion = CommonProtos.newClientVersion(resources.getPlatform(),
"Java", |
| 436 applicationName); |
| 437 this.clientType = clientType; |
| 438 if (marshalledState == null) { |
| 439 // If there is no marshalled state, construct a clean batcher. |
| 440 this.batcher = new Batcher(resources, statistics); |
| 441 } else { |
| 442 // Otherwise, restore the batcher from the marshalled state. |
| 443 this.batcher = new Batcher(resources, statistics, marshalledState.getBatch
erState()); |
| 444 this.messageId = marshalledState.getMessageId(); |
| 445 this.lastKnownServerTimeMs = marshalledState.getLastKnownServerTimeMs(); |
| 446 this.nextMessageSendTimeMs = marshalledState.getNextMessageSendTimeMs(); |
| 447 } |
| 448 logger.info("Created protocol handler for application %s, platform %s", appl
icationName, |
| 449 resources.getPlatform()); |
| 450 } |
| 451 |
| 452 /** Returns a default config for the protocol handler. */ |
| 453 static ProtocolHandlerConfigP createConfig() { |
| 454 // Allow at most 3 messages every 5 seconds. |
| 455 int windowMs = 5 * 1000; |
| 456 int numMessagesPerWindow = 3; |
| 457 |
| 458 List<RateLimitP> rateLimits = new ArrayList<RateLimitP>(); |
| 459 rateLimits.add(RateLimitP.create(windowMs, numMessagesPerWindow)); |
| 460 return ProtocolHandlerConfigP.create(null, rateLimits); |
| 461 } |
| 462 |
| 463 /** Returns a configuration object with parameters set for unit tests. */ |
| 464 static ProtocolHandlerConfigP createConfigForTest() { |
| 465 // No rate limits |
| 466 int smallBatchDelayForTest = 200; |
| 467 return ProtocolHandlerConfigP.create(smallBatchDelayForTest, new ArrayList<R
ateLimitP>(0)); |
| 468 } |
| 469 |
| 470 /** |
| 471 * Returns the next time a message is allowed to be sent to the server. Typic
ally, this will be |
| 472 * in the past, meaning that the client is free to send a message at any time. |
| 473 */ |
| 474 public long getNextMessageSendTimeMsForTest() { |
| 475 return nextMessageSendTimeMs; |
| 476 } |
| 477 |
| 478 /** |
| 479 * Handles a message from the server. If the message can be processed (i.e., i
s valid, is |
| 480 * of the right version, and is not a silence message), returns a {@link Parse
dMessage} |
| 481 * representing it. Otherwise, returns {@code null}. |
| 482 * <p> |
| 483 * This class intercepts and processes silence messages. In this case, it will
discard any other |
| 484 * data in the message. |
| 485 * <p> |
| 486 * Note that this method does <b>not</b> check the session token of any messag
e. |
| 487 */ |
| 488 ParsedMessage handleIncomingMessage(byte[] incomingMessage) { |
| 489 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte
rnal thread"); |
| 490 ServerToClientMessage message; |
| 491 try { |
| 492 message = ServerToClientMessage.parseFrom(incomingMessage); |
| 493 } catch (ValidationException exception) { |
| 494 statistics.recordError(ClientErrorType.INCOMING_MESSAGE_FAILURE); |
| 495 logger.warning("Incoming message is invalid: %s", Bytes.toLazyCompactStrin
g(incomingMessage)); |
| 496 return null; |
| 497 } |
| 498 |
| 499 // Check the version of the message. |
| 500 if (message.getHeader().getProtocolVersion().getVersion().getMajorVersion()
!= |
| 501 ClientConstants.PROTOCOL_MAJOR_VERSION) { |
| 502 statistics.recordError(ClientErrorType.PROTOCOL_VERSION_FAILURE); |
| 503 logger.severe("Dropping message with incompatible version: %s", message); |
| 504 return null; |
| 505 } |
| 506 |
| 507 // Check if it is a ConfigChangeMessage which indicates that messages should
no longer be |
| 508 // sent for a certain duration. Perform this check before the token is even
checked. |
| 509 if (message.hasConfigChangeMessage()) { |
| 510 ConfigChangeMessage configChangeMsg = message.getConfigChangeMessage(); |
| 511 statistics.recordReceivedMessage(ReceivedMessageType.CONFIG_CHANGE); |
| 512 if (configChangeMsg.hasNextMessageDelayMs()) { // Validator has ensured t
hat it is positive. |
| 513 nextMessageSendTimeMs = |
| 514 internalScheduler.getCurrentTimeMs() + configChangeMsg.getNextMessag
eDelayMs(); |
| 515 } |
| 516 return null; // Ignore all other messages in the envelope. |
| 517 } |
| 518 |
| 519 lastKnownServerTimeMs = Math.max(lastKnownServerTimeMs, message.getHeader().
getServerTimeMs()); |
| 520 return new ParsedMessage(message); |
| 521 } |
| 522 |
| 523 /** |
| 524 * Sends a message to the server to request a client token. |
| 525 * |
| 526 * @param applicationClientId application-specific client id |
| 527 * @param nonce nonce for the request |
| 528 * @param debugString information to identify the caller |
| 529 */ |
| 530 void sendInitializeMessage(ApplicationClientIdP applicationClientId, Bytes non
ce, |
| 531 BatchingTask batchingTask, String debugString) { |
| 532 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte
rnal thread"); |
| 533 if (applicationClientId.getClientType() != clientType) { |
| 534 // This condition is not fatal, but it probably represents a bug somewhere
if it occurs. |
| 535 logger.warning( |
| 536 "Client type in application id does not match constructor-provided typ
e: %s vs %s", |
| 537 applicationClientId, clientType); |
| 538 } |
| 539 |
| 540 // Simply store the message in pendingInitializeMessage and send it when the
batching task runs. |
| 541 InitializeMessage initializeMsg = InitializeMessage.create(clientType, nonce
, |
| 542 applicationClientId, DigestSerializationType.BYTE_BASED); |
| 543 batcher.setInitializeMessage(initializeMsg); |
| 544 logger.info("Batching initialize message for client: %s, %s", debugString, i
nitializeMsg); |
| 545 batchingTask.ensureScheduled(debugString); |
| 546 } |
| 547 |
| 548 /** |
| 549 * Sends an info message to the server with the performance counters supplied |
| 550 * in {@code performanceCounters} and the config supplies in |
| 551 * {@code configParams}. |
| 552 * |
| 553 * @param requestServerRegistrationSummary indicates whether to request the |
| 554 * server's registration summary |
| 555 */ |
| 556 void sendInfoMessage(List<SimplePair<String, Integer>> performanceCounters, |
| 557 ClientConfigP clientConfig, boolean requestServerRegistrationSummary, |
| 558 BatchingTask batchingTask) { |
| 559 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte
rnal thread"); |
| 560 |
| 561 List<PropertyRecord> performanceCounterRecords = |
| 562 new ArrayList<PropertyRecord>(performanceCounters.size()); |
| 563 for (SimplePair<String, Integer> counter : performanceCounters) { |
| 564 performanceCounterRecords.add(PropertyRecord.create(counter.first, counter
.second)); |
| 565 } |
| 566 InfoMessage infoMessage = InfoMessage.create(clientVersion, /* configParamet
er */ null, |
| 567 performanceCounterRecords, requestServerRegistrationSummary, clientConfi
g); |
| 568 |
| 569 // Simply store the message in pendingInfoMessage and send it when the batch
ing task runs. |
| 570 batcher.setInfoMessage(infoMessage); |
| 571 batchingTask.ensureScheduled("Send-info"); |
| 572 } |
| 573 |
| 574 /** |
| 575 * Sends a registration request to the server. |
| 576 * |
| 577 * @param objectIds object ids on which to (un)register |
| 578 * @param regOpType whether to register or unregister |
| 579 */ |
| 580 void sendRegistrations(Collection<ObjectIdP> objectIds, Integer regOpType, |
| 581 BatchingTask batchingTask) { |
| 582 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte
rnal thread"); |
| 583 for (ObjectIdP objectId : objectIds) { |
| 584 batcher.addRegistration(objectId, regOpType); |
| 585 } |
| 586 batchingTask.ensureScheduled("Send-registrations"); |
| 587 } |
| 588 |
| 589 /** Sends an acknowledgement for {@code invalidation} to the server. */ |
| 590 void sendInvalidationAck(InvalidationP invalidation, BatchingTask batchingTask
) { |
| 591 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte
rnal thread"); |
| 592 // We could summarize acks when there are suppressing invalidations - we don
't since it is |
| 593 // unlikely to be too beneficial here. |
| 594 logger.fine("Sending ack for invalidation %s", invalidation); |
| 595 batcher.addAck(invalidation); |
| 596 batchingTask.ensureScheduled("Send-Ack"); |
| 597 } |
| 598 |
| 599 /** |
| 600 * Sends a single registration subtree to the server. |
| 601 * |
| 602 * @param regSubtree subtree to send |
| 603 */ |
| 604 void sendRegistrationSyncSubtree(RegistrationSubtree regSubtree, BatchingTask
batchingTask) { |
| 605 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte
rnal thread"); |
| 606 batcher.addRegSubtree(regSubtree); |
| 607 logger.info("Adding subtree: %s", regSubtree); |
| 608 batchingTask.ensureScheduled("Send-reg-sync"); |
| 609 } |
| 610 |
| 611 /** Sends pending data to the server (e.g., registrations, acks, registration
sync messages). */ |
| 612 void sendMessageToServer() { |
| 613 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte
rnal thread"); |
| 614 if (nextMessageSendTimeMs > internalScheduler.getCurrentTimeMs()) { |
| 615 logger.warning("In quiet period: not sending message to server: %s > %s", |
| 616 nextMessageSendTimeMs, internalScheduler.getCurrentTimeMs()); |
| 617 return; |
| 618 } |
| 619 |
| 620 // Create the message from the batcher. |
| 621 ClientToServerMessage message; |
| 622 try { |
| 623 message = batcher.toMessage(createClientHeader(), listener.getClientToken(
) != null); |
| 624 if (message == null) { |
| 625 // Happens when we don't have a token and are not sending an initialize
message. Logged |
| 626 // in batcher.toMessage(). |
| 627 return; |
| 628 } |
| 629 } catch (ProtoWrapper.ValidationArgumentException exception) { |
| 630 logger.severe("Tried to send invalid message: %s", batcher); |
| 631 statistics.recordError(ClientErrorType.OUTGOING_MESSAGE_FAILURE); |
| 632 return; |
| 633 } |
| 634 ++messageId; |
| 635 |
| 636 statistics.recordSentMessage(SentMessageType.TOTAL); |
| 637 logger.fine("Sending message to server: %s", message); |
| 638 network.sendMessage(message.toByteArray()); |
| 639 |
| 640 // Record that the message was sent. We're invoking the listener directly, r
ather than |
| 641 // scheduling a new work unit to do it. It would be safer to do a schedule,
but that's hard to |
| 642 // do in Android, we wrote this listener (it's InvalidationClientCore, so we
know what it does), |
| 643 // and it's the last line of this function. |
| 644 listener.handleMessageSent(); |
| 645 } |
| 646 |
| 647 /** Returns the header to include on a message to the server. */ |
| 648 private ClientHeader createClientHeader() { |
| 649 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte
rnal thread"); |
| 650 return ClientHeader.create(ClientConstants.PROTOCOL_VERSION, |
| 651 listener.getClientToken(), listener.getRegistrationSummary(), |
| 652 internalScheduler.getCurrentTimeMs(), lastKnownServerTimeMs, Integer.to
String(messageId), |
| 653 clientType); |
| 654 } |
| 655 |
| 656 @Override |
| 657 public ProtocolHandlerState marshal() { |
| 658 return ProtocolHandlerState.create(messageId, lastKnownServerTimeMs, nextMes
sageSendTimeMs, |
| 659 batcher.marshal()); |
| 660 } |
| 661 } |
| OLD | NEW |