OLD | NEW |
(Empty) | |
| 1 # Copyright 2015-2016, Google Inc. |
| 2 # All rights reserved. |
| 3 # |
| 4 # Redistribution and use in source and binary forms, with or without |
| 5 # modification, are permitted provided that the following conditions are |
| 6 # met: |
| 7 # |
| 8 # * Redistributions of source code must retain the above copyright |
| 9 # notice, this list of conditions and the following disclaimer. |
| 10 # * Redistributions in binary form must reproduce the above |
| 11 # copyright notice, this list of conditions and the following disclaimer |
| 12 # in the documentation and/or other materials provided with the |
| 13 # distribution. |
| 14 # * Neither the name of Google Inc. nor the names of its |
| 15 # contributors may be used to endorse or promote products derived from |
| 16 # this software without specific prior written permission. |
| 17 # |
| 18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 29 |
| 30 require 'grpc' |
| 31 |
| 32 def load_test_certs |
| 33 test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata') |
| 34 files = ['ca.pem', 'server1.key', 'server1.pem'] |
| 35 files.map { |f| File.open(File.join(test_root, f)).read } |
| 36 end |
| 37 |
| 38 def check_md(wanted_md, received_md) |
| 39 wanted_md.zip(received_md).each do |w, r| |
| 40 w.each do |key, value| |
| 41 expect(r[key]).to eq(value) |
| 42 end |
| 43 end |
| 44 end |
| 45 |
| 46 # A test message |
| 47 class EchoMsg |
| 48 def self.marshal(_o) |
| 49 '' |
| 50 end |
| 51 |
| 52 def self.unmarshal(_o) |
| 53 EchoMsg.new |
| 54 end |
| 55 end |
| 56 |
| 57 # A test service with no methods. |
| 58 class EmptyService |
| 59 include GRPC::GenericService |
| 60 end |
| 61 |
| 62 # A test service without an implementation. |
| 63 class NoRpcImplementation |
| 64 include GRPC::GenericService |
| 65 rpc :an_rpc, EchoMsg, EchoMsg |
| 66 end |
| 67 |
| 68 # A test service with an echo implementation. |
| 69 class EchoService |
| 70 include GRPC::GenericService |
| 71 rpc :an_rpc, EchoMsg, EchoMsg |
| 72 attr_reader :received_md |
| 73 |
| 74 def initialize(**kw) |
| 75 @trailing_metadata = kw |
| 76 @received_md = [] |
| 77 end |
| 78 |
| 79 def an_rpc(req, call) |
| 80 GRPC.logger.info('echo service received a request') |
| 81 call.output_metadata.update(@trailing_metadata) |
| 82 @received_md << call.metadata unless call.metadata.nil? |
| 83 req |
| 84 end |
| 85 end |
| 86 |
| 87 EchoStub = EchoService.rpc_stub_class |
| 88 |
| 89 # A test service with an implementation that fails with BadStatus |
| 90 class FailingService |
| 91 include GRPC::GenericService |
| 92 rpc :an_rpc, EchoMsg, EchoMsg |
| 93 attr_reader :details, :code, :md |
| 94 |
| 95 def initialize(_default_var = 'ignored') |
| 96 @details = 'app error' |
| 97 @code = 101 |
| 98 @md = { failed_method: 'an_rpc' } |
| 99 end |
| 100 |
| 101 def an_rpc(_req, _call) |
| 102 fail GRPC::BadStatus.new(@code, @details, **@md) |
| 103 end |
| 104 end |
| 105 |
| 106 FailingStub = FailingService.rpc_stub_class |
| 107 |
| 108 # A slow test service. |
| 109 class SlowService |
| 110 include GRPC::GenericService |
| 111 rpc :an_rpc, EchoMsg, EchoMsg |
| 112 attr_reader :received_md, :delay |
| 113 |
| 114 def initialize(_default_var = 'ignored') |
| 115 @delay = 0.25 |
| 116 @received_md = [] |
| 117 end |
| 118 |
| 119 def an_rpc(req, call) |
| 120 GRPC.logger.info("starting a slow #{@delay} rpc") |
| 121 sleep @delay |
| 122 @received_md << call.metadata unless call.metadata.nil? |
| 123 req # send back the req as the response |
| 124 end |
| 125 end |
| 126 |
| 127 SlowStub = SlowService.rpc_stub_class |
| 128 |
| 129 describe GRPC::RpcServer do |
| 130 RpcServer = GRPC::RpcServer |
| 131 StatusCodes = GRPC::Core::StatusCodes |
| 132 |
| 133 before(:each) do |
| 134 @method = 'an_rpc_method' |
| 135 @pass = 0 |
| 136 @fail = 1 |
| 137 @noop = proc { |x| x } |
| 138 |
| 139 @server_queue = GRPC::Core::CompletionQueue.new |
| 140 server_host = '0.0.0.0:0' |
| 141 @server = GRPC::Core::Server.new(@server_queue, nil) |
| 142 server_port = @server.add_http2_port(server_host, :this_port_is_insecure) |
| 143 @host = "localhost:#{server_port}" |
| 144 @ch = GRPC::Core::Channel.new(@host, nil, :this_channel_is_insecure) |
| 145 end |
| 146 |
| 147 describe '#new' do |
| 148 it 'can be created with just some args' do |
| 149 opts = { a_channel_arg: 'an_arg' } |
| 150 blk = proc do |
| 151 RpcServer.new(**opts) |
| 152 end |
| 153 expect(&blk).not_to raise_error |
| 154 end |
| 155 |
| 156 it 'can be created with a default deadline' do |
| 157 opts = { a_channel_arg: 'an_arg', deadline: 5 } |
| 158 blk = proc do |
| 159 RpcServer.new(**opts) |
| 160 end |
| 161 expect(&blk).not_to raise_error |
| 162 end |
| 163 |
| 164 it 'can be created with a completion queue override' do |
| 165 opts = { |
| 166 a_channel_arg: 'an_arg', |
| 167 completion_queue_override: @server_queue |
| 168 } |
| 169 blk = proc do |
| 170 RpcServer.new(**opts) |
| 171 end |
| 172 expect(&blk).not_to raise_error |
| 173 end |
| 174 |
| 175 it 'cannot be created with a bad completion queue override' do |
| 176 blk = proc do |
| 177 opts = { |
| 178 a_channel_arg: 'an_arg', |
| 179 completion_queue_override: Object.new |
| 180 } |
| 181 RpcServer.new(**opts) |
| 182 end |
| 183 expect(&blk).to raise_error |
| 184 end |
| 185 |
| 186 it 'cannot be created with invalid ServerCredentials' do |
| 187 blk = proc do |
| 188 opts = { |
| 189 a_channel_arg: 'an_arg', |
| 190 creds: Object.new |
| 191 } |
| 192 RpcServer.new(**opts) |
| 193 end |
| 194 expect(&blk).to raise_error |
| 195 end |
| 196 |
| 197 it 'can be created with a server override' do |
| 198 opts = { a_channel_arg: 'an_arg', server_override: @server } |
| 199 blk = proc do |
| 200 RpcServer.new(**opts) |
| 201 end |
| 202 expect(&blk).not_to raise_error |
| 203 end |
| 204 |
| 205 it 'cannot be created with a bad server override' do |
| 206 blk = proc do |
| 207 opts = { |
| 208 a_channel_arg: 'an_arg', |
| 209 server_override: Object.new |
| 210 } |
| 211 RpcServer.new(**opts) |
| 212 end |
| 213 expect(&blk).to raise_error |
| 214 end |
| 215 end |
| 216 |
| 217 describe '#stopped?' do |
| 218 before(:each) do |
| 219 opts = { a_channel_arg: 'an_arg', poll_period: 1.5 } |
| 220 @srv = RpcServer.new(**opts) |
| 221 end |
| 222 |
| 223 it 'starts out false' do |
| 224 expect(@srv.stopped?).to be(false) |
| 225 end |
| 226 |
| 227 it 'stays false after the server starts running', server: true do |
| 228 @srv.handle(EchoService) |
| 229 t = Thread.new { @srv.run } |
| 230 @srv.wait_till_running |
| 231 expect(@srv.stopped?).to be(false) |
| 232 @srv.stop |
| 233 t.join |
| 234 end |
| 235 |
| 236 it 'is true after a running server is stopped', server: true do |
| 237 @srv.handle(EchoService) |
| 238 t = Thread.new { @srv.run } |
| 239 @srv.wait_till_running |
| 240 @srv.stop |
| 241 t.join |
| 242 expect(@srv.stopped?).to be(true) |
| 243 end |
| 244 end |
| 245 |
| 246 describe '#running?' do |
| 247 it 'starts out false' do |
| 248 opts = { a_channel_arg: 'an_arg', server_override: @server } |
| 249 r = RpcServer.new(**opts) |
| 250 expect(r.running?).to be(false) |
| 251 end |
| 252 |
| 253 it 'is false if run is called with no services registered', server: true do |
| 254 opts = { |
| 255 a_channel_arg: 'an_arg', |
| 256 poll_period: 2, |
| 257 server_override: @server |
| 258 } |
| 259 r = RpcServer.new(**opts) |
| 260 expect { r.run }.to raise_error(RuntimeError) |
| 261 end |
| 262 |
| 263 it 'is true after run is called with a registered service' do |
| 264 opts = { |
| 265 a_channel_arg: 'an_arg', |
| 266 poll_period: 2.5, |
| 267 server_override: @server |
| 268 } |
| 269 r = RpcServer.new(**opts) |
| 270 r.handle(EchoService) |
| 271 t = Thread.new { r.run } |
| 272 r.wait_till_running |
| 273 expect(r.running?).to be(true) |
| 274 r.stop |
| 275 t.join |
| 276 end |
| 277 end |
| 278 |
| 279 describe '#handle' do |
| 280 before(:each) do |
| 281 @opts = { a_channel_arg: 'an_arg', poll_period: 1 } |
| 282 @srv = RpcServer.new(**@opts) |
| 283 end |
| 284 |
| 285 it 'raises if #run has already been called' do |
| 286 @srv.handle(EchoService) |
| 287 t = Thread.new { @srv.run } |
| 288 @srv.wait_till_running |
| 289 expect { @srv.handle(EchoService) }.to raise_error |
| 290 @srv.stop |
| 291 t.join |
| 292 end |
| 293 |
| 294 it 'raises if the server has been run and stopped' do |
| 295 @srv.handle(EchoService) |
| 296 t = Thread.new { @srv.run } |
| 297 @srv.wait_till_running |
| 298 @srv.stop |
| 299 t.join |
| 300 expect { @srv.handle(EchoService) }.to raise_error |
| 301 end |
| 302 |
| 303 it 'raises if the service does not include GenericService ' do |
| 304 expect { @srv.handle(Object) }.to raise_error |
| 305 end |
| 306 |
| 307 it 'raises if the service does not declare any rpc methods' do |
| 308 expect { @srv.handle(EmptyService) }.to raise_error |
| 309 end |
| 310 |
| 311 it 'raises if the service does not define its rpc methods' do |
| 312 expect { @srv.handle(NoRpcImplementation) }.to raise_error |
| 313 end |
| 314 |
| 315 it 'raises if a handler method is already registered' do |
| 316 @srv.handle(EchoService) |
| 317 expect { r.handle(EchoService) }.to raise_error |
| 318 end |
| 319 end |
| 320 |
| 321 describe '#run' do |
| 322 let(:client_opts) { { channel_override: @ch } } |
| 323 let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc } |
| 324 let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) } |
| 325 |
| 326 context 'with no connect_metadata' do |
| 327 before(:each) do |
| 328 server_opts = { |
| 329 server_override: @server, |
| 330 completion_queue_override: @server_queue, |
| 331 poll_period: 1 |
| 332 } |
| 333 @srv = RpcServer.new(**server_opts) |
| 334 end |
| 335 |
| 336 it 'should return NOT_FOUND status on unknown methods', server: true do |
| 337 @srv.handle(EchoService) |
| 338 t = Thread.new { @srv.run } |
| 339 @srv.wait_till_running |
| 340 req = EchoMsg.new |
| 341 blk = proc do |
| 342 cq = GRPC::Core::CompletionQueue.new |
| 343 stub = GRPC::ClientStub.new(@host, cq, :this_channel_is_insecure, |
| 344 **client_opts) |
| 345 stub.request_response('/unknown', req, marshal, unmarshal) |
| 346 end |
| 347 expect(&blk).to raise_error GRPC::BadStatus |
| 348 @srv.stop |
| 349 t.join |
| 350 end |
| 351 |
| 352 it 'should handle multiple sequential requests', server: true do |
| 353 @srv.handle(EchoService) |
| 354 t = Thread.new { @srv.run } |
| 355 @srv.wait_till_running |
| 356 req = EchoMsg.new |
| 357 n = 5 # arbitrary |
| 358 stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts) |
| 359 n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) } |
| 360 @srv.stop |
| 361 t.join |
| 362 end |
| 363 |
| 364 it 'should receive metadata sent as rpc keyword args', server: true do |
| 365 service = EchoService.new |
| 366 @srv.handle(service) |
| 367 t = Thread.new { @srv.run } |
| 368 @srv.wait_till_running |
| 369 req = EchoMsg.new |
| 370 stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts) |
| 371 expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) |
| 372 wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] |
| 373 check_md(wanted_md, service.received_md) |
| 374 @srv.stop |
| 375 t.join |
| 376 end |
| 377 |
| 378 it 'should receive metadata if a deadline is specified', server: true do |
| 379 service = SlowService.new |
| 380 @srv.handle(service) |
| 381 t = Thread.new { @srv.run } |
| 382 @srv.wait_till_running |
| 383 req = EchoMsg.new |
| 384 stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts) |
| 385 timeout = service.delay + 1.0 # wait for long enough |
| 386 resp = stub.an_rpc(req, timeout: timeout, k1: 'v1', k2: 'v2') |
| 387 expect(resp).to be_a(EchoMsg) |
| 388 wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] |
| 389 check_md(wanted_md, service.received_md) |
| 390 @srv.stop |
| 391 t.join |
| 392 end |
| 393 |
| 394 it 'should handle cancellation correctly', server: true do |
| 395 service = SlowService.new |
| 396 @srv.handle(service) |
| 397 t = Thread.new { @srv.run } |
| 398 @srv.wait_till_running |
| 399 req = EchoMsg.new |
| 400 stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts) |
| 401 op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) |
| 402 Thread.new do # cancel the call |
| 403 sleep 0.1 |
| 404 op.cancel |
| 405 end |
| 406 expect { op.execute }.to raise_error GRPC::Cancelled |
| 407 @srv.stop |
| 408 t.join |
| 409 end |
| 410 |
| 411 it 'should handle multiple parallel requests', server: true do |
| 412 @srv.handle(EchoService) |
| 413 t = Thread.new { @srv.run } |
| 414 @srv.wait_till_running |
| 415 req, q = EchoMsg.new, Queue.new |
| 416 n = 5 # arbitrary |
| 417 threads = [t] |
| 418 n.times do |
| 419 threads << Thread.new do |
| 420 stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts) |
| 421 q << stub.an_rpc(req) |
| 422 end |
| 423 end |
| 424 n.times { expect(q.pop).to be_a(EchoMsg) } |
| 425 @srv.stop |
| 426 threads.each(&:join) |
| 427 end |
| 428 |
| 429 it 'should return UNAVAILABLE on too many jobs', server: true do |
| 430 opts = { |
| 431 a_channel_arg: 'an_arg', |
| 432 server_override: @server, |
| 433 completion_queue_override: @server_queue, |
| 434 pool_size: 1, |
| 435 poll_period: 1, |
| 436 max_waiting_requests: 0 |
| 437 } |
| 438 alt_srv = RpcServer.new(**opts) |
| 439 alt_srv.handle(SlowService) |
| 440 t = Thread.new { alt_srv.run } |
| 441 alt_srv.wait_till_running |
| 442 req = EchoMsg.new |
| 443 n = 5 # arbitrary, use as many to ensure the server pool is exceeded |
| 444 threads = [] |
| 445 one_failed_as_unavailable = false |
| 446 n.times do |
| 447 threads << Thread.new do |
| 448 stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts) |
| 449 begin |
| 450 stub.an_rpc(req) |
| 451 rescue GRPC::BadStatus => e |
| 452 one_failed_as_unavailable = e.code == StatusCodes::UNAVAILABLE |
| 453 end |
| 454 end |
| 455 end |
| 456 threads.each(&:join) |
| 457 alt_srv.stop |
| 458 t.join |
| 459 expect(one_failed_as_unavailable).to be(true) |
| 460 end |
| 461 end |
| 462 |
| 463 context 'with connect metadata' do |
| 464 let(:test_md_proc) do |
| 465 proc do |mth, md| |
| 466 res = md.clone |
| 467 res['method'] = mth |
| 468 res['connect_k1'] = 'connect_v1' |
| 469 res |
| 470 end |
| 471 end |
| 472 before(:each) do |
| 473 server_opts = { |
| 474 server_override: @server, |
| 475 completion_queue_override: @server_queue, |
| 476 poll_period: 1, |
| 477 connect_md_proc: test_md_proc |
| 478 } |
| 479 @srv = RpcServer.new(**server_opts) |
| 480 end |
| 481 |
| 482 it 'should send connect metadata to the client', server: true do |
| 483 service = EchoService.new |
| 484 @srv.handle(service) |
| 485 t = Thread.new { @srv.run } |
| 486 @srv.wait_till_running |
| 487 req = EchoMsg.new |
| 488 stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts) |
| 489 op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) |
| 490 expect(op.metadata).to be nil |
| 491 expect(op.execute).to be_a(EchoMsg) |
| 492 wanted_md = { |
| 493 'k1' => 'v1', |
| 494 'k2' => 'v2', |
| 495 'method' => '/EchoService/an_rpc', |
| 496 'connect_k1' => 'connect_v1' |
| 497 } |
| 498 wanted_md.each do |key, value| |
| 499 expect(op.metadata[key]).to eq(value) |
| 500 end |
| 501 @srv.stop |
| 502 t.join |
| 503 end |
| 504 end |
| 505 |
| 506 context 'with trailing metadata' do |
| 507 before(:each) do |
| 508 server_opts = { |
| 509 server_override: @server, |
| 510 completion_queue_override: @server_queue, |
| 511 poll_period: 1 |
| 512 } |
| 513 @srv = RpcServer.new(**server_opts) |
| 514 end |
| 515 |
| 516 it 'should be added to BadStatus when requests fail', server: true do |
| 517 service = FailingService.new |
| 518 @srv.handle(service) |
| 519 t = Thread.new { @srv.run } |
| 520 @srv.wait_till_running |
| 521 req = EchoMsg.new |
| 522 stub = FailingStub.new(@host, :this_channel_is_insecure, **client_opts) |
| 523 blk = proc { stub.an_rpc(req) } |
| 524 |
| 525 # confirm it raise the expected error |
| 526 expect(&blk).to raise_error GRPC::BadStatus |
| 527 |
| 528 # call again and confirm exception contained the trailing metadata. |
| 529 begin |
| 530 blk.call |
| 531 rescue GRPC::BadStatus => e |
| 532 expect(e.code).to eq(service.code) |
| 533 expect(e.details).to eq(service.details) |
| 534 expect(e.metadata).to eq(service.md) |
| 535 end |
| 536 @srv.stop |
| 537 t.join |
| 538 end |
| 539 |
| 540 it 'should be received by the client', server: true do |
| 541 wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' } |
| 542 service = EchoService.new(k1: 'out_v1', k2: 'out_v2') |
| 543 @srv.handle(service) |
| 544 t = Thread.new { @srv.run } |
| 545 @srv.wait_till_running |
| 546 req = EchoMsg.new |
| 547 stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts) |
| 548 op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) |
| 549 expect(op.metadata).to be nil |
| 550 expect(op.execute).to be_a(EchoMsg) |
| 551 expect(op.metadata).to eq(wanted_trailers) |
| 552 @srv.stop |
| 553 t.join |
| 554 end |
| 555 end |
| 556 end |
| 557 end |
OLD | NEW |