OLD | NEW |
(Empty) | |
| 1 # Copyright 2015, Google Inc. |
| 2 # All rights reserved. |
| 3 # |
| 4 # Redistribution and use in source and binary forms, with or without |
| 5 # modification, are permitted provided that the following conditions are |
| 6 # met: |
| 7 # |
| 8 # * Redistributions of source code must retain the above copyright |
| 9 # notice, this list of conditions and the following disclaimer. |
| 10 # * Redistributions in binary form must reproduce the above |
| 11 # copyright notice, this list of conditions and the following disclaimer |
| 12 # in the documentation and/or other materials provided with the |
| 13 # distribution. |
| 14 # * Neither the name of Google Inc. nor the names of its |
| 15 # contributors may be used to endorse or promote products derived from |
| 16 # this software without specific prior written permission. |
| 17 # |
| 18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 29 |
| 30 require 'grpc' |
| 31 |
| 32 def wakey_thread(&blk) |
| 33 n = GRPC::Notifier.new |
| 34 t = Thread.new do |
| 35 blk.call(n) |
| 36 end |
| 37 n.wait |
| 38 t |
| 39 end |
| 40 |
| 41 def load_test_certs |
| 42 test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata') |
| 43 files = ['ca.pem', 'server1.key', 'server1.pem'] |
| 44 files.map { |f| File.open(File.join(test_root, f)).read } |
| 45 end |
| 46 |
| 47 include GRPC::Core::StatusCodes |
| 48 include GRPC::Core::TimeConsts |
| 49 include GRPC::Core::CallOps |
| 50 |
| 51 describe 'ClientStub' do |
| 52 let(:noop) { proc { |x| x } } |
| 53 |
| 54 before(:each) do |
| 55 Thread.abort_on_exception = true |
| 56 @server = nil |
| 57 @server_queue = nil |
| 58 @method = 'an_rpc_method' |
| 59 @pass = OK |
| 60 @fail = INTERNAL |
| 61 @cq = GRPC::Core::CompletionQueue.new |
| 62 end |
| 63 |
| 64 after(:each) do |
| 65 @server.close(@server_queue) unless @server_queue.nil? |
| 66 end |
| 67 |
| 68 describe '#new' do |
| 69 let(:fake_host) { 'localhost:0' } |
| 70 it 'can be created from a host and args' do |
| 71 opts = { a_channel_arg: 'an_arg' } |
| 72 blk = proc do |
| 73 GRPC::ClientStub.new(fake_host, @cq, :this_channel_is_insecure, **opts) |
| 74 end |
| 75 expect(&blk).not_to raise_error |
| 76 end |
| 77 |
| 78 it 'can be created with a default deadline' do |
| 79 opts = { a_channel_arg: 'an_arg', deadline: 5 } |
| 80 blk = proc do |
| 81 GRPC::ClientStub.new(fake_host, @cq, :this_channel_is_insecure, **opts) |
| 82 end |
| 83 expect(&blk).not_to raise_error |
| 84 end |
| 85 |
| 86 it 'can be created with an channel override' do |
| 87 opts = { a_channel_arg: 'an_arg', channel_override: @ch } |
| 88 blk = proc do |
| 89 GRPC::ClientStub.new(fake_host, @cq, :this_channel_is_insecure, **opts) |
| 90 end |
| 91 expect(&blk).not_to raise_error |
| 92 end |
| 93 |
| 94 it 'cannot be created with a bad channel override' do |
| 95 blk = proc do |
| 96 opts = { a_channel_arg: 'an_arg', channel_override: Object.new } |
| 97 GRPC::ClientStub.new(fake_host, @cq, :this_channel_is_insecure, **opts) |
| 98 end |
| 99 expect(&blk).to raise_error |
| 100 end |
| 101 |
| 102 it 'cannot be created with bad credentials' do |
| 103 blk = proc do |
| 104 opts = { a_channel_arg: 'an_arg' } |
| 105 GRPC::ClientStub.new(fake_host, @cq, Object.new, **opts) |
| 106 end |
| 107 expect(&blk).to raise_error |
| 108 end |
| 109 |
| 110 it 'can be created with test test credentials' do |
| 111 certs = load_test_certs |
| 112 blk = proc do |
| 113 opts = { |
| 114 GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr', |
| 115 a_channel_arg: 'an_arg' |
| 116 } |
| 117 creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil) |
| 118 GRPC::ClientStub.new(fake_host, @cq, creds, **opts) |
| 119 end |
| 120 expect(&blk).to_not raise_error |
| 121 end |
| 122 end |
| 123 |
| 124 describe '#request_response' do |
| 125 before(:each) do |
| 126 @sent_msg, @resp = 'a_msg', 'a_reply' |
| 127 end |
| 128 |
| 129 shared_examples 'request response' do |
| 130 it 'should send a request to/receive a reply from a server' do |
| 131 server_port = create_test_server |
| 132 th = run_request_response(@sent_msg, @resp, @pass) |
| 133 stub = GRPC::ClientStub.new("localhost:#{server_port}", @cq, |
| 134 :this_channel_is_insecure) |
| 135 expect(get_response(stub)).to eq(@resp) |
| 136 th.join |
| 137 end |
| 138 |
| 139 it 'should send metadata to the server ok' do |
| 140 server_port = create_test_server |
| 141 host = "localhost:#{server_port}" |
| 142 th = run_request_response(@sent_msg, @resp, @pass, |
| 143 k1: 'v1', k2: 'v2') |
| 144 stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure) |
| 145 expect(get_response(stub)).to eq(@resp) |
| 146 th.join |
| 147 end |
| 148 |
| 149 it 'should send a request when configured using an override channel' do |
| 150 server_port = create_test_server |
| 151 alt_host = "localhost:#{server_port}" |
| 152 th = run_request_response(@sent_msg, @resp, @pass) |
| 153 ch = GRPC::Core::Channel.new(alt_host, nil, :this_channel_is_insecure) |
| 154 stub = GRPC::ClientStub.new('ignored-host', @cq, |
| 155 :this_channel_is_insecure, |
| 156 channel_override: ch) |
| 157 expect(get_response(stub)).to eq(@resp) |
| 158 th.join |
| 159 end |
| 160 |
| 161 it 'should raise an error if the status is not OK' do |
| 162 server_port = create_test_server |
| 163 host = "localhost:#{server_port}" |
| 164 th = run_request_response(@sent_msg, @resp, @fail) |
| 165 stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure) |
| 166 blk = proc { get_response(stub) } |
| 167 expect(&blk).to raise_error(GRPC::BadStatus) |
| 168 th.join |
| 169 end |
| 170 end |
| 171 |
| 172 describe 'without a call operation' do |
| 173 def get_response(stub) |
| 174 stub.request_response(@method, @sent_msg, noop, noop, |
| 175 k1: 'v1', k2: 'v2') |
| 176 end |
| 177 |
| 178 it_behaves_like 'request response' |
| 179 end |
| 180 |
| 181 describe 'via a call operation' do |
| 182 def get_response(stub) |
| 183 op = stub.request_response(@method, @sent_msg, noop, noop, |
| 184 return_op: true, k1: 'v1', k2: 'v2') |
| 185 expect(op).to be_a(GRPC::ActiveCall::Operation) |
| 186 op.execute |
| 187 end |
| 188 |
| 189 it_behaves_like 'request response' |
| 190 end |
| 191 end |
| 192 |
| 193 describe '#client_streamer' do |
| 194 shared_examples 'client streaming' do |
| 195 before(:each) do |
| 196 @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } |
| 197 @resp = 'a_reply' |
| 198 end |
| 199 |
| 200 it 'should send requests to/receive a reply from a server' do |
| 201 server_port = create_test_server |
| 202 host = "localhost:#{server_port}" |
| 203 th = run_client_streamer(@sent_msgs, @resp, @pass) |
| 204 stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure) |
| 205 expect(get_response(stub)).to eq(@resp) |
| 206 th.join |
| 207 end |
| 208 |
| 209 it 'should send metadata to the server ok' do |
| 210 server_port = create_test_server |
| 211 host = "localhost:#{server_port}" |
| 212 th = run_client_streamer(@sent_msgs, @resp, @pass, |
| 213 k1: 'v1', k2: 'v2') |
| 214 stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure) |
| 215 expect(get_response(stub)).to eq(@resp) |
| 216 th.join |
| 217 end |
| 218 |
| 219 it 'should raise an error if the status is not ok' do |
| 220 server_port = create_test_server |
| 221 host = "localhost:#{server_port}" |
| 222 th = run_client_streamer(@sent_msgs, @resp, @fail) |
| 223 stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure) |
| 224 blk = proc { get_response(stub) } |
| 225 expect(&blk).to raise_error(GRPC::BadStatus) |
| 226 th.join |
| 227 end |
| 228 end |
| 229 |
| 230 describe 'without a call operation' do |
| 231 def get_response(stub) |
| 232 stub.client_streamer(@method, @sent_msgs, noop, noop, |
| 233 k1: 'v1', k2: 'v2') |
| 234 end |
| 235 |
| 236 it_behaves_like 'client streaming' |
| 237 end |
| 238 |
| 239 describe 'via a call operation' do |
| 240 def get_response(stub) |
| 241 op = stub.client_streamer(@method, @sent_msgs, noop, noop, |
| 242 return_op: true, k1: 'v1', k2: 'v2') |
| 243 expect(op).to be_a(GRPC::ActiveCall::Operation) |
| 244 op.execute |
| 245 end |
| 246 |
| 247 it_behaves_like 'client streaming' |
| 248 end |
| 249 end |
| 250 |
| 251 describe '#server_streamer' do |
| 252 shared_examples 'server streaming' do |
| 253 before(:each) do |
| 254 @sent_msg = 'a_msg' |
| 255 @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } |
| 256 end |
| 257 |
| 258 it 'should send a request to/receive replies from a server' do |
| 259 server_port = create_test_server |
| 260 host = "localhost:#{server_port}" |
| 261 th = run_server_streamer(@sent_msg, @replys, @pass) |
| 262 stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure) |
| 263 expect(get_responses(stub).collect { |r| r }).to eq(@replys) |
| 264 th.join |
| 265 end |
| 266 |
| 267 it 'should raise an error if the status is not ok' do |
| 268 server_port = create_test_server |
| 269 host = "localhost:#{server_port}" |
| 270 th = run_server_streamer(@sent_msg, @replys, @fail) |
| 271 stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure) |
| 272 e = get_responses(stub) |
| 273 expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) |
| 274 th.join |
| 275 end |
| 276 |
| 277 it 'should send metadata to the server ok' do |
| 278 server_port = create_test_server |
| 279 host = "localhost:#{server_port}" |
| 280 th = run_server_streamer(@sent_msg, @replys, @fail, |
| 281 k1: 'v1', k2: 'v2') |
| 282 stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure) |
| 283 e = get_responses(stub) |
| 284 expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) |
| 285 th.join |
| 286 end |
| 287 end |
| 288 |
| 289 describe 'without a call operation' do |
| 290 def get_responses(stub) |
| 291 e = stub.server_streamer(@method, @sent_msg, noop, noop, |
| 292 k1: 'v1', k2: 'v2') |
| 293 expect(e).to be_a(Enumerator) |
| 294 e |
| 295 end |
| 296 |
| 297 it_behaves_like 'server streaming' |
| 298 end |
| 299 |
| 300 describe 'via a call operation' do |
| 301 def get_responses(stub) |
| 302 op = stub.server_streamer(@method, @sent_msg, noop, noop, |
| 303 return_op: true, k1: 'v1', k2: 'v2') |
| 304 expect(op).to be_a(GRPC::ActiveCall::Operation) |
| 305 e = op.execute |
| 306 expect(e).to be_a(Enumerator) |
| 307 e |
| 308 end |
| 309 |
| 310 it_behaves_like 'server streaming' |
| 311 end |
| 312 end |
| 313 |
| 314 describe '#bidi_streamer' do |
| 315 shared_examples 'bidi streaming' do |
| 316 before(:each) do |
| 317 @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } |
| 318 @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } |
| 319 server_port = create_test_server |
| 320 @host = "localhost:#{server_port}" |
| 321 end |
| 322 |
| 323 it 'supports sending all the requests first', bidi: true do |
| 324 th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, |
| 325 @pass) |
| 326 stub = GRPC::ClientStub.new(@host, @cq, :this_channel_is_insecure) |
| 327 e = get_responses(stub) |
| 328 expect(e.collect { |r| r }).to eq(@replys) |
| 329 th.join |
| 330 end |
| 331 |
| 332 it 'supports client-initiated ping pong', bidi: true do |
| 333 th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true) |
| 334 stub = GRPC::ClientStub.new(@host, @cq, :this_channel_is_insecure) |
| 335 e = get_responses(stub) |
| 336 expect(e.collect { |r| r }).to eq(@sent_msgs) |
| 337 th.join |
| 338 end |
| 339 |
| 340 it 'supports a server-initiated ping pong', bidi: true do |
| 341 th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false) |
| 342 stub = GRPC::ClientStub.new(@host, @cq, :this_channel_is_insecure) |
| 343 e = get_responses(stub) |
| 344 expect(e.collect { |r| r }).to eq(@sent_msgs) |
| 345 th.join |
| 346 end |
| 347 end |
| 348 |
| 349 describe 'without a call operation' do |
| 350 def get_responses(stub) |
| 351 e = stub.bidi_streamer(@method, @sent_msgs, noop, noop) |
| 352 expect(e).to be_a(Enumerator) |
| 353 e |
| 354 end |
| 355 |
| 356 it_behaves_like 'bidi streaming' |
| 357 end |
| 358 |
| 359 describe 'via a call operation' do |
| 360 def get_responses(stub) |
| 361 op = stub.bidi_streamer(@method, @sent_msgs, noop, noop, |
| 362 return_op: true) |
| 363 expect(op).to be_a(GRPC::ActiveCall::Operation) |
| 364 e = op.execute |
| 365 expect(e).to be_a(Enumerator) |
| 366 e |
| 367 end |
| 368 |
| 369 it_behaves_like 'bidi streaming' |
| 370 end |
| 371 |
| 372 describe 'without enough time to run' do |
| 373 before(:each) do |
| 374 @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } |
| 375 @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } |
| 376 server_port = create_test_server |
| 377 @host = "localhost:#{server_port}" |
| 378 end |
| 379 |
| 380 it 'should fail with DeadlineExceeded', bidi: true do |
| 381 @server.start |
| 382 stub = GRPC::ClientStub.new(@host, @cq, :this_channel_is_insecure) |
| 383 blk = proc do |
| 384 e = stub.bidi_streamer(@method, @sent_msgs, noop, noop, |
| 385 timeout: 0.001) |
| 386 e.collect { |r| r } |
| 387 end |
| 388 expect(&blk).to raise_error GRPC::BadStatus, /Deadline Exceeded/ |
| 389 end |
| 390 end |
| 391 end |
| 392 |
| 393 def run_server_streamer(expected_input, replys, status, **kw) |
| 394 wanted_metadata = kw.clone |
| 395 wakey_thread do |notifier| |
| 396 c = expect_server_to_be_invoked(notifier) |
| 397 wanted_metadata.each do |k, v| |
| 398 expect(c.metadata[k.to_s]).to eq(v) |
| 399 end |
| 400 expect(c.remote_read).to eq(expected_input) |
| 401 replys.each { |r| c.remote_send(r) } |
| 402 c.send_status(status, status == @pass ? 'OK' : 'NOK', true) |
| 403 end |
| 404 end |
| 405 |
| 406 def run_bidi_streamer_handle_inputs_first(expected_inputs, replys, |
| 407 status) |
| 408 wakey_thread do |notifier| |
| 409 c = expect_server_to_be_invoked(notifier) |
| 410 expected_inputs.each { |i| expect(c.remote_read).to eq(i) } |
| 411 replys.each { |r| c.remote_send(r) } |
| 412 c.send_status(status, status == @pass ? 'OK' : 'NOK', true) |
| 413 end |
| 414 end |
| 415 |
| 416 def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts) |
| 417 wakey_thread do |notifier| |
| 418 c = expect_server_to_be_invoked(notifier) |
| 419 expected_inputs.each do |i| |
| 420 if client_starts |
| 421 expect(c.remote_read).to eq(i) |
| 422 c.remote_send(i) |
| 423 else |
| 424 c.remote_send(i) |
| 425 expect(c.remote_read).to eq(i) |
| 426 end |
| 427 end |
| 428 c.send_status(status, status == @pass ? 'OK' : 'NOK', true) |
| 429 end |
| 430 end |
| 431 |
| 432 def run_client_streamer(expected_inputs, resp, status, **kw) |
| 433 wanted_metadata = kw.clone |
| 434 wakey_thread do |notifier| |
| 435 c = expect_server_to_be_invoked(notifier) |
| 436 expected_inputs.each { |i| expect(c.remote_read).to eq(i) } |
| 437 wanted_metadata.each do |k, v| |
| 438 expect(c.metadata[k.to_s]).to eq(v) |
| 439 end |
| 440 c.remote_send(resp) |
| 441 c.send_status(status, status == @pass ? 'OK' : 'NOK', true) |
| 442 end |
| 443 end |
| 444 |
| 445 def run_request_response(expected_input, resp, status, **kw) |
| 446 wanted_metadata = kw.clone |
| 447 wakey_thread do |notifier| |
| 448 c = expect_server_to_be_invoked(notifier) |
| 449 expect(c.remote_read).to eq(expected_input) |
| 450 wanted_metadata.each do |k, v| |
| 451 expect(c.metadata[k.to_s]).to eq(v) |
| 452 end |
| 453 c.remote_send(resp) |
| 454 c.send_status(status, status == @pass ? 'OK' : 'NOK', true) |
| 455 end |
| 456 end |
| 457 |
| 458 def create_test_server |
| 459 @server_queue = GRPC::Core::CompletionQueue.new |
| 460 @server = GRPC::Core::Server.new(@server_queue, nil) |
| 461 @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure) |
| 462 end |
| 463 |
| 464 def expect_server_to_be_invoked(notifier) |
| 465 @server.start |
| 466 notifier.notify(nil) |
| 467 server_tag = Object.new |
| 468 recvd_rpc = @server.request_call(@server_queue, server_tag, |
| 469 INFINITE_FUTURE) |
| 470 recvd_call = recvd_rpc.call |
| 471 recvd_call.metadata = recvd_rpc.metadata |
| 472 recvd_call.run_batch(@server_queue, server_tag, Time.now + 2, |
| 473 SEND_INITIAL_METADATA => nil) |
| 474 GRPC::ActiveCall.new(recvd_call, @server_queue, noop, noop, INFINITE_FUTURE) |
| 475 end |
| 476 end |
OLD | NEW |