Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(511)

Side by Side Diff: third_party/cacheinvalidation/src/java/com/google/ipc/invalidation/ticl/ProtocolHandler.java

Issue 1162033004: Pull cacheinvalidations code directory into chromium repo. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 * Copyright 2011 Google Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.google.ipc.invalidation.ticl;
18
19 import com.google.ipc.invalidation.external.client.SystemResources;
20 import com.google.ipc.invalidation.external.client.SystemResources.Logger;
21 import com.google.ipc.invalidation.external.client.SystemResources.NetworkChanne l;
22 import com.google.ipc.invalidation.external.client.SystemResources.Scheduler;
23 import com.google.ipc.invalidation.external.client.types.SimplePair;
24 import com.google.ipc.invalidation.ticl.InvalidationClientCore.BatchingTask;
25 import com.google.ipc.invalidation.ticl.Statistics.ClientErrorType;
26 import com.google.ipc.invalidation.ticl.Statistics.ReceivedMessageType;
27 import com.google.ipc.invalidation.ticl.Statistics.SentMessageType;
28 import com.google.ipc.invalidation.ticl.proto.ClientConstants;
29 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ApplicationClientId P;
30 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientConfigP;
31 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientHeader;
32 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientToServerMessa ge;
33 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ClientVersion;
34 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ConfigChangeMessage ;
35 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ErrorMessage;
36 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoMessage;
37 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InfoRequestMessage;
38 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InitializeMessage;
39 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InitializeMessage.D igestSerializationType;
40 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationMessage ;
41 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.InvalidationP;
42 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ObjectIdP;
43 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.PropertyRecord;
44 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ProtocolHandlerConf igP;
45 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RateLimitP;
46 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationMessage ;
47 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP;
48 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationP.OpTyp e;
49 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationStatusM essage;
50 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSubtree ;
51 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSummary ;
52 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSyncMes sage;
53 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.RegistrationSyncReq uestMessage;
54 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ServerHeader;
55 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.ServerToClientMessa ge;
56 import com.google.ipc.invalidation.ticl.proto.ClientProtocol.TokenControlMessage ;
57 import com.google.ipc.invalidation.ticl.proto.CommonProtos;
58 import com.google.ipc.invalidation.ticl.proto.JavaClient.BatcherState;
59 import com.google.ipc.invalidation.ticl.proto.JavaClient.ProtocolHandlerState;
60 import com.google.ipc.invalidation.util.Bytes;
61 import com.google.ipc.invalidation.util.InternalBase;
62 import com.google.ipc.invalidation.util.Marshallable;
63 import com.google.ipc.invalidation.util.Preconditions;
64 import com.google.ipc.invalidation.util.ProtoWrapper;
65 import com.google.ipc.invalidation.util.ProtoWrapper.ValidationException;
66 import com.google.ipc.invalidation.util.Smearer;
67 import com.google.ipc.invalidation.util.TextBuilder;
68
69 import java.util.ArrayList;
70 import java.util.Collection;
71 import java.util.HashMap;
72 import java.util.HashSet;
73 import java.util.List;
74 import java.util.Map;
75 import java.util.Set;
76
77
78 /**
79 * A layer for interacting with low-level protocol messages. Parses messages fr om the server and
80 * calls appropriate functions on the {@code ProtocolListener} to handle various types of message
81 * content. Also buffers message data from the client and constructs and sends messages to the
82 * server.
83 * <p>
84 * This class implements {@link Marshallable}, so its state can be written to a protocol buffer,
85 * and instances can be restored from such protocol buffers. Additionally, the n ested class
86 * {@link Batcher} also implements {@code Marshallable} for the same reason.
87 * <p>
88 * Note that while we talk about "marshalling," in this context we mean marshall ing to protocol
89 * buffers, not raw bytes.
90 *
91 */
92 class ProtocolHandler implements Marshallable<ProtocolHandlerState> {
93 /** Class that batches messages to the server. */
94 private static class Batcher implements Marshallable<BatcherState> {
95 /** Statistics to be updated when messages are created. */
96 private final Statistics statistics;
97
98 /** Resources used for logging and thread assertions. */
99 private final SystemResources resources;
100
101 /** Set of pending registrations stored as a map for overriding later operat ions. */
102 private final Map<ObjectIdP, Integer> pendingRegistrations = new HashMap<Obj ectIdP, Integer>();
103
104 /** Set of pending invalidation acks. */
105 private final Set<InvalidationP> pendingAckedInvalidations = new HashSet<Inv alidationP>();
106
107 /** Set of pending registration sub trees for registration sync. */
108 private final Set<RegistrationSubtree> pendingRegSubtrees = new HashSet<Regi strationSubtree>();
109
110 /** Pending initialization message to send to the server, if any. */
111 private InitializeMessage pendingInitializeMessage = null;
112
113 /** Pending info message to send to the server, if any. */
114 private InfoMessage pendingInfoMessage = null;
115
116 /** Creates a batcher. */
117 Batcher(SystemResources resources, Statistics statistics) {
118 this.resources = resources;
119 this.statistics = statistics;
120 }
121
122 /** Creates a batcher from {@code marshalledState}. */
123 Batcher(SystemResources resources, Statistics statistics, BatcherState marsh alledState) {
124 this(resources, statistics);
125 for (ObjectIdP registration : marshalledState.getRegistration()) {
126 pendingRegistrations.put(registration, RegistrationP.OpType.REGISTER);
127 }
128 for (ObjectIdP unregistration : marshalledState.getUnregistration()) {
129 pendingRegistrations.put(unregistration, RegistrationP.OpType.UNREGISTER );
130 }
131 for (InvalidationP ack : marshalledState.getAcknowledgement()) {
132 pendingAckedInvalidations.add(ack);
133 }
134 for (RegistrationSubtree subtree : marshalledState.getRegistrationSubtree( )) {
135 pendingRegSubtrees.add(subtree);
136 }
137 pendingInitializeMessage = marshalledState.getNullableInitializeMessage();
138 if (marshalledState.hasInfoMessage()) {
139 pendingInfoMessage = marshalledState.getInfoMessage();
140 }
141 }
142
143 /** Sets the initialize message to be sent. */
144 void setInitializeMessage(InitializeMessage msg) {
145 pendingInitializeMessage = msg;
146 }
147
148 /** Sets the info message to be sent. */
149 void setInfoMessage(InfoMessage msg) {
150 pendingInfoMessage = msg;
151 }
152
153 /** Adds a registration on {@code oid} of {@code opType} to the registration s to be sent. */
154 void addRegistration(ObjectIdP oid, Integer opType) {
155 pendingRegistrations.put(oid, opType);
156 }
157
158 /** Adds {@code ack} to the set of acknowledgements to be sent. */
159 void addAck(InvalidationP ack) {
160 pendingAckedInvalidations.add(ack);
161 }
162
163 /** Adds {@code subtree} to the set of registration subtrees to be sent. */
164 void addRegSubtree(RegistrationSubtree subtree) {
165 pendingRegSubtrees.add(subtree);
166 }
167
168 /**
169 * Returns a builder for a {@link ClientToServerMessage} to be sent to the s erver. Crucially,
170 * the builder does <b>NOT</b> include the message header.
171 * @param hasClientToken whether the client currently holds a token
172 */
173 ClientToServerMessage toMessage(final ClientHeader header, boolean hasClient Token) {
174 final InitializeMessage initializeMessage;
175 final RegistrationMessage registrationMessage;
176 final RegistrationSyncMessage registrationSyncMessage;
177 final InvalidationMessage invalidationAckMessage;
178 final InfoMessage infoMessage;
179
180 if (pendingInitializeMessage != null) {
181 statistics.recordSentMessage(SentMessageType.INITIALIZE);
182 initializeMessage = pendingInitializeMessage;
183 pendingInitializeMessage = null;
184 } else {
185 initializeMessage = null;
186 }
187
188 // Note: Even if an initialize message is being sent, we can send addition al
189 // messages such as regisration messages, etc to the server. But if there is no token
190 // and an initialize message is not being sent, we cannot send any other m essage.
191
192 if (!hasClientToken && (initializeMessage == null)) {
193 // Cannot send any message
194 resources.getLogger().warning(
195 "Cannot send message since no token and no initialize msg");
196 statistics.recordError(ClientErrorType.TOKEN_MISSING_FAILURE);
197 return null;
198 }
199
200 // Check for pending batched operations and add to message builder if need ed.
201
202 // Add reg, acks, reg subtrees - clear them after adding.
203 if (!pendingAckedInvalidations.isEmpty()) {
204 invalidationAckMessage = createInvalidationAckMessage();
205 statistics.recordSentMessage(SentMessageType.INVALIDATION_ACK);
206 } else {
207 invalidationAckMessage = null;
208 }
209
210 // Check regs.
211 if (!pendingRegistrations.isEmpty()) {
212 registrationMessage = createRegistrationMessage();
213 statistics.recordSentMessage(SentMessageType.REGISTRATION);
214 } else {
215 registrationMessage = null;
216 }
217
218 // Check reg substrees.
219 if (!pendingRegSubtrees.isEmpty()) {
220 // If there are multiple pending reg subtrees, only one is sent.
221 ArrayList<RegistrationSubtree> regSubtrees = new ArrayList<RegistrationS ubtree>(1);
222 regSubtrees.add(pendingRegSubtrees.iterator().next());
223 registrationSyncMessage = RegistrationSyncMessage.create(regSubtrees);
224 pendingRegSubtrees.clear();
225 statistics.recordSentMessage(SentMessageType.REGISTRATION_SYNC);
226 } else {
227 registrationSyncMessage = null;
228 }
229
230 // Check if an info message has to be sent.
231 if (pendingInfoMessage != null) {
232 statistics.recordSentMessage(SentMessageType.INFO);
233 infoMessage = pendingInfoMessage;
234 pendingInfoMessage = null;
235 } else {
236 infoMessage = null;
237 }
238
239 return ClientToServerMessage.create(header, initializeMessage, registratio nMessage,
240 registrationSyncMessage, invalidationAckMessage, infoMessage);
241 }
242
243 /**
244 * Creates a registration message based on registrations from {@code pending Registrations}
245 * and returns it.
246 * <p>
247 * REQUIRES: pendingRegistrations.size() > 0
248 */
249 private RegistrationMessage createRegistrationMessage() {
250 Preconditions.checkState(!pendingRegistrations.isEmpty());
251
252 // Run through the pendingRegistrations map.
253 List<RegistrationP> pendingRegistrations =
254 new ArrayList<RegistrationP>(this.pendingRegistrations.size());
255 for (Map.Entry<ObjectIdP, Integer> entry : this.pendingRegistrations.entry Set()) {
256 pendingRegistrations.add(RegistrationP.create(entry.getKey(), entry.getV alue()));
257 }
258 this.pendingRegistrations.clear();
259 return RegistrationMessage.create(pendingRegistrations);
260 }
261
262 /**
263 * Creates an invalidation ack message based on acks from {@code pendingAcke dInvalidations} and
264 * returns it.
265 * <p>
266 * REQUIRES: pendingAckedInvalidations.size() > 0
267 */
268 private InvalidationMessage createInvalidationAckMessage() {
269 Preconditions.checkState(!pendingAckedInvalidations.isEmpty());
270 InvalidationMessage ackMessage =
271 InvalidationMessage.create(new ArrayList<InvalidationP>(pendingAckedIn validations));
272 pendingAckedInvalidations.clear();
273 return ackMessage;
274 }
275
276 @Override
277 public BatcherState marshal() {
278 // Marshall (un)registrations.
279 ArrayList<ObjectIdP> registrations = new ArrayList<ObjectIdP>(pendingRegis trations.size());
280 ArrayList<ObjectIdP> unregistrations = new ArrayList<ObjectIdP>(pendingReg istrations.size());
281 for (Map.Entry<ObjectIdP, Integer> entry : pendingRegistrations.entrySet() ) {
282 Integer opType = entry.getValue();
283 ObjectIdP oid = entry.getKey();
284 new ArrayList<ObjectIdP>(pendingRegistrations.size());
285 switch (opType) {
286 case OpType.REGISTER:
287 registrations.add(oid);
288 break;
289 case OpType.UNREGISTER:
290 unregistrations.add(oid);
291 break;
292 default:
293 throw new IllegalArgumentException(opType.toString());
294 }
295 }
296 return BatcherState.create(registrations, unregistrations, pendingAckedInv alidations,
297 pendingRegSubtrees, pendingInitializeMessage, pendingInfoMessage);
298 }
299 }
300
301 /** Representation of a message header for use in a server message. */
302 static class ServerMessageHeader extends InternalBase {
303 /**
304 * Constructs an instance.
305 *
306 * @param token server-sent token
307 * @param registrationSummary summary over server registration state
308 */
309 ServerMessageHeader(Bytes token, RegistrationSummary registrationSummary) {
310 this.token = token;
311 this.registrationSummary = registrationSummary;
312 }
313
314 /** Server-sent token. */
315 Bytes token;
316
317 /** Summary of the client's registration state at the server. */
318 RegistrationSummary registrationSummary;
319
320 @Override
321 public void toCompactString(TextBuilder builder) {
322 builder.appendFormat("Token: %s, Summary: %s", token, registrationSummary) ;
323 }
324 }
325
326 /**
327 * Representation of a message receiver for the server. Such a message is guar anteed to be
328 * valid, but the session token is <b>not</b> checked.
329 */
330 static class ParsedMessage {
331 /*
332 * Each of these fields corresponds directly to a field in the ServerToClien tMessage protobuf.
333 * It is non-null iff the corresponding hasYYY method in the protobuf would return true.
334 */
335 final ServerMessageHeader header;
336 final TokenControlMessage tokenControlMessage;
337 final InvalidationMessage invalidationMessage;
338 final RegistrationStatusMessage registrationStatusMessage;
339 final RegistrationSyncRequestMessage registrationSyncRequestMessage;
340 final ConfigChangeMessage configChangeMessage;
341 final InfoRequestMessage infoRequestMessage;
342 final ErrorMessage errorMessage;
343
344 /** Constructs an instance from a {@code rawMessage}. */
345 ParsedMessage(ServerToClientMessage rawMessage) {
346 // For each field, assign it to the corresponding protobuf field if presen t, else null.
347 ServerHeader messageHeader = rawMessage.getHeader();
348 header = new ServerMessageHeader(messageHeader.getClientToken(),
349 messageHeader.getNullableRegistrationSummary());
350 tokenControlMessage =
351 rawMessage.hasTokenControlMessage() ? rawMessage.getTokenControlMessag e() : null;
352 invalidationMessage = rawMessage.getNullableInvalidationMessage();
353 registrationStatusMessage = rawMessage.getNullableRegistrationStatusMessag e();
354 registrationSyncRequestMessage = rawMessage.hasRegistrationSyncRequestMess age()
355 ? rawMessage.getRegistrationSyncRequestMessage() : null;
356 configChangeMessage =
357 rawMessage.hasConfigChangeMessage() ? rawMessage.getConfigChangeMessag e() : null;
358 infoRequestMessage = rawMessage.getNullableInfoRequestMessage();
359 errorMessage = rawMessage.getNullableErrorMessage();
360 }
361 }
362
363 /**
364 * Listener for protocol events. The handler guarantees that the call will be made on the internal
365 * thread that the SystemResources provides.
366 */
367 interface ProtocolListener {
368 /** Records that a message was sent to the server at the current time. */
369 void handleMessageSent();
370
371 /** Returns a summary of the current desired registrations. */
372 RegistrationSummary getRegistrationSummary();
373
374 /** Returns the current server-assigned client token, if any. */
375 Bytes getClientToken();
376 }
377
378 /** Information about the client, e.g., application name, OS, etc. */
379 private final ClientVersion clientVersion;
380
381 /** A logger. */
382 private final Logger logger;
383
384 /** Scheduler for the client's internal processing. */
385 private final Scheduler internalScheduler;
386
387 /** Network channel for sending and receiving messages to and from the server. */
388 private final NetworkChannel network;
389
390 /** The protocol listener. */
391 private final ProtocolListener listener;
392
393 /** Batches messages to the server. */
394 private final Batcher batcher;
395
396 /** A debug message id that is added to every message to the server. */
397 private int messageId = 1;
398
399 // State specific to a client. If we want to support multiple clients, this co uld
400 // be in a map or could be eliminated (e.g., no batching).
401
402 /** The last known time from the server. */
403 private long lastKnownServerTimeMs = 0;
404
405 /**
406 * The next time before which a message cannot be sent to the server. If this is less than current
407 * time, a message can be sent at any time.
408 */
409 private long nextMessageSendTimeMs = 0;
410
411 /** Statistics objects to track number of sent messages, etc. */
412 private final Statistics statistics;
413
414 /** Client type for inclusion in headers. */
415 private final int clientType;
416
417 /**
418 * Creates an instance.
419 *
420 * @param config configuration for the client
421 * @param resources resources to use
422 * @param smearer a smearer to randomize delays
423 * @param statistics track information about messages sent/received, etc
424 * @param applicationName name of the application using the library (for debug ging/monitoring)
425 * @param listener callback for protocol events
426 */
427 ProtocolHandler(ProtocolHandlerConfigP config, final SystemResources resources ,
428 Smearer smearer, Statistics statistics, int clientType, String application Name,
429 ProtocolListener listener, ProtocolHandlerState marshalledState) {
430 this.logger = resources.getLogger();
431 this.statistics = statistics;
432 this.internalScheduler = resources.getInternalScheduler();
433 this.network = resources.getNetwork();
434 this.listener = listener;
435 this.clientVersion = CommonProtos.newClientVersion(resources.getPlatform(), "Java",
436 applicationName);
437 this.clientType = clientType;
438 if (marshalledState == null) {
439 // If there is no marshalled state, construct a clean batcher.
440 this.batcher = new Batcher(resources, statistics);
441 } else {
442 // Otherwise, restore the batcher from the marshalled state.
443 this.batcher = new Batcher(resources, statistics, marshalledState.getBatch erState());
444 this.messageId = marshalledState.getMessageId();
445 this.lastKnownServerTimeMs = marshalledState.getLastKnownServerTimeMs();
446 this.nextMessageSendTimeMs = marshalledState.getNextMessageSendTimeMs();
447 }
448 logger.info("Created protocol handler for application %s, platform %s", appl icationName,
449 resources.getPlatform());
450 }
451
452 /** Returns a default config for the protocol handler. */
453 static ProtocolHandlerConfigP createConfig() {
454 // Allow at most 3 messages every 5 seconds.
455 int windowMs = 5 * 1000;
456 int numMessagesPerWindow = 3;
457
458 List<RateLimitP> rateLimits = new ArrayList<RateLimitP>();
459 rateLimits.add(RateLimitP.create(windowMs, numMessagesPerWindow));
460 return ProtocolHandlerConfigP.create(null, rateLimits);
461 }
462
463 /** Returns a configuration object with parameters set for unit tests. */
464 static ProtocolHandlerConfigP createConfigForTest() {
465 // No rate limits
466 int smallBatchDelayForTest = 200;
467 return ProtocolHandlerConfigP.create(smallBatchDelayForTest, new ArrayList<R ateLimitP>(0));
468 }
469
470 /**
471 * Returns the next time a message is allowed to be sent to the server. Typic ally, this will be
472 * in the past, meaning that the client is free to send a message at any time.
473 */
474 public long getNextMessageSendTimeMsForTest() {
475 return nextMessageSendTimeMs;
476 }
477
478 /**
479 * Handles a message from the server. If the message can be processed (i.e., i s valid, is
480 * of the right version, and is not a silence message), returns a {@link Parse dMessage}
481 * representing it. Otherwise, returns {@code null}.
482 * <p>
483 * This class intercepts and processes silence messages. In this case, it will discard any other
484 * data in the message.
485 * <p>
486 * Note that this method does <b>not</b> check the session token of any messag e.
487 */
488 ParsedMessage handleIncomingMessage(byte[] incomingMessage) {
489 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte rnal thread");
490 ServerToClientMessage message;
491 try {
492 message = ServerToClientMessage.parseFrom(incomingMessage);
493 } catch (ValidationException exception) {
494 statistics.recordError(ClientErrorType.INCOMING_MESSAGE_FAILURE);
495 logger.warning("Incoming message is invalid: %s", Bytes.toLazyCompactStrin g(incomingMessage));
496 return null;
497 }
498
499 // Check the version of the message.
500 if (message.getHeader().getProtocolVersion().getVersion().getMajorVersion() !=
501 ClientConstants.PROTOCOL_MAJOR_VERSION) {
502 statistics.recordError(ClientErrorType.PROTOCOL_VERSION_FAILURE);
503 logger.severe("Dropping message with incompatible version: %s", message);
504 return null;
505 }
506
507 // Check if it is a ConfigChangeMessage which indicates that messages should no longer be
508 // sent for a certain duration. Perform this check before the token is even checked.
509 if (message.hasConfigChangeMessage()) {
510 ConfigChangeMessage configChangeMsg = message.getConfigChangeMessage();
511 statistics.recordReceivedMessage(ReceivedMessageType.CONFIG_CHANGE);
512 if (configChangeMsg.hasNextMessageDelayMs()) { // Validator has ensured t hat it is positive.
513 nextMessageSendTimeMs =
514 internalScheduler.getCurrentTimeMs() + configChangeMsg.getNextMessag eDelayMs();
515 }
516 return null; // Ignore all other messages in the envelope.
517 }
518
519 lastKnownServerTimeMs = Math.max(lastKnownServerTimeMs, message.getHeader(). getServerTimeMs());
520 return new ParsedMessage(message);
521 }
522
523 /**
524 * Sends a message to the server to request a client token.
525 *
526 * @param applicationClientId application-specific client id
527 * @param nonce nonce for the request
528 * @param debugString information to identify the caller
529 */
530 void sendInitializeMessage(ApplicationClientIdP applicationClientId, Bytes non ce,
531 BatchingTask batchingTask, String debugString) {
532 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte rnal thread");
533 if (applicationClientId.getClientType() != clientType) {
534 // This condition is not fatal, but it probably represents a bug somewhere if it occurs.
535 logger.warning(
536 "Client type in application id does not match constructor-provided typ e: %s vs %s",
537 applicationClientId, clientType);
538 }
539
540 // Simply store the message in pendingInitializeMessage and send it when the batching task runs.
541 InitializeMessage initializeMsg = InitializeMessage.create(clientType, nonce ,
542 applicationClientId, DigestSerializationType.BYTE_BASED);
543 batcher.setInitializeMessage(initializeMsg);
544 logger.info("Batching initialize message for client: %s, %s", debugString, i nitializeMsg);
545 batchingTask.ensureScheduled(debugString);
546 }
547
548 /**
549 * Sends an info message to the server with the performance counters supplied
550 * in {@code performanceCounters} and the config supplies in
551 * {@code configParams}.
552 *
553 * @param requestServerRegistrationSummary indicates whether to request the
554 * server's registration summary
555 */
556 void sendInfoMessage(List<SimplePair<String, Integer>> performanceCounters,
557 ClientConfigP clientConfig, boolean requestServerRegistrationSummary,
558 BatchingTask batchingTask) {
559 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte rnal thread");
560
561 List<PropertyRecord> performanceCounterRecords =
562 new ArrayList<PropertyRecord>(performanceCounters.size());
563 for (SimplePair<String, Integer> counter : performanceCounters) {
564 performanceCounterRecords.add(PropertyRecord.create(counter.first, counter .second));
565 }
566 InfoMessage infoMessage = InfoMessage.create(clientVersion, /* configParamet er */ null,
567 performanceCounterRecords, requestServerRegistrationSummary, clientConfi g);
568
569 // Simply store the message in pendingInfoMessage and send it when the batch ing task runs.
570 batcher.setInfoMessage(infoMessage);
571 batchingTask.ensureScheduled("Send-info");
572 }
573
574 /**
575 * Sends a registration request to the server.
576 *
577 * @param objectIds object ids on which to (un)register
578 * @param regOpType whether to register or unregister
579 */
580 void sendRegistrations(Collection<ObjectIdP> objectIds, Integer regOpType,
581 BatchingTask batchingTask) {
582 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte rnal thread");
583 for (ObjectIdP objectId : objectIds) {
584 batcher.addRegistration(objectId, regOpType);
585 }
586 batchingTask.ensureScheduled("Send-registrations");
587 }
588
589 /** Sends an acknowledgement for {@code invalidation} to the server. */
590 void sendInvalidationAck(InvalidationP invalidation, BatchingTask batchingTask ) {
591 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte rnal thread");
592 // We could summarize acks when there are suppressing invalidations - we don 't since it is
593 // unlikely to be too beneficial here.
594 logger.fine("Sending ack for invalidation %s", invalidation);
595 batcher.addAck(invalidation);
596 batchingTask.ensureScheduled("Send-Ack");
597 }
598
599 /**
600 * Sends a single registration subtree to the server.
601 *
602 * @param regSubtree subtree to send
603 */
604 void sendRegistrationSyncSubtree(RegistrationSubtree regSubtree, BatchingTask batchingTask) {
605 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte rnal thread");
606 batcher.addRegSubtree(regSubtree);
607 logger.info("Adding subtree: %s", regSubtree);
608 batchingTask.ensureScheduled("Send-reg-sync");
609 }
610
611 /** Sends pending data to the server (e.g., registrations, acks, registration sync messages). */
612 void sendMessageToServer() {
613 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte rnal thread");
614 if (nextMessageSendTimeMs > internalScheduler.getCurrentTimeMs()) {
615 logger.warning("In quiet period: not sending message to server: %s > %s",
616 nextMessageSendTimeMs, internalScheduler.getCurrentTimeMs());
617 return;
618 }
619
620 // Create the message from the batcher.
621 ClientToServerMessage message;
622 try {
623 message = batcher.toMessage(createClientHeader(), listener.getClientToken( ) != null);
624 if (message == null) {
625 // Happens when we don't have a token and are not sending an initialize message. Logged
626 // in batcher.toMessage().
627 return;
628 }
629 } catch (ProtoWrapper.ValidationArgumentException exception) {
630 logger.severe("Tried to send invalid message: %s", batcher);
631 statistics.recordError(ClientErrorType.OUTGOING_MESSAGE_FAILURE);
632 return;
633 }
634 ++messageId;
635
636 statistics.recordSentMessage(SentMessageType.TOTAL);
637 logger.fine("Sending message to server: %s", message);
638 network.sendMessage(message.toByteArray());
639
640 // Record that the message was sent. We're invoking the listener directly, r ather than
641 // scheduling a new work unit to do it. It would be safer to do a schedule, but that's hard to
642 // do in Android, we wrote this listener (it's InvalidationClientCore, so we know what it does),
643 // and it's the last line of this function.
644 listener.handleMessageSent();
645 }
646
647 /** Returns the header to include on a message to the server. */
648 private ClientHeader createClientHeader() {
649 Preconditions.checkState(internalScheduler.isRunningOnThread(), "Not on inte rnal thread");
650 return ClientHeader.create(ClientConstants.PROTOCOL_VERSION,
651 listener.getClientToken(), listener.getRegistrationSummary(),
652 internalScheduler.getCurrentTimeMs(), lastKnownServerTimeMs, Integer.to String(messageId),
653 clientType);
654 }
655
656 @Override
657 public ProtocolHandlerState marshal() {
658 return ProtocolHandlerState.create(messageId, lastKnownServerTimeMs, nextMes sageSendTimeMs,
659 batcher.marshal());
660 }
661 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698