OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * |
| 3 * Copyright 2015-2016, Google Inc. |
| 4 * All rights reserved. |
| 5 * |
| 6 * Redistribution and use in source and binary forms, with or without |
| 7 * modification, are permitted provided that the following conditions are |
| 8 * met: |
| 9 * |
| 10 * * Redistributions of source code must retain the above copyright |
| 11 * notice, this list of conditions and the following disclaimer. |
| 12 * * Redistributions in binary form must reproduce the above |
| 13 * copyright notice, this list of conditions and the following disclaimer |
| 14 * in the documentation and/or other materials provided with the |
| 15 * distribution. |
| 16 * * Neither the name of Google Inc. nor the names of its |
| 17 * contributors may be used to endorse or promote products derived from |
| 18 * this software without specific prior written permission. |
| 19 * |
| 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 * |
| 32 */ |
| 33 |
| 34 /** |
| 35 * Client module |
| 36 * |
| 37 * This module contains the factory method for creating Client classes, and the |
| 38 * method calling code for all types of methods. |
| 39 * |
| 40 * For example, to create a client and call a method on it: |
| 41 * |
| 42 * var proto_obj = grpc.load(proto_file_path); |
| 43 * var Client = proto_obj.package.subpackage.ServiceName; |
| 44 * var client = new Client(server_address, client_credentials); |
| 45 * var call = client.unaryMethod(arguments, callback); |
| 46 * |
| 47 * @module |
| 48 */ |
| 49 |
| 50 'use strict'; |
| 51 |
| 52 var _ = require('lodash'); |
| 53 |
| 54 var grpc = require('./grpc_extension'); |
| 55 |
| 56 var common = require('./common'); |
| 57 |
| 58 var Metadata = require('./metadata'); |
| 59 |
| 60 var EventEmitter = require('events').EventEmitter; |
| 61 |
| 62 var stream = require('stream'); |
| 63 |
| 64 var Readable = stream.Readable; |
| 65 var Writable = stream.Writable; |
| 66 var Duplex = stream.Duplex; |
| 67 var util = require('util'); |
| 68 var version = require('../../../package.json').version; |
| 69 |
| 70 util.inherits(ClientWritableStream, Writable); |
| 71 |
| 72 /** |
| 73 * A stream that the client can write to. Used for calls that are streaming from |
| 74 * the client side. |
| 75 * @constructor |
| 76 * @param {grpc.Call} call The call object to send data with |
| 77 * @param {function(*):Buffer=} serialize Serialization function for writes. |
| 78 */ |
| 79 function ClientWritableStream(call, serialize) { |
| 80 Writable.call(this, {objectMode: true}); |
| 81 this.call = call; |
| 82 this.serialize = common.wrapIgnoreNull(serialize); |
| 83 this.on('finish', function() { |
| 84 var batch = {}; |
| 85 batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; |
| 86 call.startBatch(batch, function() {}); |
| 87 }); |
| 88 } |
| 89 |
| 90 /** |
| 91 * Attempt to write the given chunk. Calls the callback when done. This is an |
| 92 * implementation of a method needed for implementing stream.Writable. |
| 93 * @access private |
| 94 * @param {Buffer} chunk The chunk to write |
| 95 * @param {string} encoding Used to pass write flags |
| 96 * @param {function(Error=)} callback Called when the write is complete |
| 97 */ |
| 98 function _write(chunk, encoding, callback) { |
| 99 /* jshint validthis: true */ |
| 100 var batch = {}; |
| 101 var message = this.serialize(chunk); |
| 102 if (_.isFinite(encoding)) { |
| 103 /* Attach the encoding if it is a finite number. This is the closest we |
| 104 * can get to checking that it is valid flags */ |
| 105 message.grpcWriteFlags = encoding; |
| 106 } |
| 107 batch[grpc.opType.SEND_MESSAGE] = message; |
| 108 this.call.startBatch(batch, function(err, event) { |
| 109 if (err) { |
| 110 // Something has gone wrong. Stop writing by failing to call callback |
| 111 return; |
| 112 } |
| 113 callback(); |
| 114 }); |
| 115 } |
| 116 |
| 117 ClientWritableStream.prototype._write = _write; |
| 118 |
| 119 util.inherits(ClientReadableStream, Readable); |
| 120 |
| 121 /** |
| 122 * A stream that the client can read from. Used for calls that are streaming |
| 123 * from the server side. |
| 124 * @constructor |
| 125 * @param {grpc.Call} call The call object to read data with |
| 126 * @param {function(Buffer):*=} deserialize Deserialization function for reads |
| 127 */ |
| 128 function ClientReadableStream(call, deserialize) { |
| 129 Readable.call(this, {objectMode: true}); |
| 130 this.call = call; |
| 131 this.finished = false; |
| 132 this.reading = false; |
| 133 this.deserialize = common.wrapIgnoreNull(deserialize); |
| 134 /* Status generated from reading messages from the server. Overrides the |
| 135 * status from the server if not OK */ |
| 136 this.read_status = null; |
| 137 /* Status received from the server. */ |
| 138 this.received_status = null; |
| 139 } |
| 140 |
| 141 /** |
| 142 * Called when all messages from the server have been processed. The status |
| 143 * parameter indicates that the call should end with that status. status |
| 144 * defaults to OK if not provided. |
| 145 * @param {Object!} status The status that the call should end with |
| 146 */ |
| 147 function _readsDone(status) { |
| 148 /* jshint validthis: true */ |
| 149 if (!status) { |
| 150 status = {code: grpc.status.OK, details: 'OK'}; |
| 151 } |
| 152 this.finished = true; |
| 153 this.read_status = status; |
| 154 this._emitStatusIfDone(); |
| 155 } |
| 156 |
| 157 ClientReadableStream.prototype._readsDone = _readsDone; |
| 158 |
| 159 /** |
| 160 * Called to indicate that we have received a status from the server. |
| 161 */ |
| 162 function _receiveStatus(status) { |
| 163 /* jshint validthis: true */ |
| 164 this.received_status = status; |
| 165 this._emitStatusIfDone(); |
| 166 } |
| 167 |
| 168 ClientReadableStream.prototype._receiveStatus = _receiveStatus; |
| 169 |
| 170 /** |
| 171 * If we have both processed all incoming messages and received the status from |
| 172 * the server, emit the status. Otherwise, do nothing. |
| 173 */ |
| 174 function _emitStatusIfDone() { |
| 175 /* jshint validthis: true */ |
| 176 var status; |
| 177 if (this.read_status && this.received_status) { |
| 178 if (this.read_status.code !== grpc.status.OK) { |
| 179 status = this.read_status; |
| 180 } else { |
| 181 status = this.received_status; |
| 182 } |
| 183 this.emit('status', status); |
| 184 if (status.code !== grpc.status.OK) { |
| 185 var error = new Error(status.details); |
| 186 error.code = status.code; |
| 187 error.metadata = status.metadata; |
| 188 this.emit('error', error); |
| 189 return; |
| 190 } |
| 191 } |
| 192 } |
| 193 |
| 194 ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone; |
| 195 |
| 196 /** |
| 197 * Read the next object from the stream. |
| 198 * @access private |
| 199 * @param {*} size Ignored because we use objectMode=true |
| 200 */ |
| 201 function _read(size) { |
| 202 /* jshint validthis: true */ |
| 203 var self = this; |
| 204 /** |
| 205 * Callback to be called when a READ event is received. Pushes the data onto |
| 206 * the read queue and starts reading again if applicable |
| 207 * @param {grpc.Event} event READ event object |
| 208 */ |
| 209 function readCallback(err, event) { |
| 210 if (err) { |
| 211 // Something has gone wrong. Stop reading and wait for status |
| 212 self.finished = true; |
| 213 self._readsDone(); |
| 214 return; |
| 215 } |
| 216 var data = event.read; |
| 217 var deserialized; |
| 218 try { |
| 219 deserialized = self.deserialize(data); |
| 220 } catch (e) { |
| 221 self._readsDone({code: grpc.status.INTERNAL, |
| 222 details: 'Failed to parse server response'}); |
| 223 } |
| 224 if (data === null) { |
| 225 self._readsDone(); |
| 226 } |
| 227 if (self.push(deserialized) && data !== null) { |
| 228 var read_batch = {}; |
| 229 read_batch[grpc.opType.RECV_MESSAGE] = true; |
| 230 self.call.startBatch(read_batch, readCallback); |
| 231 } else { |
| 232 self.reading = false; |
| 233 } |
| 234 } |
| 235 if (self.finished) { |
| 236 self.push(null); |
| 237 } else { |
| 238 if (!self.reading) { |
| 239 self.reading = true; |
| 240 var read_batch = {}; |
| 241 read_batch[grpc.opType.RECV_MESSAGE] = true; |
| 242 self.call.startBatch(read_batch, readCallback); |
| 243 } |
| 244 } |
| 245 } |
| 246 |
| 247 ClientReadableStream.prototype._read = _read; |
| 248 |
| 249 util.inherits(ClientDuplexStream, Duplex); |
| 250 |
| 251 /** |
| 252 * A stream that the client can read from or write to. Used for calls with |
| 253 * duplex streaming. |
| 254 * @constructor |
| 255 * @param {grpc.Call} call Call object to proxy |
| 256 * @param {function(*):Buffer=} serialize Serialization function for requests |
| 257 * @param {function(Buffer):*=} deserialize Deserialization function for |
| 258 * responses |
| 259 */ |
| 260 function ClientDuplexStream(call, serialize, deserialize) { |
| 261 Duplex.call(this, {objectMode: true}); |
| 262 this.serialize = common.wrapIgnoreNull(serialize); |
| 263 this.deserialize = common.wrapIgnoreNull(deserialize); |
| 264 this.call = call; |
| 265 /* Status generated from reading messages from the server. Overrides the |
| 266 * status from the server if not OK */ |
| 267 this.read_status = null; |
| 268 /* Status received from the server. */ |
| 269 this.received_status = null; |
| 270 this.on('finish', function() { |
| 271 var batch = {}; |
| 272 batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; |
| 273 call.startBatch(batch, function() {}); |
| 274 }); |
| 275 } |
| 276 |
| 277 ClientDuplexStream.prototype._readsDone = _readsDone; |
| 278 ClientDuplexStream.prototype._receiveStatus = _receiveStatus; |
| 279 ClientDuplexStream.prototype._emitStatusIfDone = _emitStatusIfDone; |
| 280 ClientDuplexStream.prototype._read = _read; |
| 281 ClientDuplexStream.prototype._write = _write; |
| 282 |
| 283 /** |
| 284 * Cancel the ongoing call |
| 285 */ |
| 286 function cancel() { |
| 287 /* jshint validthis: true */ |
| 288 this.call.cancel(); |
| 289 } |
| 290 |
| 291 ClientReadableStream.prototype.cancel = cancel; |
| 292 ClientWritableStream.prototype.cancel = cancel; |
| 293 ClientDuplexStream.prototype.cancel = cancel; |
| 294 |
| 295 /** |
| 296 * Get the endpoint this call/stream is connected to. |
| 297 * @return {string} The URI of the endpoint |
| 298 */ |
| 299 function getPeer() { |
| 300 /* jshint validthis: true */ |
| 301 return this.call.getPeer(); |
| 302 } |
| 303 |
| 304 ClientReadableStream.prototype.getPeer = getPeer; |
| 305 ClientWritableStream.prototype.getPeer = getPeer; |
| 306 ClientDuplexStream.prototype.getPeer = getPeer; |
| 307 |
| 308 /** |
| 309 * Get a call object built with the provided options. Keys for options are |
| 310 * 'deadline', which takes a date or number, and 'host', which takes a string |
| 311 * and overrides the hostname to connect to. |
| 312 * @param {Object} options Options map. |
| 313 */ |
| 314 function getCall(channel, method, options) { |
| 315 var deadline; |
| 316 var host; |
| 317 var parent; |
| 318 var propagate_flags; |
| 319 var credentials; |
| 320 if (options) { |
| 321 deadline = options.deadline; |
| 322 host = options.host; |
| 323 parent = _.get(options, 'parent.call'); |
| 324 propagate_flags = options.propagate_flags; |
| 325 credentials = options.credentials; |
| 326 } |
| 327 if (deadline === undefined) { |
| 328 deadline = Infinity; |
| 329 } |
| 330 var call = new grpc.Call(channel, method, deadline, host, |
| 331 parent, propagate_flags); |
| 332 if (credentials) { |
| 333 call.setCredentials(credentials); |
| 334 } |
| 335 return call; |
| 336 } |
| 337 |
| 338 /** |
| 339 * Get a function that can make unary requests to the specified method. |
| 340 * @param {string} method The name of the method to request |
| 341 * @param {function(*):Buffer} serialize The serialization function for inputs |
| 342 * @param {function(Buffer)} deserialize The deserialization function for |
| 343 * outputs |
| 344 * @return {Function} makeUnaryRequest |
| 345 */ |
| 346 function makeUnaryRequestFunction(method, serialize, deserialize) { |
| 347 /** |
| 348 * Make a unary request with this method on the given channel with the given |
| 349 * argument, callback, etc. |
| 350 * @this {Client} Client object. Must have a channel member. |
| 351 * @param {*} argument The argument to the call. Should be serializable with |
| 352 * serialize |
| 353 * @param {function(?Error, value=)} callback The callback to for when the |
| 354 * response is received |
| 355 * @param {Metadata=} metadata Metadata to add to the call |
| 356 * @param {Object=} options Options map |
| 357 * @return {EventEmitter} An event emitter for stream related events |
| 358 */ |
| 359 function makeUnaryRequest(argument, callback, metadata, options) { |
| 360 /* jshint validthis: true */ |
| 361 var emitter = new EventEmitter(); |
| 362 var call = getCall(this.$channel, method, options); |
| 363 if (metadata === null || metadata === undefined) { |
| 364 metadata = new Metadata(); |
| 365 } else { |
| 366 metadata = metadata.clone(); |
| 367 } |
| 368 emitter.cancel = function cancel() { |
| 369 call.cancel(); |
| 370 }; |
| 371 emitter.getPeer = function getPeer() { |
| 372 return call.getPeer(); |
| 373 }; |
| 374 var client_batch = {}; |
| 375 var message = serialize(argument); |
| 376 if (options) { |
| 377 message.grpcWriteFlags = options.flags; |
| 378 } |
| 379 client_batch[grpc.opType.SEND_INITIAL_METADATA] = |
| 380 metadata._getCoreRepresentation(); |
| 381 client_batch[grpc.opType.SEND_MESSAGE] = message; |
| 382 client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; |
| 383 client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
| 384 client_batch[grpc.opType.RECV_MESSAGE] = true; |
| 385 client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
| 386 call.startBatch(client_batch, function(err, response) { |
| 387 response.status.metadata = Metadata._fromCoreRepresentation( |
| 388 response.status.metadata); |
| 389 var status = response.status; |
| 390 var error; |
| 391 var deserialized; |
| 392 if (status.code === grpc.status.OK) { |
| 393 if (err) { |
| 394 // Got a batch error, but OK status. Something went wrong |
| 395 callback(err); |
| 396 return; |
| 397 } else { |
| 398 try { |
| 399 deserialized = deserialize(response.read); |
| 400 } catch (e) { |
| 401 /* Change status to indicate bad server response. This will result |
| 402 * in passing an error to the callback */ |
| 403 status = { |
| 404 code: grpc.status.INTERNAL, |
| 405 details: 'Failed to parse server response' |
| 406 }; |
| 407 } |
| 408 } |
| 409 } |
| 410 if (status.code !== grpc.status.OK) { |
| 411 error = new Error(status.details); |
| 412 error.code = status.code; |
| 413 error.metadata = status.metadata; |
| 414 callback(error); |
| 415 } else { |
| 416 callback(null, deserialized); |
| 417 } |
| 418 emitter.emit('status', status); |
| 419 emitter.emit('metadata', Metadata._fromCoreRepresentation( |
| 420 response.metadata)); |
| 421 }); |
| 422 return emitter; |
| 423 } |
| 424 return makeUnaryRequest; |
| 425 } |
| 426 |
| 427 /** |
| 428 * Get a function that can make client stream requests to the specified method. |
| 429 * @param {string} method The name of the method to request |
| 430 * @param {function(*):Buffer} serialize The serialization function for inputs |
| 431 * @param {function(Buffer)} deserialize The deserialization function for |
| 432 * outputs |
| 433 * @return {Function} makeClientStreamRequest |
| 434 */ |
| 435 function makeClientStreamRequestFunction(method, serialize, deserialize) { |
| 436 /** |
| 437 * Make a client stream request with this method on the given channel with the |
| 438 * given callback, etc. |
| 439 * @this {Client} Client object. Must have a channel member. |
| 440 * @param {function(?Error, value=)} callback The callback to for when the |
| 441 * response is received |
| 442 * @param {Metadata=} metadata Array of metadata key/value pairs to add to the |
| 443 * call |
| 444 * @param {Object=} options Options map |
| 445 * @return {EventEmitter} An event emitter for stream related events |
| 446 */ |
| 447 function makeClientStreamRequest(callback, metadata, options) { |
| 448 /* jshint validthis: true */ |
| 449 var call = getCall(this.$channel, method, options); |
| 450 if (metadata === null || metadata === undefined) { |
| 451 metadata = new Metadata(); |
| 452 } else { |
| 453 metadata = metadata.clone(); |
| 454 } |
| 455 var stream = new ClientWritableStream(call, serialize); |
| 456 var metadata_batch = {}; |
| 457 metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = |
| 458 metadata._getCoreRepresentation(); |
| 459 metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
| 460 call.startBatch(metadata_batch, function(err, response) { |
| 461 if (err) { |
| 462 // The call has stopped for some reason. A non-OK status will arrive |
| 463 // in the other batch. |
| 464 return; |
| 465 } |
| 466 stream.emit('metadata', Metadata._fromCoreRepresentation( |
| 467 response.metadata)); |
| 468 }); |
| 469 var client_batch = {}; |
| 470 client_batch[grpc.opType.RECV_MESSAGE] = true; |
| 471 client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
| 472 call.startBatch(client_batch, function(err, response) { |
| 473 response.status.metadata = Metadata._fromCoreRepresentation( |
| 474 response.status.metadata); |
| 475 var status = response.status; |
| 476 var error; |
| 477 var deserialized; |
| 478 if (status.code === grpc.status.OK) { |
| 479 if (err) { |
| 480 // Got a batch error, but OK status. Something went wrong |
| 481 callback(err); |
| 482 return; |
| 483 } else { |
| 484 try { |
| 485 deserialized = deserialize(response.read); |
| 486 } catch (e) { |
| 487 /* Change status to indicate bad server response. This will result |
| 488 * in passing an error to the callback */ |
| 489 status = { |
| 490 code: grpc.status.INTERNAL, |
| 491 details: 'Failed to parse server response' |
| 492 }; |
| 493 } |
| 494 } |
| 495 } |
| 496 if (status.code !== grpc.status.OK) { |
| 497 error = new Error(response.status.details); |
| 498 error.code = status.code; |
| 499 error.metadata = status.metadata; |
| 500 callback(error); |
| 501 } else { |
| 502 callback(null, deserialized); |
| 503 } |
| 504 stream.emit('status', status); |
| 505 }); |
| 506 return stream; |
| 507 } |
| 508 return makeClientStreamRequest; |
| 509 } |
| 510 |
| 511 /** |
| 512 * Get a function that can make server stream requests to the specified method. |
| 513 * @param {string} method The name of the method to request |
| 514 * @param {function(*):Buffer} serialize The serialization function for inputs |
| 515 * @param {function(Buffer)} deserialize The deserialization function for |
| 516 * outputs |
| 517 * @return {Function} makeServerStreamRequest |
| 518 */ |
| 519 function makeServerStreamRequestFunction(method, serialize, deserialize) { |
| 520 /** |
| 521 * Make a server stream request with this method on the given channel with the |
| 522 * given argument, etc. |
| 523 * @this {SurfaceClient} Client object. Must have a channel member. |
| 524 * @param {*} argument The argument to the call. Should be serializable with |
| 525 * serialize |
| 526 * @param {Metadata=} metadata Array of metadata key/value pairs to add to the |
| 527 * call |
| 528 * @param {Object} options Options map |
| 529 * @return {EventEmitter} An event emitter for stream related events |
| 530 */ |
| 531 function makeServerStreamRequest(argument, metadata, options) { |
| 532 /* jshint validthis: true */ |
| 533 var call = getCall(this.$channel, method, options); |
| 534 if (metadata === null || metadata === undefined) { |
| 535 metadata = new Metadata(); |
| 536 } else { |
| 537 metadata = metadata.clone(); |
| 538 } |
| 539 var stream = new ClientReadableStream(call, deserialize); |
| 540 var start_batch = {}; |
| 541 var message = serialize(argument); |
| 542 if (options) { |
| 543 message.grpcWriteFlags = options.flags; |
| 544 } |
| 545 start_batch[grpc.opType.SEND_INITIAL_METADATA] = |
| 546 metadata._getCoreRepresentation(); |
| 547 start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
| 548 start_batch[grpc.opType.SEND_MESSAGE] = message; |
| 549 start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; |
| 550 call.startBatch(start_batch, function(err, response) { |
| 551 if (err) { |
| 552 // The call has stopped for some reason. A non-OK status will arrive |
| 553 // in the other batch. |
| 554 return; |
| 555 } |
| 556 stream.emit('metadata', Metadata._fromCoreRepresentation( |
| 557 response.metadata)); |
| 558 }); |
| 559 var status_batch = {}; |
| 560 status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
| 561 call.startBatch(status_batch, function(err, response) { |
| 562 if (err) { |
| 563 stream.emit('error', err); |
| 564 return; |
| 565 } |
| 566 response.status.metadata = Metadata._fromCoreRepresentation( |
| 567 response.status.metadata); |
| 568 stream._receiveStatus(response.status); |
| 569 }); |
| 570 return stream; |
| 571 } |
| 572 return makeServerStreamRequest; |
| 573 } |
| 574 |
| 575 /** |
| 576 * Get a function that can make bidirectional stream requests to the specified |
| 577 * method. |
| 578 * @param {string} method The name of the method to request |
| 579 * @param {function(*):Buffer} serialize The serialization function for inputs |
| 580 * @param {function(Buffer)} deserialize The deserialization function for |
| 581 * outputs |
| 582 * @return {Function} makeBidiStreamRequest |
| 583 */ |
| 584 function makeBidiStreamRequestFunction(method, serialize, deserialize) { |
| 585 /** |
| 586 * Make a bidirectional stream request with this method on the given channel. |
| 587 * @this {SurfaceClient} Client object. Must have a channel member. |
| 588 * @param {Metadata=} metadata Array of metadata key/value pairs to add to the |
| 589 * call |
| 590 * @param {Options} options Options map |
| 591 * @return {EventEmitter} An event emitter for stream related events |
| 592 */ |
| 593 function makeBidiStreamRequest(metadata, options) { |
| 594 /* jshint validthis: true */ |
| 595 var call = getCall(this.$channel, method, options); |
| 596 if (metadata === null || metadata === undefined) { |
| 597 metadata = new Metadata(); |
| 598 } else { |
| 599 metadata = metadata.clone(); |
| 600 } |
| 601 var stream = new ClientDuplexStream(call, serialize, deserialize); |
| 602 var start_batch = {}; |
| 603 start_batch[grpc.opType.SEND_INITIAL_METADATA] = |
| 604 metadata._getCoreRepresentation(); |
| 605 start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; |
| 606 call.startBatch(start_batch, function(err, response) { |
| 607 if (err) { |
| 608 // The call has stopped for some reason. A non-OK status will arrive |
| 609 // in the other batch. |
| 610 return; |
| 611 } |
| 612 stream.emit('metadata', Metadata._fromCoreRepresentation( |
| 613 response.metadata)); |
| 614 }); |
| 615 var status_batch = {}; |
| 616 status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; |
| 617 call.startBatch(status_batch, function(err, response) { |
| 618 if (err) { |
| 619 stream.emit('error', err); |
| 620 return; |
| 621 } |
| 622 response.status.metadata = Metadata._fromCoreRepresentation( |
| 623 response.status.metadata); |
| 624 stream._receiveStatus(response.status); |
| 625 }); |
| 626 return stream; |
| 627 } |
| 628 return makeBidiStreamRequest; |
| 629 } |
| 630 |
| 631 |
| 632 /** |
| 633 * Map with short names for each of the requester maker functions. Used in |
| 634 * makeClientConstructor |
| 635 */ |
| 636 var requester_makers = { |
| 637 unary: makeUnaryRequestFunction, |
| 638 server_stream: makeServerStreamRequestFunction, |
| 639 client_stream: makeClientStreamRequestFunction, |
| 640 bidi: makeBidiStreamRequestFunction |
| 641 }; |
| 642 |
| 643 /** |
| 644 * Creates a constructor for a client with the given methods. The methods object |
| 645 * maps method name to an object with the following keys: |
| 646 * path: The path on the server for accessing the method. For example, for |
| 647 * protocol buffers, we use "/service_name/method_name" |
| 648 * requestStream: bool indicating whether the client sends a stream |
| 649 * resonseStream: bool indicating whether the server sends a stream |
| 650 * requestSerialize: function to serialize request objects |
| 651 * responseDeserialize: function to deserialize response objects |
| 652 * @param {Object} methods An object mapping method names to method attributes |
| 653 * @param {string} serviceName The fully qualified name of the service |
| 654 * @return {function(string, Object)} New client constructor |
| 655 */ |
| 656 exports.makeClientConstructor = function(methods, serviceName) { |
| 657 /** |
| 658 * Create a client with the given methods |
| 659 * @constructor |
| 660 * @param {string} address The address of the server to connect to |
| 661 * @param {grpc.Credentials} credentials Credentials to use to connect |
| 662 * to the server |
| 663 * @param {Object} options Options to pass to the underlying channel |
| 664 */ |
| 665 function Client(address, credentials, options) { |
| 666 if (!options) { |
| 667 options = {}; |
| 668 } |
| 669 /* Append the grpc-node user agent string after the application user agent |
| 670 * string, and put the combination at the beginning of the user agent string |
| 671 */ |
| 672 if (options['grpc.primary_user_agent']) { |
| 673 options['grpc.primary_user_agent'] += ' '; |
| 674 } else { |
| 675 options['grpc.primary_user_agent'] = ''; |
| 676 } |
| 677 options['grpc.primary_user_agent'] += 'grpc-node/' + version; |
| 678 /* Private fields use $ as a prefix instead of _ because it is an invalid |
| 679 * prefix of a method name */ |
| 680 this.$channel = new grpc.Channel(address, credentials, options); |
| 681 } |
| 682 |
| 683 _.each(methods, function(attrs, name) { |
| 684 var method_type; |
| 685 if (_.startsWith(name, '$')) { |
| 686 throw new Error('Method names cannot start with $'); |
| 687 } |
| 688 if (attrs.requestStream) { |
| 689 if (attrs.responseStream) { |
| 690 method_type = 'bidi'; |
| 691 } else { |
| 692 method_type = 'client_stream'; |
| 693 } |
| 694 } else { |
| 695 if (attrs.responseStream) { |
| 696 method_type = 'server_stream'; |
| 697 } else { |
| 698 method_type = 'unary'; |
| 699 } |
| 700 } |
| 701 var serialize = attrs.requestSerialize; |
| 702 var deserialize = attrs.responseDeserialize; |
| 703 Client.prototype[name] = requester_makers[method_type]( |
| 704 attrs.path, serialize, deserialize); |
| 705 // Associate all provided attributes with the method |
| 706 _.assign(Client.prototype[name], attrs); |
| 707 }); |
| 708 |
| 709 return Client; |
| 710 }; |
| 711 |
| 712 /** |
| 713 * Return the underlying channel object for the specified client |
| 714 * @param {Client} client |
| 715 * @return {Channel} The channel |
| 716 */ |
| 717 exports.getClientChannel = function(client) { |
| 718 return client.$channel; |
| 719 }; |
| 720 |
| 721 /** |
| 722 * Wait for the client to be ready. The callback will be called when the |
| 723 * client has successfully connected to the server, and it will be called |
| 724 * with an error if the attempt to connect to the server has unrecoverablly |
| 725 * failed or if the deadline expires. This function will make the channel |
| 726 * start connecting if it has not already done so. |
| 727 * @param {Client} client The client to wait on |
| 728 * @param {(Date|Number)} deadline When to stop waiting for a connection. Pass |
| 729 * Infinity to wait forever. |
| 730 * @param {function(Error)} callback The callback to call when done attempting |
| 731 * to connect. |
| 732 */ |
| 733 exports.waitForClientReady = function(client, deadline, callback) { |
| 734 var checkState = function(err) { |
| 735 if (err) { |
| 736 callback(new Error('Failed to connect before the deadline')); |
| 737 return; |
| 738 } |
| 739 var new_state = client.$channel.getConnectivityState(true); |
| 740 if (new_state === grpc.connectivityState.READY) { |
| 741 callback(); |
| 742 } else if (new_state === grpc.connectivityState.FATAL_FAILURE) { |
| 743 callback(new Error('Failed to connect to server')); |
| 744 } else { |
| 745 client.$channel.watchConnectivityState(new_state, deadline, checkState); |
| 746 } |
| 747 }; |
| 748 checkState(); |
| 749 }; |
| 750 |
| 751 /** |
| 752 * Creates a constructor for clients for the given service |
| 753 * @param {ProtoBuf.Reflect.Service} service The service to generate a client |
| 754 * for |
| 755 * @param {Object=} options Options to apply to the client |
| 756 * @return {function(string, Object)} New client constructor |
| 757 */ |
| 758 exports.makeProtobufClientConstructor = function(service, options) { |
| 759 var method_attrs = common.getProtobufServiceAttrs(service, service.name, |
| 760 options); |
| 761 var Client = exports.makeClientConstructor( |
| 762 method_attrs, common.fullyQualifiedName(service)); |
| 763 Client.service = service; |
| 764 Client.service.grpc_options = options; |
| 765 return Client; |
| 766 }; |
| 767 |
| 768 /** |
| 769 * Map of status code names to status codes |
| 770 */ |
| 771 exports.status = grpc.status; |
| 772 |
| 773 /** |
| 774 * See docs for client.callError |
| 775 */ |
| 776 exports.callError = grpc.callError; |
OLD | NEW |