Index: third_party/grpc/src/node/src/client.js |
diff --git a/third_party/grpc/src/node/src/client.js b/third_party/grpc/src/node/src/client.js |
new file mode 100644 |
index 0000000000000000000000000000000000000000..9cc5a62bdbcafe486222a0f081f37111078c81b0 |
--- /dev/null |
+++ b/third_party/grpc/src/node/src/client.js |
@@ -0,0 +1,776 @@ |
+/* |
+ * |
+ * Copyright 2015-2016, Google Inc. |
+ * All rights reserved. |
+ * |
+ * Redistribution and use in source and binary forms, with or without |
+ * modification, are permitted provided that the following conditions are |
+ * met: |
+ * |
+ * * Redistributions of source code must retain the above copyright |
+ * notice, this list of conditions and the following disclaimer. |
+ * * Redistributions in binary form must reproduce the above |
+ * copyright notice, this list of conditions and the following disclaimer |
+ * in the documentation and/or other materials provided with the |
+ * distribution. |
+ * * Neither the name of Google Inc. nor the names of its |
+ * contributors may be used to endorse or promote products derived from |
+ * this software without specific prior written permission. |
+ * |
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ * |
+ */ |
+ |
+/** |
+ * Client module |
+ * |
+ * This module contains the factory method for creating Client classes, and the |
+ * method calling code for all types of methods. |
+ * |
+ * For example, to create a client and call a method on it: |
+ * |
+ * var proto_obj = grpc.load(proto_file_path); |
+ * var Client = proto_obj.package.subpackage.ServiceName; |
+ * var client = new Client(server_address, client_credentials); |
+ * var call = client.unaryMethod(arguments, callback); |
+ * |
+ * @module |
+ */ |
+ |
+'use strict'; |
+ |
+var _ = require('lodash'); |
+ |
+var grpc = require('./grpc_extension'); |
+ |
+var common = require('./common'); |
+ |
+var Metadata = require('./metadata'); |
+ |
+var EventEmitter = require('events').EventEmitter; |
+ |
+var stream = require('stream'); |
+ |
+var Readable = stream.Readable; |
+var Writable = stream.Writable; |
+var Duplex = stream.Duplex; |
+var util = require('util'); |
+var version = require('../../../package.json').version; |
+ |
+util.inherits(ClientWritableStream, Writable); |
+ |
+/** |
+ * A stream that the client can write to. Used for calls that are streaming from |
+ * the client side. |
+ * @constructor |
+ * @param {grpc.Call} call The call object to send data with |
+ * @param {function(*):Buffer=} serialize Serialization function for writes. |
+ */ |
+function ClientWritableStream(call, serialize) { |
+ Writable.call(this, {objectMode: true}); |
+ this.call = call; |
+ this.serialize = common.wrapIgnoreNull(serialize); |
+ this.on('finish', function() { |
+ var batch = {}; |
+ batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; |
+ call.startBatch(batch, function() {}); |
+ }); |
+} |
+ |
+/** |
+ * Attempt to write the given chunk. Calls the callback when done. This is an |
+ * implementation of a method needed for implementing stream.Writable. |
+ * @access private |
+ * @param {Buffer} chunk The chunk to write |
+ * @param {string} encoding Used to pass write flags |
+ * @param {function(Error=)} callback Called when the write is complete |
+ */ |
+function _write(chunk, encoding, callback) { |
+ /* jshint validthis: true */ |
+ var batch = {}; |
+ var message = this.serialize(chunk); |
+ if (_.isFinite(encoding)) { |
+ /* Attach the encoding if it is a finite number. This is the closest we |
+ * can get to checking that it is valid flags */ |
+ message.grpcWriteFlags = encoding; |
+ } |
+ batch[grpc.opType.SEND_MESSAGE] = message; |
+ this.call.startBatch(batch, function(err, event) { |
+ if (err) { |
+ // Something has gone wrong. Stop writing by failing to call callback |
+ return; |
+ } |
+ callback(); |
+ }); |
+} |
+ |
+ClientWritableStream.prototype._write = _write; |
+ |
+util.inherits(ClientReadableStream, Readable); |
+ |
+/** |
+ * A stream that the client can read from. Used for calls that are streaming |
+ * from the server side. |
+ * @constructor |
+ * @param {grpc.Call} call The call object to read data with |
+ * @param {function(Buffer):*=} deserialize Deserialization function for reads |
+ */ |
+function ClientReadableStream(call, deserialize) { |
+ Readable.call(this, {objectMode: true}); |
+ this.call = call; |
+ this.finished = false; |
+ this.reading = false; |
+ this.deserialize = common.wrapIgnoreNull(deserialize); |
+ /* Status generated from reading messages from the server. Overrides the |
+ * status from the server if not OK */ |
+ this.read_status = null; |
+ /* Status received from the server. */ |
+ this.received_status = null; |
+} |
+ |
+/** |
+ * Called when all messages from the server have been processed. The status |
+ * parameter indicates that the call should end with that status. status |
+ * defaults to OK if not provided. |
+ * @param {Object!} status The status that the call should end with |
+ */ |
+function _readsDone(status) { |
+ /* jshint validthis: true */ |
+ if (!status) { |
+ status = {code: grpc.status.OK, details: 'OK'}; |
+ } |
+ this.finished = true; |
+ this.read_status = status; |
+ this._emitStatusIfDone(); |
+} |
+ |
+ClientReadableStream.prototype._readsDone = _readsDone; |
+ |
+/** |
+ * Called to indicate that we have received a status from the server. |
+ */ |
+function _receiveStatus(status) { |
+ /* jshint validthis: true */ |
+ this.received_status = status; |
+ this._emitStatusIfDone(); |
+} |
+ |
+ClientReadableStream.prototype._receiveStatus = _receiveStatus; |
+ |
+/** |
+ * If we have both processed all incoming messages and received the status from |
+ * the server, emit the status. Otherwise, do nothing. |
+ */ |
+function _emitStatusIfDone() { |
+ /* jshint validthis: true */ |
+ var status; |
+ if (this.read_status && this.received_status) { |
+ if (this.read_status.code !== grpc.status.OK) { |
+ status = this.read_status; |
+ } else { |
+ status = this.received_status; |
+ } |
+ this.emit('status', status); |
+ if (status.code !== grpc.status.OK) { |
+ var error = new Error(status.details); |
+ error.code = status.code; |
+ error.metadata = status.metadata; |
+ this.emit('error', error); |
+ return; |
+ } |
+ } |
+} |
+ |
+ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone; |
+ |
+/** |
+ * Read the next object from the stream. |
+ * @access private |
+ * @param {*} size Ignored because we use objectMode=true |
+ */ |
+function _read(size) { |
+ /* jshint validthis: true */ |
+ var self = this; |
+ /** |
+ * Callback to be called when a READ event is received. Pushes the data onto |
+ * the read queue and starts reading again if applicable |
+ * @param {grpc.Event} event READ event object |
+ */ |
+ function readCallback(err, event) { |
+ if (err) { |
+ // Something has gone wrong. Stop reading and wait for status |
+ self.finished = true; |
+ self._readsDone(); |
+ return; |
+ } |
+ var data = event.read; |
+ var deserialized; |
+ try { |
+ deserialized = self.deserialize(data); |
+ } catch (e) { |
+ self._readsDone({code: grpc.status.INTERNAL, |
+ details: 'Failed to parse server response'}); |
+ } |
+ if (data === null) { |
+ self._readsDone(); |
+ } |
+ if (self.push(deserialized) && data !== null) { |
+ var read_batch = {}; |
+ read_batch[grpc.opType.RECV_MESSAGE] = true; |
+ self.call.startBatch(read_batch, readCallback); |
+ } else { |
+ self.reading = false; |
+ } |
+ } |
+ if (self.finished) { |
+ self.push(null); |
+ } else { |
+ if (!self.reading) { |
+ self.reading = true; |
+ var read_batch = {}; |
+ read_batch[grpc.opType.RECV_MESSAGE] = true; |
+ self.call.startBatch(read_batch, readCallback); |
+ } |
+ } |
+} |
+ |
+ClientReadableStream.prototype._read = _read; |
+ |
+util.inherits(ClientDuplexStream, Duplex); |
+ |
+/** |
+ * A stream that the client can read from or write to. Used for calls with |
+ * duplex streaming. |
+ * @constructor |
+ * @param {grpc.Call} call Call object to proxy |
+ * @param {function(*):Buffer=} serialize Serialization function for requests |
+ * @param {function(Buffer):*=} deserialize Deserialization function for |
+ * responses |
+ */ |
+function ClientDuplexStream(call, serialize, deserialize) { |
+ Duplex.call(this, {objectMode: true}); |
+ this.serialize = common.wrapIgnoreNull(serialize); |
+ this.deserialize = common.wrapIgnoreNull(deserialize); |
+ this.call = call; |
+ /* Status generated from reading messages from the server. Overrides the |
+ * status from the server if not OK */ |
+ this.read_status = null; |
+ /* Status received from the server. */ |
+ this.received_status = null; |
+ this.on('finish', function() { |
+ var batch = {}; |
+ batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; |
+ call.startBatch(batch, function() {}); |
+ }); |
+} |
+ |
+ClientDuplexStream.prototype._readsDone = _readsDone; |
+ClientDuplexStream.prototype._receiveStatus = _receiveStatus; |
+ClientDuplexStream.prototype._emitStatusIfDone = _emitStatusIfDone; |
+ClientDuplexStream.prototype._read = _read; |
+ClientDuplexStream.prototype._write = _write; |
+ |
+/** |
+ * Cancel the ongoing call |
+ */ |
+function cancel() { |
+ /* jshint validthis: true */ |
+ this.call.cancel(); |
+} |
+ |
+ClientReadableStream.prototype.cancel = cancel; |
+ClientWritableStream.prototype.cancel = cancel; |
+ClientDuplexStream.prototype.cancel = cancel; |
+ |
+/** |
+ * Get the endpoint this call/stream is connected to. |
+ * @return {string} The URI of the endpoint |
+ */ |
+function getPeer() { |
+ /* jshint validthis: true */ |
+ return this.call.getPeer(); |
+} |
+ |
+ClientReadableStream.prototype.getPeer = getPeer; |
+ClientWritableStream.prototype.getPeer = getPeer; |
+ClientDuplexStream.prototype.getPeer = getPeer; |
+ |
+/** |
+ * Get a call object built with the provided options. Keys for options are |
+ * 'deadline', which takes a date or number, and 'host', which takes a string |
+ * and overrides the hostname to connect to. |
+ * @param {Object} options Options map. |
+ */ |
+function getCall(channel, method, options) { |
+ var deadline; |
+ var host; |
+ var parent; |
+ var propagate_flags; |
+ var credentials; |
+ if (options) { |
+ deadline = options.deadline; |
+ host = options.host; |
+ parent = _.get(options, 'parent.call'); |
+ propagate_flags = options.propagate_flags; |
+ credentials = options.credentials; |
+ } |
+ if (deadline === undefined) { |
+ deadline = Infinity; |
+ } |
+ var call = new grpc.Call(channel, method, deadline, host, |
+ parent, propagate_flags); |
+ if (credentials) { |
+ call.setCredentials(credentials); |
+ } |
+ return call; |
+} |
+ |
+/** |
+ * Get a function that can make unary requests to the specified method. |
+ * @param {string} method The name of the method to request |
+ * @param {function(*):Buffer} serialize The serialization function for inputs |
+ * @param {function(Buffer)} deserialize The deserialization function for |
+ * outputs |
+ * @return {Function} makeUnaryRequest |
+ */ |
+function makeUnaryRequestFunction(method, serialize, deserialize) { |
+ /** |
+ * Make a unary request with this method on the given channel with the given |
+ * argument, callback, etc. |
+ * @this {Client} Client object. Must have a channel member. |
+ * @param {*} argument The argument to the call. Should be serializable with |
+ * serialize |
+ * @param {function(?Error, value=)} callback The callback to for when the |
+ * response is received |
+ * @param {Metadata=} metadata Metadata to add to the call |
+ * @param {Object=} options Options map |
+ * @return {EventEmitter} An event emitter for stream related events |
+ */ |
+ function makeUnaryRequest(argument, callback, metadata, options) { |
+ /* jshint validthis: true */ |
+ var emitter = new EventEmitter(); |
+ var call = getCall(this.$channel, method, options); |
+ if (metadata === null || metadata === undefined) { |
+ metadata = new Metadata(); |
+ } else { |
+ metadata = metadata.clone(); |
+ } |
+ emitter.cancel = function cancel() { |
+ call.cancel(); |
+ }; |
+ emitter.getPeer = function getPeer() { |
+ return call.getPeer(); |
+ }; |
+ var client_batch = {}; |
+ var message = serialize(argument); |
+ if (options) { |
+ message.grpcWriteFlags = options.flags; |
+ } |
+ client_batch[grpc.opType.SEND_INITIAL_METADATA] = |
+ metadata._getCoreRepresentation(); |
+ client_batch[grpc.opType.SEND_MESSAGE] = message; |
+ client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; |
+ client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
+ client_batch[grpc.opType.RECV_MESSAGE] = true; |
+ client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
+ call.startBatch(client_batch, function(err, response) { |
+ response.status.metadata = Metadata._fromCoreRepresentation( |
+ response.status.metadata); |
+ var status = response.status; |
+ var error; |
+ var deserialized; |
+ if (status.code === grpc.status.OK) { |
+ if (err) { |
+ // Got a batch error, but OK status. Something went wrong |
+ callback(err); |
+ return; |
+ } else { |
+ try { |
+ deserialized = deserialize(response.read); |
+ } catch (e) { |
+ /* Change status to indicate bad server response. This will result |
+ * in passing an error to the callback */ |
+ status = { |
+ code: grpc.status.INTERNAL, |
+ details: 'Failed to parse server response' |
+ }; |
+ } |
+ } |
+ } |
+ if (status.code !== grpc.status.OK) { |
+ error = new Error(status.details); |
+ error.code = status.code; |
+ error.metadata = status.metadata; |
+ callback(error); |
+ } else { |
+ callback(null, deserialized); |
+ } |
+ emitter.emit('status', status); |
+ emitter.emit('metadata', Metadata._fromCoreRepresentation( |
+ response.metadata)); |
+ }); |
+ return emitter; |
+ } |
+ return makeUnaryRequest; |
+} |
+ |
+/** |
+ * Get a function that can make client stream requests to the specified method. |
+ * @param {string} method The name of the method to request |
+ * @param {function(*):Buffer} serialize The serialization function for inputs |
+ * @param {function(Buffer)} deserialize The deserialization function for |
+ * outputs |
+ * @return {Function} makeClientStreamRequest |
+ */ |
+function makeClientStreamRequestFunction(method, serialize, deserialize) { |
+ /** |
+ * Make a client stream request with this method on the given channel with the |
+ * given callback, etc. |
+ * @this {Client} Client object. Must have a channel member. |
+ * @param {function(?Error, value=)} callback The callback to for when the |
+ * response is received |
+ * @param {Metadata=} metadata Array of metadata key/value pairs to add to the |
+ * call |
+ * @param {Object=} options Options map |
+ * @return {EventEmitter} An event emitter for stream related events |
+ */ |
+ function makeClientStreamRequest(callback, metadata, options) { |
+ /* jshint validthis: true */ |
+ var call = getCall(this.$channel, method, options); |
+ if (metadata === null || metadata === undefined) { |
+ metadata = new Metadata(); |
+ } else { |
+ metadata = metadata.clone(); |
+ } |
+ var stream = new ClientWritableStream(call, serialize); |
+ var metadata_batch = {}; |
+ metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = |
+ metadata._getCoreRepresentation(); |
+ metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
+ call.startBatch(metadata_batch, function(err, response) { |
+ if (err) { |
+ // The call has stopped for some reason. A non-OK status will arrive |
+ // in the other batch. |
+ return; |
+ } |
+ stream.emit('metadata', Metadata._fromCoreRepresentation( |
+ response.metadata)); |
+ }); |
+ var client_batch = {}; |
+ client_batch[grpc.opType.RECV_MESSAGE] = true; |
+ client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
+ call.startBatch(client_batch, function(err, response) { |
+ response.status.metadata = Metadata._fromCoreRepresentation( |
+ response.status.metadata); |
+ var status = response.status; |
+ var error; |
+ var deserialized; |
+ if (status.code === grpc.status.OK) { |
+ if (err) { |
+ // Got a batch error, but OK status. Something went wrong |
+ callback(err); |
+ return; |
+ } else { |
+ try { |
+ deserialized = deserialize(response.read); |
+ } catch (e) { |
+ /* Change status to indicate bad server response. This will result |
+ * in passing an error to the callback */ |
+ status = { |
+ code: grpc.status.INTERNAL, |
+ details: 'Failed to parse server response' |
+ }; |
+ } |
+ } |
+ } |
+ if (status.code !== grpc.status.OK) { |
+ error = new Error(response.status.details); |
+ error.code = status.code; |
+ error.metadata = status.metadata; |
+ callback(error); |
+ } else { |
+ callback(null, deserialized); |
+ } |
+ stream.emit('status', status); |
+ }); |
+ return stream; |
+ } |
+ return makeClientStreamRequest; |
+} |
+ |
+/** |
+ * Get a function that can make server stream requests to the specified method. |
+ * @param {string} method The name of the method to request |
+ * @param {function(*):Buffer} serialize The serialization function for inputs |
+ * @param {function(Buffer)} deserialize The deserialization function for |
+ * outputs |
+ * @return {Function} makeServerStreamRequest |
+ */ |
+function makeServerStreamRequestFunction(method, serialize, deserialize) { |
+ /** |
+ * Make a server stream request with this method on the given channel with the |
+ * given argument, etc. |
+ * @this {SurfaceClient} Client object. Must have a channel member. |
+ * @param {*} argument The argument to the call. Should be serializable with |
+ * serialize |
+ * @param {Metadata=} metadata Array of metadata key/value pairs to add to the |
+ * call |
+ * @param {Object} options Options map |
+ * @return {EventEmitter} An event emitter for stream related events |
+ */ |
+ function makeServerStreamRequest(argument, metadata, options) { |
+ /* jshint validthis: true */ |
+ var call = getCall(this.$channel, method, options); |
+ if (metadata === null || metadata === undefined) { |
+ metadata = new Metadata(); |
+ } else { |
+ metadata = metadata.clone(); |
+ } |
+ var stream = new ClientReadableStream(call, deserialize); |
+ var start_batch = {}; |
+ var message = serialize(argument); |
+ if (options) { |
+ message.grpcWriteFlags = options.flags; |
+ } |
+ start_batch[grpc.opType.SEND_INITIAL_METADATA] = |
+ metadata._getCoreRepresentation(); |
+ start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
+ start_batch[grpc.opType.SEND_MESSAGE] = message; |
+ start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; |
+ call.startBatch(start_batch, function(err, response) { |
+ if (err) { |
+ // The call has stopped for some reason. A non-OK status will arrive |
+ // in the other batch. |
+ return; |
+ } |
+ stream.emit('metadata', Metadata._fromCoreRepresentation( |
+ response.metadata)); |
+ }); |
+ var status_batch = {}; |
+ status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
+ call.startBatch(status_batch, function(err, response) { |
+ if (err) { |
+ stream.emit('error', err); |
+ return; |
+ } |
+ response.status.metadata = Metadata._fromCoreRepresentation( |
+ response.status.metadata); |
+ stream._receiveStatus(response.status); |
+ }); |
+ return stream; |
+ } |
+ return makeServerStreamRequest; |
+} |
+ |
+/** |
+ * Get a function that can make bidirectional stream requests to the specified |
+ * method. |
+ * @param {string} method The name of the method to request |
+ * @param {function(*):Buffer} serialize The serialization function for inputs |
+ * @param {function(Buffer)} deserialize The deserialization function for |
+ * outputs |
+ * @return {Function} makeBidiStreamRequest |
+ */ |
+function makeBidiStreamRequestFunction(method, serialize, deserialize) { |
+ /** |
+ * Make a bidirectional stream request with this method on the given channel. |
+ * @this {SurfaceClient} Client object. Must have a channel member. |
+ * @param {Metadata=} metadata Array of metadata key/value pairs to add to the |
+ * call |
+ * @param {Options} options Options map |
+ * @return {EventEmitter} An event emitter for stream related events |
+ */ |
+ function makeBidiStreamRequest(metadata, options) { |
+ /* jshint validthis: true */ |
+ var call = getCall(this.$channel, method, options); |
+ if (metadata === null || metadata === undefined) { |
+ metadata = new Metadata(); |
+ } else { |
+ metadata = metadata.clone(); |
+ } |
+ var stream = new ClientDuplexStream(call, serialize, deserialize); |
+ var start_batch = {}; |
+ start_batch[grpc.opType.SEND_INITIAL_METADATA] = |
+ metadata._getCoreRepresentation(); |
+ start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
+ call.startBatch(start_batch, function(err, response) { |
+ if (err) { |
+ // The call has stopped for some reason. A non-OK status will arrive |
+ // in the other batch. |
+ return; |
+ } |
+ stream.emit('metadata', Metadata._fromCoreRepresentation( |
+ response.metadata)); |
+ }); |
+ var status_batch = {}; |
+ status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
+ call.startBatch(status_batch, function(err, response) { |
+ if (err) { |
+ stream.emit('error', err); |
+ return; |
+ } |
+ response.status.metadata = Metadata._fromCoreRepresentation( |
+ response.status.metadata); |
+ stream._receiveStatus(response.status); |
+ }); |
+ return stream; |
+ } |
+ return makeBidiStreamRequest; |
+} |
+ |
+ |
+/** |
+ * Map with short names for each of the requester maker functions. Used in |
+ * makeClientConstructor |
+ */ |
+var requester_makers = { |
+ unary: makeUnaryRequestFunction, |
+ server_stream: makeServerStreamRequestFunction, |
+ client_stream: makeClientStreamRequestFunction, |
+ bidi: makeBidiStreamRequestFunction |
+}; |
+ |
+/** |
+ * Creates a constructor for a client with the given methods. The methods object |
+ * maps method name to an object with the following keys: |
+ * path: The path on the server for accessing the method. For example, for |
+ * protocol buffers, we use "/service_name/method_name" |
+ * requestStream: bool indicating whether the client sends a stream |
+ * resonseStream: bool indicating whether the server sends a stream |
+ * requestSerialize: function to serialize request objects |
+ * responseDeserialize: function to deserialize response objects |
+ * @param {Object} methods An object mapping method names to method attributes |
+ * @param {string} serviceName The fully qualified name of the service |
+ * @return {function(string, Object)} New client constructor |
+ */ |
+exports.makeClientConstructor = function(methods, serviceName) { |
+ /** |
+ * Create a client with the given methods |
+ * @constructor |
+ * @param {string} address The address of the server to connect to |
+ * @param {grpc.Credentials} credentials Credentials to use to connect |
+ * to the server |
+ * @param {Object} options Options to pass to the underlying channel |
+ */ |
+ function Client(address, credentials, options) { |
+ if (!options) { |
+ options = {}; |
+ } |
+ /* Append the grpc-node user agent string after the application user agent |
+ * string, and put the combination at the beginning of the user agent string |
+ */ |
+ if (options['grpc.primary_user_agent']) { |
+ options['grpc.primary_user_agent'] += ' '; |
+ } else { |
+ options['grpc.primary_user_agent'] = ''; |
+ } |
+ options['grpc.primary_user_agent'] += 'grpc-node/' + version; |
+ /* Private fields use $ as a prefix instead of _ because it is an invalid |
+ * prefix of a method name */ |
+ this.$channel = new grpc.Channel(address, credentials, options); |
+ } |
+ |
+ _.each(methods, function(attrs, name) { |
+ var method_type; |
+ if (_.startsWith(name, '$')) { |
+ throw new Error('Method names cannot start with $'); |
+ } |
+ if (attrs.requestStream) { |
+ if (attrs.responseStream) { |
+ method_type = 'bidi'; |
+ } else { |
+ method_type = 'client_stream'; |
+ } |
+ } else { |
+ if (attrs.responseStream) { |
+ method_type = 'server_stream'; |
+ } else { |
+ method_type = 'unary'; |
+ } |
+ } |
+ var serialize = attrs.requestSerialize; |
+ var deserialize = attrs.responseDeserialize; |
+ Client.prototype[name] = requester_makers[method_type]( |
+ attrs.path, serialize, deserialize); |
+ // Associate all provided attributes with the method |
+ _.assign(Client.prototype[name], attrs); |
+ }); |
+ |
+ return Client; |
+}; |
+ |
+/** |
+ * Return the underlying channel object for the specified client |
+ * @param {Client} client |
+ * @return {Channel} The channel |
+ */ |
+exports.getClientChannel = function(client) { |
+ return client.$channel; |
+}; |
+ |
+/** |
+ * Wait for the client to be ready. The callback will be called when the |
+ * client has successfully connected to the server, and it will be called |
+ * with an error if the attempt to connect to the server has unrecoverablly |
+ * failed or if the deadline expires. This function will make the channel |
+ * start connecting if it has not already done so. |
+ * @param {Client} client The client to wait on |
+ * @param {(Date|Number)} deadline When to stop waiting for a connection. Pass |
+ * Infinity to wait forever. |
+ * @param {function(Error)} callback The callback to call when done attempting |
+ * to connect. |
+ */ |
+exports.waitForClientReady = function(client, deadline, callback) { |
+ var checkState = function(err) { |
+ if (err) { |
+ callback(new Error('Failed to connect before the deadline')); |
+ return; |
+ } |
+ var new_state = client.$channel.getConnectivityState(true); |
+ if (new_state === grpc.connectivityState.READY) { |
+ callback(); |
+ } else if (new_state === grpc.connectivityState.FATAL_FAILURE) { |
+ callback(new Error('Failed to connect to server')); |
+ } else { |
+ client.$channel.watchConnectivityState(new_state, deadline, checkState); |
+ } |
+ }; |
+ checkState(); |
+}; |
+ |
+/** |
+ * Creates a constructor for clients for the given service |
+ * @param {ProtoBuf.Reflect.Service} service The service to generate a client |
+ * for |
+ * @param {Object=} options Options to apply to the client |
+ * @return {function(string, Object)} New client constructor |
+ */ |
+exports.makeProtobufClientConstructor = function(service, options) { |
+ var method_attrs = common.getProtobufServiceAttrs(service, service.name, |
+ options); |
+ var Client = exports.makeClientConstructor( |
+ method_attrs, common.fullyQualifiedName(service)); |
+ Client.service = service; |
+ Client.service.grpc_options = options; |
+ return Client; |
+}; |
+ |
+/** |
+ * Map of status code names to status codes |
+ */ |
+exports.status = grpc.status; |
+ |
+/** |
+ * See docs for client.callError |
+ */ |
+exports.callError = grpc.callError; |