Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(73)

Side by Side Diff: third_party/grpc/src/node/interop/interop_server.js

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 'use strict';
35
36 var fs = require('fs');
37 var path = require('path');
38 var _ = require('lodash');
39 var AsyncDelayQueue = require('./async_delay_queue');
40 var grpc = require('..');
41 var testProto = grpc.load({
42 root: __dirname + '/../../..',
43 file: 'src/proto/grpc/testing/test.proto'}).grpc.testing;
44
45 var ECHO_INITIAL_KEY = 'x-grpc-test-echo-initial';
46 var ECHO_TRAILING_KEY = 'x-grpc-test-echo-trailing-bin';
47
48 var incompressible_data = fs.readFileSync(
49 __dirname + '/../../../test/cpp/interop/rnd.dat');
50
51 /**
52 * Create a buffer filled with size zeroes
53 * @param {number} size The length of the buffer
54 * @return {Buffer} The new buffer
55 */
56 function zeroBuffer(size) {
57 var zeros = new Buffer(size);
58 zeros.fill(0);
59 return zeros;
60 }
61
62 /**
63 * Echos a header metadata item as specified in the interop spec.
64 * @param {Call} call The call to echo metadata on
65 */
66 function echoHeader(call) {
67 var echo_initial = call.metadata.get(ECHO_INITIAL_KEY);
68 if (echo_initial.length > 0) {
69 var response_metadata = new grpc.Metadata();
70 response_metadata.set(ECHO_INITIAL_KEY, echo_initial[0]);
71 call.sendMetadata(response_metadata);
72 }
73 }
74
75 /**
76 * Gets the trailer metadata that should be echoed when the call is done,
77 * as specified in the interop spec.
78 * @param {Call} call The call to get metadata from
79 * @return {grpc.Metadata} The metadata to send as a trailer
80 */
81 function getEchoTrailer(call) {
82 var echo_trailer = call.metadata.get(ECHO_TRAILING_KEY);
83 var response_trailer = new grpc.Metadata();
84 if (echo_trailer.length > 0) {
85 response_trailer.set(ECHO_TRAILING_KEY, echo_trailer[0]);
86 }
87 return response_trailer;
88 }
89
90 function getPayload(payload_type, size) {
91 if (payload_type === 'RANDOM') {
92 payload_type = ['COMPRESSABLE',
93 'UNCOMPRESSABLE'][Math.random() < 0.5 ? 0 : 1];
94 }
95 var body;
96 switch (payload_type) {
97 case 'COMPRESSABLE': body = zeroBuffer(size); break;
98 case 'UNCOMPRESSABLE': incompressible_data.slice(size); break;
99 }
100 return {type: payload_type, body: body};
101 }
102
103 /**
104 * Respond to an empty parameter with an empty response.
105 * NOTE: this currently does not work due to issue #137
106 * @param {Call} call Call to handle
107 * @param {function(Error, Object)} callback Callback to call with result
108 * or error
109 */
110 function handleEmpty(call, callback) {
111 echoHeader(call);
112 callback(null, {}, getEchoTrailer(call));
113 }
114
115 /**
116 * Handle a unary request by sending the requested payload
117 * @param {Call} call Call to handle
118 * @param {function(Error, Object)} callback Callback to call with result or
119 * error
120 */
121 function handleUnary(call, callback) {
122 echoHeader(call);
123 var req = call.request;
124 if (req.response_status) {
125 var status = req.response_status;
126 status.metadata = getEchoTrailer(call);
127 callback(status);
128 return;
129 }
130 var payload = getPayload(req.response_type, req.response_size);
131 callback(null, {payload: payload},
132 getEchoTrailer(call));
133 }
134
135 /**
136 * Respond to a streaming call with the total size of all payloads
137 * @param {Call} call Call to handle
138 * @param {function(Error, Object)} callback Callback to call with result or
139 * error
140 */
141 function handleStreamingInput(call, callback) {
142 echoHeader(call);
143 var aggregate_size = 0;
144 call.on('data', function(value) {
145 aggregate_size += value.payload.body.length;
146 });
147 call.on('end', function() {
148 callback(null, {aggregated_payload_size: aggregate_size},
149 getEchoTrailer(call));
150 });
151 }
152
153 /**
154 * Respond to a payload request with a stream of the requested payloads
155 * @param {Call} call Call to handle
156 */
157 function handleStreamingOutput(call) {
158 echoHeader(call);
159 var delay_queue = new AsyncDelayQueue();
160 var req = call.request;
161 if (req.response_status) {
162 var status = req.response_status;
163 status.metadata = getEchoTrailer(call);
164 call.emit('error', status);
165 return;
166 }
167 _.each(req.response_parameters, function(resp_param) {
168 delay_queue.add(function(next) {
169 call.write({payload: getPayload(req.response_type, resp_param.size)});
170 next();
171 }, resp_param.interval_us);
172 });
173 delay_queue.add(function(next) {
174 call.end(getEchoTrailer(call));
175 next();
176 });
177 }
178
179 /**
180 * Respond to a stream of payload requests with a stream of payload responses as
181 * they arrive.
182 * @param {Call} call Call to handle
183 */
184 function handleFullDuplex(call) {
185 echoHeader(call);
186 var delay_queue = new AsyncDelayQueue();
187 call.on('data', function(value) {
188 if (value.response_status) {
189 var status = value.response_status;
190 status.metadata = getEchoTrailer(call);
191 call.emit('error', status);
192 return;
193 }
194 _.each(value.response_parameters, function(resp_param) {
195 delay_queue.add(function(next) {
196 call.write({payload: getPayload(value.response_type, resp_param.size)});
197 next();
198 }, resp_param.interval_us);
199 });
200 });
201 call.on('end', function() {
202 delay_queue.add(function(next) {
203 call.end(getEchoTrailer(call));
204 next();
205 });
206 });
207 }
208
209 /**
210 * Respond to a stream of payload requests with a stream of payload responses
211 * after all requests have arrived
212 * @param {Call} call Call to handle
213 */
214 function handleHalfDuplex(call) {
215 call.emit('error', Error('HalfDuplexCall not yet implemented'));
216 }
217
218 /**
219 * Get a server object bound to the given port
220 * @param {string} port Port to which to bind
221 * @param {boolean} tls Indicates that the bound port should use TLS
222 * @return {{server: Server, port: number}} Server object bound to the support,
223 * and port number that the server is bound to
224 */
225 function getServer(port, tls) {
226 // TODO(mlumish): enable TLS functionality
227 var options = {};
228 var server_creds;
229 if (tls) {
230 var key_path = path.join(__dirname, '../test/data/server1.key');
231 var pem_path = path.join(__dirname, '../test/data/server1.pem');
232
233 var key_data = fs.readFileSync(key_path);
234 var pem_data = fs.readFileSync(pem_path);
235 server_creds = grpc.ServerCredentials.createSsl(null,
236 [{private_key: key_data,
237 cert_chain: pem_data}]);
238 } else {
239 server_creds = grpc.ServerCredentials.createInsecure();
240 }
241 var server = new grpc.Server(options);
242 server.addProtoService(testProto.TestService.service, {
243 emptyCall: handleEmpty,
244 unaryCall: handleUnary,
245 streamingOutputCall: handleStreamingOutput,
246 streamingInputCall: handleStreamingInput,
247 fullDuplexCall: handleFullDuplex,
248 halfDuplexCall: handleHalfDuplex
249 });
250 var port_num = server.bind('0.0.0.0:' + port, server_creds);
251 return {server: server, port: port_num};
252 }
253
254 if (require.main === module) {
255 var parseArgs = require('minimist');
256 var argv = parseArgs(process.argv, {
257 string: ['port', 'use_tls']
258 });
259 var server_obj = getServer(argv.port, argv.use_tls === 'true');
260 console.log('Server attaching to port ' + argv.port);
261 server_obj.server.start();
262 }
263
264 /**
265 * See docs for getServer
266 */
267 exports.getServer = getServer;
OLDNEW
« no previous file with comments | « third_party/grpc/src/node/interop/interop_client.js ('k') | third_party/grpc/src/node/jsdoc_conf.json » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698