Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(385)

Side by Side Diff: third_party/grpc/src/node/src/client.js

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 /**
35 * Client module
36 *
37 * This module contains the factory method for creating Client classes, and the
38 * method calling code for all types of methods.
39 *
40 * For example, to create a client and call a method on it:
41 *
42 * var proto_obj = grpc.load(proto_file_path);
43 * var Client = proto_obj.package.subpackage.ServiceName;
44 * var client = new Client(server_address, client_credentials);
45 * var call = client.unaryMethod(arguments, callback);
46 *
47 * @module
48 */
49
50 'use strict';
51
52 var _ = require('lodash');
53
54 var grpc = require('./grpc_extension');
55
56 var common = require('./common');
57
58 var Metadata = require('./metadata');
59
60 var EventEmitter = require('events').EventEmitter;
61
62 var stream = require('stream');
63
64 var Readable = stream.Readable;
65 var Writable = stream.Writable;
66 var Duplex = stream.Duplex;
67 var util = require('util');
68 var version = require('../../../package.json').version;
69
70 util.inherits(ClientWritableStream, Writable);
71
72 /**
73 * A stream that the client can write to. Used for calls that are streaming from
74 * the client side.
75 * @constructor
76 * @param {grpc.Call} call The call object to send data with
77 * @param {function(*):Buffer=} serialize Serialization function for writes.
78 */
79 function ClientWritableStream(call, serialize) {
80 Writable.call(this, {objectMode: true});
81 this.call = call;
82 this.serialize = common.wrapIgnoreNull(serialize);
83 this.on('finish', function() {
84 var batch = {};
85 batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
86 call.startBatch(batch, function() {});
87 });
88 }
89
90 /**
91 * Attempt to write the given chunk. Calls the callback when done. This is an
92 * implementation of a method needed for implementing stream.Writable.
93 * @access private
94 * @param {Buffer} chunk The chunk to write
95 * @param {string} encoding Used to pass write flags
96 * @param {function(Error=)} callback Called when the write is complete
97 */
98 function _write(chunk, encoding, callback) {
99 /* jshint validthis: true */
100 var batch = {};
101 var message = this.serialize(chunk);
102 if (_.isFinite(encoding)) {
103 /* Attach the encoding if it is a finite number. This is the closest we
104 * can get to checking that it is valid flags */
105 message.grpcWriteFlags = encoding;
106 }
107 batch[grpc.opType.SEND_MESSAGE] = message;
108 this.call.startBatch(batch, function(err, event) {
109 if (err) {
110 // Something has gone wrong. Stop writing by failing to call callback
111 return;
112 }
113 callback();
114 });
115 }
116
117 ClientWritableStream.prototype._write = _write;
118
119 util.inherits(ClientReadableStream, Readable);
120
121 /**
122 * A stream that the client can read from. Used for calls that are streaming
123 * from the server side.
124 * @constructor
125 * @param {grpc.Call} call The call object to read data with
126 * @param {function(Buffer):*=} deserialize Deserialization function for reads
127 */
128 function ClientReadableStream(call, deserialize) {
129 Readable.call(this, {objectMode: true});
130 this.call = call;
131 this.finished = false;
132 this.reading = false;
133 this.deserialize = common.wrapIgnoreNull(deserialize);
134 /* Status generated from reading messages from the server. Overrides the
135 * status from the server if not OK */
136 this.read_status = null;
137 /* Status received from the server. */
138 this.received_status = null;
139 }
140
141 /**
142 * Called when all messages from the server have been processed. The status
143 * parameter indicates that the call should end with that status. status
144 * defaults to OK if not provided.
145 * @param {Object!} status The status that the call should end with
146 */
147 function _readsDone(status) {
148 /* jshint validthis: true */
149 if (!status) {
150 status = {code: grpc.status.OK, details: 'OK'};
151 }
152 this.finished = true;
153 this.read_status = status;
154 this._emitStatusIfDone();
155 }
156
157 ClientReadableStream.prototype._readsDone = _readsDone;
158
159 /**
160 * Called to indicate that we have received a status from the server.
161 */
162 function _receiveStatus(status) {
163 /* jshint validthis: true */
164 this.received_status = status;
165 this._emitStatusIfDone();
166 }
167
168 ClientReadableStream.prototype._receiveStatus = _receiveStatus;
169
170 /**
171 * If we have both processed all incoming messages and received the status from
172 * the server, emit the status. Otherwise, do nothing.
173 */
174 function _emitStatusIfDone() {
175 /* jshint validthis: true */
176 var status;
177 if (this.read_status && this.received_status) {
178 if (this.read_status.code !== grpc.status.OK) {
179 status = this.read_status;
180 } else {
181 status = this.received_status;
182 }
183 this.emit('status', status);
184 if (status.code !== grpc.status.OK) {
185 var error = new Error(status.details);
186 error.code = status.code;
187 error.metadata = status.metadata;
188 this.emit('error', error);
189 return;
190 }
191 }
192 }
193
194 ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone;
195
196 /**
197 * Read the next object from the stream.
198 * @access private
199 * @param {*} size Ignored because we use objectMode=true
200 */
201 function _read(size) {
202 /* jshint validthis: true */
203 var self = this;
204 /**
205 * Callback to be called when a READ event is received. Pushes the data onto
206 * the read queue and starts reading again if applicable
207 * @param {grpc.Event} event READ event object
208 */
209 function readCallback(err, event) {
210 if (err) {
211 // Something has gone wrong. Stop reading and wait for status
212 self.finished = true;
213 self._readsDone();
214 return;
215 }
216 var data = event.read;
217 var deserialized;
218 try {
219 deserialized = self.deserialize(data);
220 } catch (e) {
221 self._readsDone({code: grpc.status.INTERNAL,
222 details: 'Failed to parse server response'});
223 }
224 if (data === null) {
225 self._readsDone();
226 }
227 if (self.push(deserialized) && data !== null) {
228 var read_batch = {};
229 read_batch[grpc.opType.RECV_MESSAGE] = true;
230 self.call.startBatch(read_batch, readCallback);
231 } else {
232 self.reading = false;
233 }
234 }
235 if (self.finished) {
236 self.push(null);
237 } else {
238 if (!self.reading) {
239 self.reading = true;
240 var read_batch = {};
241 read_batch[grpc.opType.RECV_MESSAGE] = true;
242 self.call.startBatch(read_batch, readCallback);
243 }
244 }
245 }
246
247 ClientReadableStream.prototype._read = _read;
248
249 util.inherits(ClientDuplexStream, Duplex);
250
251 /**
252 * A stream that the client can read from or write to. Used for calls with
253 * duplex streaming.
254 * @constructor
255 * @param {grpc.Call} call Call object to proxy
256 * @param {function(*):Buffer=} serialize Serialization function for requests
257 * @param {function(Buffer):*=} deserialize Deserialization function for
258 * responses
259 */
260 function ClientDuplexStream(call, serialize, deserialize) {
261 Duplex.call(this, {objectMode: true});
262 this.serialize = common.wrapIgnoreNull(serialize);
263 this.deserialize = common.wrapIgnoreNull(deserialize);
264 this.call = call;
265 /* Status generated from reading messages from the server. Overrides the
266 * status from the server if not OK */
267 this.read_status = null;
268 /* Status received from the server. */
269 this.received_status = null;
270 this.on('finish', function() {
271 var batch = {};
272 batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
273 call.startBatch(batch, function() {});
274 });
275 }
276
277 ClientDuplexStream.prototype._readsDone = _readsDone;
278 ClientDuplexStream.prototype._receiveStatus = _receiveStatus;
279 ClientDuplexStream.prototype._emitStatusIfDone = _emitStatusIfDone;
280 ClientDuplexStream.prototype._read = _read;
281 ClientDuplexStream.prototype._write = _write;
282
283 /**
284 * Cancel the ongoing call
285 */
286 function cancel() {
287 /* jshint validthis: true */
288 this.call.cancel();
289 }
290
291 ClientReadableStream.prototype.cancel = cancel;
292 ClientWritableStream.prototype.cancel = cancel;
293 ClientDuplexStream.prototype.cancel = cancel;
294
295 /**
296 * Get the endpoint this call/stream is connected to.
297 * @return {string} The URI of the endpoint
298 */
299 function getPeer() {
300 /* jshint validthis: true */
301 return this.call.getPeer();
302 }
303
304 ClientReadableStream.prototype.getPeer = getPeer;
305 ClientWritableStream.prototype.getPeer = getPeer;
306 ClientDuplexStream.prototype.getPeer = getPeer;
307
308 /**
309 * Get a call object built with the provided options. Keys for options are
310 * 'deadline', which takes a date or number, and 'host', which takes a string
311 * and overrides the hostname to connect to.
312 * @param {Object} options Options map.
313 */
314 function getCall(channel, method, options) {
315 var deadline;
316 var host;
317 var parent;
318 var propagate_flags;
319 var credentials;
320 if (options) {
321 deadline = options.deadline;
322 host = options.host;
323 parent = _.get(options, 'parent.call');
324 propagate_flags = options.propagate_flags;
325 credentials = options.credentials;
326 }
327 if (deadline === undefined) {
328 deadline = Infinity;
329 }
330 var call = new grpc.Call(channel, method, deadline, host,
331 parent, propagate_flags);
332 if (credentials) {
333 call.setCredentials(credentials);
334 }
335 return call;
336 }
337
338 /**
339 * Get a function that can make unary requests to the specified method.
340 * @param {string} method The name of the method to request
341 * @param {function(*):Buffer} serialize The serialization function for inputs
342 * @param {function(Buffer)} deserialize The deserialization function for
343 * outputs
344 * @return {Function} makeUnaryRequest
345 */
346 function makeUnaryRequestFunction(method, serialize, deserialize) {
347 /**
348 * Make a unary request with this method on the given channel with the given
349 * argument, callback, etc.
350 * @this {Client} Client object. Must have a channel member.
351 * @param {*} argument The argument to the call. Should be serializable with
352 * serialize
353 * @param {function(?Error, value=)} callback The callback to for when the
354 * response is received
355 * @param {Metadata=} metadata Metadata to add to the call
356 * @param {Object=} options Options map
357 * @return {EventEmitter} An event emitter for stream related events
358 */
359 function makeUnaryRequest(argument, callback, metadata, options) {
360 /* jshint validthis: true */
361 var emitter = new EventEmitter();
362 var call = getCall(this.$channel, method, options);
363 if (metadata === null || metadata === undefined) {
364 metadata = new Metadata();
365 } else {
366 metadata = metadata.clone();
367 }
368 emitter.cancel = function cancel() {
369 call.cancel();
370 };
371 emitter.getPeer = function getPeer() {
372 return call.getPeer();
373 };
374 var client_batch = {};
375 var message = serialize(argument);
376 if (options) {
377 message.grpcWriteFlags = options.flags;
378 }
379 client_batch[grpc.opType.SEND_INITIAL_METADATA] =
380 metadata._getCoreRepresentation();
381 client_batch[grpc.opType.SEND_MESSAGE] = message;
382 client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
383 client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
384 client_batch[grpc.opType.RECV_MESSAGE] = true;
385 client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
386 call.startBatch(client_batch, function(err, response) {
387 response.status.metadata = Metadata._fromCoreRepresentation(
388 response.status.metadata);
389 var status = response.status;
390 var error;
391 var deserialized;
392 if (status.code === grpc.status.OK) {
393 if (err) {
394 // Got a batch error, but OK status. Something went wrong
395 callback(err);
396 return;
397 } else {
398 try {
399 deserialized = deserialize(response.read);
400 } catch (e) {
401 /* Change status to indicate bad server response. This will result
402 * in passing an error to the callback */
403 status = {
404 code: grpc.status.INTERNAL,
405 details: 'Failed to parse server response'
406 };
407 }
408 }
409 }
410 if (status.code !== grpc.status.OK) {
411 error = new Error(status.details);
412 error.code = status.code;
413 error.metadata = status.metadata;
414 callback(error);
415 } else {
416 callback(null, deserialized);
417 }
418 emitter.emit('status', status);
419 emitter.emit('metadata', Metadata._fromCoreRepresentation(
420 response.metadata));
421 });
422 return emitter;
423 }
424 return makeUnaryRequest;
425 }
426
427 /**
428 * Get a function that can make client stream requests to the specified method.
429 * @param {string} method The name of the method to request
430 * @param {function(*):Buffer} serialize The serialization function for inputs
431 * @param {function(Buffer)} deserialize The deserialization function for
432 * outputs
433 * @return {Function} makeClientStreamRequest
434 */
435 function makeClientStreamRequestFunction(method, serialize, deserialize) {
436 /**
437 * Make a client stream request with this method on the given channel with the
438 * given callback, etc.
439 * @this {Client} Client object. Must have a channel member.
440 * @param {function(?Error, value=)} callback The callback to for when the
441 * response is received
442 * @param {Metadata=} metadata Array of metadata key/value pairs to add to the
443 * call
444 * @param {Object=} options Options map
445 * @return {EventEmitter} An event emitter for stream related events
446 */
447 function makeClientStreamRequest(callback, metadata, options) {
448 /* jshint validthis: true */
449 var call = getCall(this.$channel, method, options);
450 if (metadata === null || metadata === undefined) {
451 metadata = new Metadata();
452 } else {
453 metadata = metadata.clone();
454 }
455 var stream = new ClientWritableStream(call, serialize);
456 var metadata_batch = {};
457 metadata_batch[grpc.opType.SEND_INITIAL_METADATA] =
458 metadata._getCoreRepresentation();
459 metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
460 call.startBatch(metadata_batch, function(err, response) {
461 if (err) {
462 // The call has stopped for some reason. A non-OK status will arrive
463 // in the other batch.
464 return;
465 }
466 stream.emit('metadata', Metadata._fromCoreRepresentation(
467 response.metadata));
468 });
469 var client_batch = {};
470 client_batch[grpc.opType.RECV_MESSAGE] = true;
471 client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
472 call.startBatch(client_batch, function(err, response) {
473 response.status.metadata = Metadata._fromCoreRepresentation(
474 response.status.metadata);
475 var status = response.status;
476 var error;
477 var deserialized;
478 if (status.code === grpc.status.OK) {
479 if (err) {
480 // Got a batch error, but OK status. Something went wrong
481 callback(err);
482 return;
483 } else {
484 try {
485 deserialized = deserialize(response.read);
486 } catch (e) {
487 /* Change status to indicate bad server response. This will result
488 * in passing an error to the callback */
489 status = {
490 code: grpc.status.INTERNAL,
491 details: 'Failed to parse server response'
492 };
493 }
494 }
495 }
496 if (status.code !== grpc.status.OK) {
497 error = new Error(response.status.details);
498 error.code = status.code;
499 error.metadata = status.metadata;
500 callback(error);
501 } else {
502 callback(null, deserialized);
503 }
504 stream.emit('status', status);
505 });
506 return stream;
507 }
508 return makeClientStreamRequest;
509 }
510
511 /**
512 * Get a function that can make server stream requests to the specified method.
513 * @param {string} method The name of the method to request
514 * @param {function(*):Buffer} serialize The serialization function for inputs
515 * @param {function(Buffer)} deserialize The deserialization function for
516 * outputs
517 * @return {Function} makeServerStreamRequest
518 */
519 function makeServerStreamRequestFunction(method, serialize, deserialize) {
520 /**
521 * Make a server stream request with this method on the given channel with the
522 * given argument, etc.
523 * @this {SurfaceClient} Client object. Must have a channel member.
524 * @param {*} argument The argument to the call. Should be serializable with
525 * serialize
526 * @param {Metadata=} metadata Array of metadata key/value pairs to add to the
527 * call
528 * @param {Object} options Options map
529 * @return {EventEmitter} An event emitter for stream related events
530 */
531 function makeServerStreamRequest(argument, metadata, options) {
532 /* jshint validthis: true */
533 var call = getCall(this.$channel, method, options);
534 if (metadata === null || metadata === undefined) {
535 metadata = new Metadata();
536 } else {
537 metadata = metadata.clone();
538 }
539 var stream = new ClientReadableStream(call, deserialize);
540 var start_batch = {};
541 var message = serialize(argument);
542 if (options) {
543 message.grpcWriteFlags = options.flags;
544 }
545 start_batch[grpc.opType.SEND_INITIAL_METADATA] =
546 metadata._getCoreRepresentation();
547 start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
548 start_batch[grpc.opType.SEND_MESSAGE] = message;
549 start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
550 call.startBatch(start_batch, function(err, response) {
551 if (err) {
552 // The call has stopped for some reason. A non-OK status will arrive
553 // in the other batch.
554 return;
555 }
556 stream.emit('metadata', Metadata._fromCoreRepresentation(
557 response.metadata));
558 });
559 var status_batch = {};
560 status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
561 call.startBatch(status_batch, function(err, response) {
562 if (err) {
563 stream.emit('error', err);
564 return;
565 }
566 response.status.metadata = Metadata._fromCoreRepresentation(
567 response.status.metadata);
568 stream._receiveStatus(response.status);
569 });
570 return stream;
571 }
572 return makeServerStreamRequest;
573 }
574
575 /**
576 * Get a function that can make bidirectional stream requests to the specified
577 * method.
578 * @param {string} method The name of the method to request
579 * @param {function(*):Buffer} serialize The serialization function for inputs
580 * @param {function(Buffer)} deserialize The deserialization function for
581 * outputs
582 * @return {Function} makeBidiStreamRequest
583 */
584 function makeBidiStreamRequestFunction(method, serialize, deserialize) {
585 /**
586 * Make a bidirectional stream request with this method on the given channel.
587 * @this {SurfaceClient} Client object. Must have a channel member.
588 * @param {Metadata=} metadata Array of metadata key/value pairs to add to the
589 * call
590 * @param {Options} options Options map
591 * @return {EventEmitter} An event emitter for stream related events
592 */
593 function makeBidiStreamRequest(metadata, options) {
594 /* jshint validthis: true */
595 var call = getCall(this.$channel, method, options);
596 if (metadata === null || metadata === undefined) {
597 metadata = new Metadata();
598 } else {
599 metadata = metadata.clone();
600 }
601 var stream = new ClientDuplexStream(call, serialize, deserialize);
602 var start_batch = {};
603 start_batch[grpc.opType.SEND_INITIAL_METADATA] =
604 metadata._getCoreRepresentation();
605 start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
606 call.startBatch(start_batch, function(err, response) {
607 if (err) {
608 // The call has stopped for some reason. A non-OK status will arrive
609 // in the other batch.
610 return;
611 }
612 stream.emit('metadata', Metadata._fromCoreRepresentation(
613 response.metadata));
614 });
615 var status_batch = {};
616 status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
617 call.startBatch(status_batch, function(err, response) {
618 if (err) {
619 stream.emit('error', err);
620 return;
621 }
622 response.status.metadata = Metadata._fromCoreRepresentation(
623 response.status.metadata);
624 stream._receiveStatus(response.status);
625 });
626 return stream;
627 }
628 return makeBidiStreamRequest;
629 }
630
631
632 /**
633 * Map with short names for each of the requester maker functions. Used in
634 * makeClientConstructor
635 */
636 var requester_makers = {
637 unary: makeUnaryRequestFunction,
638 server_stream: makeServerStreamRequestFunction,
639 client_stream: makeClientStreamRequestFunction,
640 bidi: makeBidiStreamRequestFunction
641 };
642
643 /**
644 * Creates a constructor for a client with the given methods. The methods object
645 * maps method name to an object with the following keys:
646 * path: The path on the server for accessing the method. For example, for
647 * protocol buffers, we use "/service_name/method_name"
648 * requestStream: bool indicating whether the client sends a stream
649 * resonseStream: bool indicating whether the server sends a stream
650 * requestSerialize: function to serialize request objects
651 * responseDeserialize: function to deserialize response objects
652 * @param {Object} methods An object mapping method names to method attributes
653 * @param {string} serviceName The fully qualified name of the service
654 * @return {function(string, Object)} New client constructor
655 */
656 exports.makeClientConstructor = function(methods, serviceName) {
657 /**
658 * Create a client with the given methods
659 * @constructor
660 * @param {string} address The address of the server to connect to
661 * @param {grpc.Credentials} credentials Credentials to use to connect
662 * to the server
663 * @param {Object} options Options to pass to the underlying channel
664 */
665 function Client(address, credentials, options) {
666 if (!options) {
667 options = {};
668 }
669 /* Append the grpc-node user agent string after the application user agent
670 * string, and put the combination at the beginning of the user agent string
671 */
672 if (options['grpc.primary_user_agent']) {
673 options['grpc.primary_user_agent'] += ' ';
674 } else {
675 options['grpc.primary_user_agent'] = '';
676 }
677 options['grpc.primary_user_agent'] += 'grpc-node/' + version;
678 /* Private fields use $ as a prefix instead of _ because it is an invalid
679 * prefix of a method name */
680 this.$channel = new grpc.Channel(address, credentials, options);
681 }
682
683 _.each(methods, function(attrs, name) {
684 var method_type;
685 if (_.startsWith(name, '$')) {
686 throw new Error('Method names cannot start with $');
687 }
688 if (attrs.requestStream) {
689 if (attrs.responseStream) {
690 method_type = 'bidi';
691 } else {
692 method_type = 'client_stream';
693 }
694 } else {
695 if (attrs.responseStream) {
696 method_type = 'server_stream';
697 } else {
698 method_type = 'unary';
699 }
700 }
701 var serialize = attrs.requestSerialize;
702 var deserialize = attrs.responseDeserialize;
703 Client.prototype[name] = requester_makers[method_type](
704 attrs.path, serialize, deserialize);
705 // Associate all provided attributes with the method
706 _.assign(Client.prototype[name], attrs);
707 });
708
709 return Client;
710 };
711
712 /**
713 * Return the underlying channel object for the specified client
714 * @param {Client} client
715 * @return {Channel} The channel
716 */
717 exports.getClientChannel = function(client) {
718 return client.$channel;
719 };
720
721 /**
722 * Wait for the client to be ready. The callback will be called when the
723 * client has successfully connected to the server, and it will be called
724 * with an error if the attempt to connect to the server has unrecoverablly
725 * failed or if the deadline expires. This function will make the channel
726 * start connecting if it has not already done so.
727 * @param {Client} client The client to wait on
728 * @param {(Date|Number)} deadline When to stop waiting for a connection. Pass
729 * Infinity to wait forever.
730 * @param {function(Error)} callback The callback to call when done attempting
731 * to connect.
732 */
733 exports.waitForClientReady = function(client, deadline, callback) {
734 var checkState = function(err) {
735 if (err) {
736 callback(new Error('Failed to connect before the deadline'));
737 return;
738 }
739 var new_state = client.$channel.getConnectivityState(true);
740 if (new_state === grpc.connectivityState.READY) {
741 callback();
742 } else if (new_state === grpc.connectivityState.FATAL_FAILURE) {
743 callback(new Error('Failed to connect to server'));
744 } else {
745 client.$channel.watchConnectivityState(new_state, deadline, checkState);
746 }
747 };
748 checkState();
749 };
750
751 /**
752 * Creates a constructor for clients for the given service
753 * @param {ProtoBuf.Reflect.Service} service The service to generate a client
754 * for
755 * @param {Object=} options Options to apply to the client
756 * @return {function(string, Object)} New client constructor
757 */
758 exports.makeProtobufClientConstructor = function(service, options) {
759 var method_attrs = common.getProtobufServiceAttrs(service, service.name,
760 options);
761 var Client = exports.makeClientConstructor(
762 method_attrs, common.fullyQualifiedName(service));
763 Client.service = service;
764 Client.service.grpc_options = options;
765 return Client;
766 };
767
768 /**
769 * Map of status code names to status codes
770 */
771 exports.status = grpc.status;
772
773 /**
774 * See docs for client.callError
775 */
776 exports.callError = grpc.callError;
OLDNEW
« no previous file with comments | « third_party/grpc/src/node/performance/worker_service_impl.js ('k') | third_party/grpc/src/node/src/common.js » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698