OLD | NEW |
(Empty) | |
| 1 # Copyright 2015, Google Inc. |
| 2 # All rights reserved. |
| 3 # |
| 4 # Redistribution and use in source and binary forms, with or without |
| 5 # modification, are permitted provided that the following conditions are |
| 6 # met: |
| 7 # |
| 8 # * Redistributions of source code must retain the above copyright |
| 9 # notice, this list of conditions and the following disclaimer. |
| 10 # * Redistributions in binary form must reproduce the above |
| 11 # copyright notice, this list of conditions and the following disclaimer |
| 12 # in the documentation and/or other materials provided with the |
| 13 # distribution. |
| 14 # * Neither the name of Google Inc. nor the names of its |
| 15 # contributors may be used to endorse or promote products derived from |
| 16 # this software without specific prior written permission. |
| 17 # |
| 18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 29 |
| 30 require 'forwardable' |
| 31 require 'grpc/generic/bidi_call' |
| 32 |
class Struct
  # BatchResult is the struct returned by calls to call#start_batch.
  class BatchResult
    # check_status returns the status, raising an error if the status
    # is non-nil and not OK.
    #
    # @return [Struct::Status, nil] the status, or nil if none was set
    # @raise [GRPC::Cancelled] when the status code is CANCELLED
    # @raise [GRPC::BadStatus] when the status code is any other non-OK code
    def check_status
      return nil if status.nil?
      code = status.code
      fail GRPC::Cancelled if code == GRPC::Core::StatusCodes::CANCELLED
      unless code == GRPC::Core::StatusCodes::OK
        GRPC.logger.debug("Failing with status #{status}")
        # raise BadStatus, propagating the metadata if present.
        with_sym_keys = status.metadata.each_pair.each_with_object({}) do |(k, v), acc|
          acc[k.to_sym] = v
        end
        fail GRPC::BadStatus.new(code, status.details, **with_sym_keys)
      end
      status
    end
  end
end
| 52 |
# GRPC contains the General RPC module.
module GRPC
  # The ActiveCall class provides simple methods for sending marshallable
  # data to a call
  class ActiveCall
    include Core::TimeConsts
    include Core::CallOps
    extend Forwardable
    attr_reader(:deadline)
    # these operations are forwarded straight to the wrapped Core::Call
    def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=

    # client_invoke begins a client invocation.
    #
    # Flow Control note: this blocks until flow control accepts that client
    # request can go ahead.
    #
    # deadline is the absolute deadline for the call.
    #
    # == Keyword Arguments ==
    # any keyword arguments are treated as metadata to be sent to the server
    # if a keyword value is a list, multiple metadata for its key are sent
    #
    # @param call [Call] a call on which to start an invocation
    # @param q [CompletionQueue] the completion queue
    # @return [Object] a unique tag identifying the batch that will deliver
    #   the server's initial metadata
    def self.client_invoke(call, q, **kw)
      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
      unless q.is_a? Core::CompletionQueue
        fail(TypeError, '!Core::CompletionQueue')
      end
      # any fresh object works as a tag; only its identity matters
      metadata_tag = Object.new
      call.run_batch(q, metadata_tag, INFINITE_FUTURE,
                     SEND_INITIAL_METADATA => kw)
      metadata_tag
    end

    # Creates an ActiveCall.
    #
    # ActiveCall should only be created after a call is accepted. That
    # means different things on a client and a server. On the client, the
    # call is accepted after calling call.invoke. On the server, this is
    # after call.accept.
    #
    # #initialize cannot determine if the call is accepted or not; so if a
    # call that's not accepted is used here, the error won't be visible until
    # the ActiveCall methods are called.
    #
    # deadline is the absolute deadline for the call.
    #
    # @param call [Call] the call used by the ActiveCall
    # @param q [CompletionQueue] the completion queue used to accept
    #          the call
    # @param marshal [Function] f(obj)->string that marshals requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Fixnum] the deadline for the call to complete
    # @param metadata_tag [Object] the object used to obtain metadata for
    #   clients (the tag returned by .client_invoke)
    # @param started [true|false] indicates if the call has begun
    def initialize(call, q, marshal, unmarshal, deadline, started: true,
                   metadata_tag: nil)
      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
      unless q.is_a? Core::CompletionQueue
        fail(TypeError, '!Core::CompletionQueue')
      end
      @call = call
      @cq = q
      @deadline = deadline
      @marshal = marshal
      @started = started
      @unmarshal = unmarshal
      @metadata_tag = metadata_tag
      # set lazily by #operation; when non-nil it is signalled via #op_is_done
      @op_notifier = nil
    end

    # output_metadata provides access to a hash that can be used to
    # save metadata to be sent as a trailer
    def output_metadata
      @output_metadata ||= {}
    end

    # cancelled indicates if the call was cancelled
    def cancelled
      !@call.status.nil? && @call.status.code == Core::StatusCodes::CANCELLED
    end

    # multi_req_view provides a restricted view of this ActiveCall for use
    # in a server client-streaming handler.
    def multi_req_view
      MultiReqView.new(self)
    end

    # single_req_view provides a restricted view of this ActiveCall for use in
    # a server request-response handler.
    def single_req_view
      SingleReqView.new(self)
    end

    # operation provides a restricted view of this ActiveCall for use as
    # an Operation.
    def operation
      @op_notifier = Notifier.new
      Operation.new(self)
    end

    # writes_done indicates that all writes are completed.
    #
    # It blocks until the remote endpoint acknowledges with a status unless
    # assert_finished is set to false. Any calls to #remote_send after this
    # call will fail.
    #
    # @param assert_finished [true, false] when true(default), waits for
    # FINISHED.
    def writes_done(assert_finished = true)
      ops = {
        SEND_CLOSE_FROM_CLIENT => nil
      }
      # when asserting completion, receive the status in the same batch
      ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
      batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
      return unless assert_finished
      @call.status = batch_result.status
      op_is_done
      batch_result.check_status
    end

    # finished waits until a client call is completed.
    #
    # It blocks until the remote endpoint acknowledges by sending a status.
    def finished
      batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE,
                                     RECV_STATUS_ON_CLIENT => nil)
      unless batch_result.status.nil?
        # fold the trailing status metadata into the call's metadata
        if @call.metadata.nil?
          @call.metadata = batch_result.status.metadata
        else
          @call.metadata.merge!(batch_result.status.metadata)
        end
      end
      @call.status = batch_result.status
      op_is_done
      batch_result.check_status
    end

    # remote_send sends a request to the remote endpoint.
    #
    # It blocks until the remote endpoint accepts the message.
    #
    # @param req [Object, String] the object to send or its marshalled form.
    # @param marshalled [false, true] indicates if the object is already
    # marshalled.
    def remote_send(req, marshalled = false)
      GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
      payload = marshalled ? req : @marshal.call(req)
      @call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload)
    end

    # send_status sends a status to the remote endpoint.
    #
    # @param code [int] the status code to send
    # @param details [String] details
    # @param assert_finished [true, false] when true(default), waits for
    # FINISHED.
    #
    # == Keyword Arguments ==
    # any keyword arguments are treated as metadata to be sent to the server
    # if a keyword value is a list, multiple metadata for its key are sent
    def send_status(code = OK, details = '', assert_finished = false, **kw)
      ops = {
        SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, kw)
      }
      ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
      @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
      nil
    end

    # remote_read reads a response from the remote endpoint.
    #
    # It blocks until the remote endpoint replies with a message or status.
    # On receiving a message, it returns the response after unmarshalling it.
    # On receiving a status, it returns nil if the status is OK, otherwise
    # raising BadStatus
    def remote_read
      ops = { RECV_MESSAGE => nil }
      # on the first read, also collect the initial metadata for the call
      ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil?
      batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
      unless @metadata_tag.nil?
        @call.metadata = batch_result.metadata
        @metadata_tag = nil
      end
      GRPC.logger.debug("received req: #{batch_result}")
      unless batch_result.nil? || batch_result.message.nil?
        GRPC.logger.debug("received req.to_s: #{batch_result.message}")
        res = @unmarshal.call(batch_result.message)
        GRPC.logger.debug("received_req (unmarshalled): #{res.inspect}")
        return res
      end
      # a nil message signals that the stream of responses has ended
      GRPC.logger.debug('found nil; the final response has been sent')
      nil
    end

    # each_remote_read passes each response to the given block or returns an
    # enumerator of the responses if no block is given.
    #
    # == Enumerator ==
    #
    # * #next blocks until the remote endpoint sends a READ or FINISHED
    # * for each read, enumerator#next yields the response
    # * on status
    #    * if it is OK, enumerator#next raises StopException
    #    * if it is not OK, enumerator#next raises RuntimeException
    #
    # == Block ==
    #
    # * if provided it is executed for each response
    # * the call blocks until no more responses are provided
    #
    # @return [Enumerator] if no block was given
    def each_remote_read
      return enum_for(:each_remote_read) unless block_given?
      loop do
        resp = remote_read
        break if resp.nil?  # the last response was received
        yield resp
      end
    end

    # each_remote_read_then_finish passes each response to the given block or
    # returns an enumerator of the responses if no block is given.
    #
    # It is like each_remote_read, but it blocks on finishing on detecting
    # the final message.
    #
    # == Enumerator ==
    #
    # * #next blocks until the remote endpoint sends a READ or FINISHED
    # * for each read, enumerator#next yields the response
    # * on status
    #    * if it is OK, enumerator#next raises StopException
    #    * if it is not OK, enumerator#next raises RuntimeException
    #
    # == Block ==
    #
    # * if provided it is executed for each response
    # * the call blocks until no more responses are provided
    #
    # @return [Enumerator] if no block was given
    def each_remote_read_then_finish
      return enum_for(:each_remote_read_then_finish) unless block_given?
      loop do
        resp = remote_read
        break if resp.is_a? Struct::Status  # is an OK status
        if resp.nil?  # the last response was received, but not finished yet
          finished
          break
        end
        yield resp
      end
    end

    # request_response sends a request to a GRPC server, and returns the
    # response.
    #
    # == Keyword Arguments ==
    # any keyword arguments are treated as metadata to be sent to the server
    # if a keyword value is a list, multiple metadata for its key are sent
    #
    # @param req [Object] the request sent to the server
    # @return [Object] the response received from the server
    def request_response(req, **kw)
      start_call(**kw) unless @started
      remote_send(req)
      writes_done(false)
      response = remote_read
      finished unless response.is_a? Struct::Status
      response
    rescue GRPC::Core::CallError => e
      finished  # checks for Cancelled
      raise e
    end

    # client_streamer sends a stream of requests to a GRPC server, and
    # returns a single response.
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in typical case it will be an Enumerable
    # that allows dynamic construction of the marshallable objects.
    #
    # == Keyword Arguments ==
    # any keyword arguments are treated as metadata to be sent to the server
    # if a keyword value is a list, multiple metadata for its key are sent
    #
    # @param requests [Object] an Enumerable of requests to send
    # @return [Object] the response received from the server
    def client_streamer(requests, **kw)
      start_call(**kw) unless @started
      requests.each { |r| remote_send(r) }
      writes_done(false)
      response = remote_read
      finished unless response.is_a? Struct::Status
      response
    rescue GRPC::Core::CallError => e
      finished  # checks for Cancelled
      raise e
    end

    # server_streamer sends one request to the GRPC server, which yields a
    # stream of responses.
    #
    # responses provides an enumerator over the streamed responses, i.e. it
    # follows Ruby's #each iteration protocol. The enumerator blocks while
    # waiting for each response, stops when the server signals that no
    # further responses will be supplied. If the implicit block is provided,
    # it is executed with each response as the argument and no result is
    # returned.
    #
    # == Keyword Arguments ==
    # any keyword arguments are treated as metadata to be sent to the server
    # if a keyword value is a list, multiple metadata for its key are sent
    #
    # @param req [Object] the request sent to the server
    # @return [Enumerator|nil] a response Enumerator
    def server_streamer(req, **kw)
      start_call(**kw) unless @started
      remote_send(req)
      writes_done(false)
      replies = enum_for(:each_remote_read_then_finish)
      return replies unless block_given?
      replies.each { |r| yield r }
    rescue GRPC::Core::CallError => e
      finished  # checks for Cancelled
      raise e
    end

    # bidi_streamer sends a stream of requests to the GRPC server, and yields
    # a stream of responses.
    #
    # This method takes an Enumerable of requests, and returns an enumerable
    # of responses.
    #
    # == requests ==
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in typical case it will be an
    # Enumerable that allows dynamic construction of the marshallable
    # objects.
    #
    # == responses ==
    #
    # This is an enumerator of responses. I.e, its #next method blocks
    # waiting for the next response. Also, if at any point the block needs
    # to consume all the remaining responses, this can be done using #each or
    # #collect. Calling #each or #collect should only be done if
    # the_call#writes_done has been called, otherwise the block will loop
    # forever.
    #
    # == Keyword Arguments ==
    # any keyword arguments are treated as metadata to be sent to the server
    # if a keyword value is a list, multiple metadata for its key are sent
    #
    # @param requests [Object] an Enumerable of requests to send
    # @return [Enumerator, nil] a response Enumerator
    def bidi_streamer(requests, **kw, &blk)
      start_call(**kw) unless @started
      bd = BidiCall.new(@call, @cq, @marshal, @unmarshal,
                        metadata_tag: @metadata_tag)
      @metadata_tag = nil  # run_on_client ensures metadata is read
      bd.run_on_client(requests, @op_notifier, &blk)
    end

    # run_server_bidi orchestrates a BiDi stream processing on a server.
    #
    # N.B. gen_each_reply is a func(Enumerable<Requests>)
    #
    # It takes an enumerable of requests as an arg, in case there is a
    # relationship between the stream of requests and the stream of replies.
    #
    # This does not mean that there must necessarily be one. E.g, the replies
    # produced by gen_each_reply could ignore the received_msgs
    #
    # @param gen_each_reply [Proc] generates the BiDi stream replies
    def run_server_bidi(gen_each_reply)
      bd = BidiCall.new(@call, @cq, @marshal, @unmarshal)
      bd.run_on_server(gen_each_reply)
    end

    # Waits till an operation completes
    def wait
      return if @op_notifier.nil?
      GRPC.logger.debug("active_call.wait: on #{@op_notifier}")
      @op_notifier.wait
    end

    # Signals that an operation is done
    def op_is_done
      return if @op_notifier.nil?
      @op_notifier.notify(self)
    end

    private

    # Starts the call if not already started
    def start_call(**kw)
      return if @started
      @metadata_tag = ActiveCall.client_invoke(@call, @cq, **kw)
      @started = true
    end

    # Builds an anonymous class that forwards only the named methods to a
    # wrapped ActiveCall, hiding the rest of its surface.
    # (NOTE: the `private` marker above does not apply to singleton methods
    # like this one.)
    def self.view_class(*visible_methods)
      Class.new do
        extend ::Forwardable
        def_delegators :@wrapped, *visible_methods

        # @param wrapped [ActiveCall] the call whose methods are shielded
        def initialize(wrapped)
          @wrapped = wrapped
        end
      end
    end

    # SingleReqView limits access to an ActiveCall's methods for use in server
    # handlers that receive just one request.
    SingleReqView = view_class(:cancelled, :deadline, :metadata,
                               :output_metadata)

    # MultiReqView limits access to an ActiveCall's methods for use in
    # server client_streamer handlers.
    MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg,
                              :each_remote_read, :metadata, :output_metadata)

    # Operation limits access to an ActiveCall's methods for use as
    # an Operation on the client.
    Operation = view_class(:cancel, :cancelled, :deadline, :execute,
                           :metadata, :status, :start_call, :wait, :write_flag,
                           :write_flag=)
  end
end
OLD | NEW |