Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6)

Unified Diff: third_party/grpc/src/ruby/lib/grpc/generic/active_call.rb

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/grpc/src/ruby/lib/grpc/generic/active_call.rb
diff --git a/third_party/grpc/src/ruby/lib/grpc/generic/active_call.rb b/third_party/grpc/src/ruby/lib/grpc/generic/active_call.rb
new file mode 100644
index 0000000000000000000000000000000000000000..e80d24edc9b487993c368ee693d61b8685d91c64
--- /dev/null
+++ b/third_party/grpc/src/ruby/lib/grpc/generic/active_call.rb
@@ -0,0 +1,488 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+require 'forwardable'
+require 'grpc/generic/bidi_call'
+
+class Struct
+ # BatchResult is the struct returned by calls to call#start_batch.
+ class BatchResult
+ # check_status returns the status, raising an error if the status
+ # is non-nil and not OK.
+ def check_status
+ return nil if status.nil?
+ fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
+ if status.code != GRPC::Core::StatusCodes::OK
+ GRPC.logger.debug("Failing with status #{status}")
+ # raise BadStatus, propagating the metadata if present.
+ md = status.metadata
+ with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }]
+ fail GRPC::BadStatus.new(status.code, status.details, **with_sym_keys)
+ end
+ status
+ end
+ end
+end
+
+# GRPC contains the General RPC module.
+module GRPC
+ # The ActiveCall class provides simple methods for sending marshallable
+ # data to a call
+ class ActiveCall
+ include Core::TimeConsts
+ include Core::CallOps
+ extend Forwardable
+ attr_reader(:deadline)
+ def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=
+
+ # client_invoke begins a client invocation.
+ #
+ # Flow Control note: this blocks until flow control accepts that client
+ # request can go ahead.
+ #
+ # deadline is the absolute deadline for the call.
+ #
+ # == Keyword Arguments ==
+ # any keyword arguments are treated as metadata to be sent to the server
+ # if a keyword value is a list, multiple metadata for it's key are sent
+ #
+ # @param call [Call] a call on which to start and invocation
+ # @param q [CompletionQueue] the completion queue
+ def self.client_invoke(call, q, **kw)
+ fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
+ unless q.is_a? Core::CompletionQueue
+ fail(TypeError, '!Core::CompletionQueue')
+ end
+ metadata_tag = Object.new
+ call.run_batch(q, metadata_tag, INFINITE_FUTURE,
+ SEND_INITIAL_METADATA => kw)
+ metadata_tag
+ end
+
+ # Creates an ActiveCall.
+ #
+ # ActiveCall should only be created after a call is accepted. That
+ # means different things on a client and a server. On the client, the
+ # call is accepted after calling call.invoke. On the server, this is
+ # after call.accept.
+ #
+ # #initialize cannot determine if the call is accepted or not; so if a
+ # call that's not accepted is used here, the error won't be visible until
+ # the ActiveCall methods are called.
+ #
+ # deadline is the absolute deadline for the call.
+ #
+ # @param call [Call] the call used by the ActiveCall
+ # @param q [CompletionQueue] the completion queue used to accept
+ # the call
+ # @param marshal [Function] f(obj)->string that marshal requests
+ # @param unmarshal [Function] f(string)->obj that unmarshals responses
+ # @param deadline [Fixnum] the deadline for the call to complete
+ # @param metadata_tag [Object] the object use obtain metadata for clients
+ # @param started [true|false] indicates if the call has begun
+ def initialize(call, q, marshal, unmarshal, deadline, started: true,
+ metadata_tag: nil)
+ fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
+ unless q.is_a? Core::CompletionQueue
+ fail(TypeError, '!Core::CompletionQueue')
+ end
+ @call = call
+ @cq = q
+ @deadline = deadline
+ @marshal = marshal
+ @started = started
+ @unmarshal = unmarshal
+ @metadata_tag = metadata_tag
+ @op_notifier = nil
+ end
+
+ # output_metadata are provides access to hash that can be used to
+ # save metadata to be sent as trailer
+ def output_metadata
+ @output_metadata ||= {}
+ end
+
+ # cancelled indicates if the call was cancelled
+ def cancelled
+ !@call.status.nil? && @call.status.code == Core::StatusCodes::CANCELLED
+ end
+
+ # multi_req_view provides a restricted view of this ActiveCall for use
+ # in a server client-streaming handler.
+ def multi_req_view
+ MultiReqView.new(self)
+ end
+
+ # single_req_view provides a restricted view of this ActiveCall for use in
+ # a server request-response handler.
+ def single_req_view
+ SingleReqView.new(self)
+ end
+
+ # operation provides a restricted view of this ActiveCall for use as
+ # a Operation.
+ def operation
+ @op_notifier = Notifier.new
+ Operation.new(self)
+ end
+
+ # writes_done indicates that all writes are completed.
+ #
+ # It blocks until the remote endpoint acknowledges with at status unless
+ # assert_finished is set to false. Any calls to #remote_send after this
+ # call will fail.
+ #
+ # @param assert_finished [true, false] when true(default), waits for
+ # FINISHED.
+ def writes_done(assert_finished = true)
+ ops = {
+ SEND_CLOSE_FROM_CLIENT => nil
+ }
+ ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
+ batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+ return unless assert_finished
+ @call.status = batch_result.status
+ op_is_done
+ batch_result.check_status
+ end
+
+ # finished waits until a client call is completed.
+ #
+ # It blocks until the remote endpoint acknowledges by sending a status.
+ def finished
+ batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE,
+ RECV_STATUS_ON_CLIENT => nil)
+ unless batch_result.status.nil?
+ if @call.metadata.nil?
+ @call.metadata = batch_result.status.metadata
+ else
+ @call.metadata.merge!(batch_result.status.metadata)
+ end
+ end
+ @call.status = batch_result.status
+ op_is_done
+ batch_result.check_status
+ end
+
+ # remote_send sends a request to the remote endpoint.
+ #
+ # It blocks until the remote endpoint accepts the message.
+ #
+ # @param req [Object, String] the object to send or it's marshal form.
+ # @param marshalled [false, true] indicates if the object is already
+ # marshalled.
+ def remote_send(req, marshalled = false)
+ GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
+ payload = marshalled ? req : @marshal.call(req)
+ @call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload)
+ end
+
+ # send_status sends a status to the remote endpoint.
+ #
+ # @param code [int] the status code to send
+ # @param details [String] details
+ # @param assert_finished [true, false] when true(default), waits for
+ # FINISHED.
+ #
+ # == Keyword Arguments ==
+ # any keyword arguments are treated as metadata to be sent to the server
+ # if a keyword value is a list, multiple metadata for it's key are sent
+ def send_status(code = OK, details = '', assert_finished = false, **kw)
+ ops = {
+ SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, kw)
+ }
+ ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
+ @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+ nil
+ end
+
+ # remote_read reads a response from the remote endpoint.
+ #
+ # It blocks until the remote endpoint replies with a message or status.
+ # On receiving a message, it returns the response after unmarshalling it.
+ # On receiving a status, it returns nil if the status is OK, otherwise
+ # raising BadStatus
+ def remote_read
+ ops = { RECV_MESSAGE => nil }
+ ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil?
+ batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+ unless @metadata_tag.nil?
+ @call.metadata = batch_result.metadata
+ @metadata_tag = nil
+ end
+ GRPC.logger.debug("received req: #{batch_result}")
+ unless batch_result.nil? || batch_result.message.nil?
+ GRPC.logger.debug("received req.to_s: #{batch_result.message}")
+ res = @unmarshal.call(batch_result.message)
+ GRPC.logger.debug("received_req (unmarshalled): #{res.inspect}")
+ return res
+ end
+ GRPC.logger.debug('found nil; the final response has been sent')
+ nil
+ end
+
+ # each_remote_read passes each response to the given block or returns an
+ # enumerator the responses if no block is given.
+ #
+ # == Enumerator ==
+ #
+ # * #next blocks until the remote endpoint sends a READ or FINISHED
+ # * for each read, enumerator#next yields the response
+ # * on status
+ # * if it's is OK, enumerator#next raises StopException
+ # * if is not OK, enumerator#next raises RuntimeException
+ #
+ # == Block ==
+ #
+ # * if provided it is executed for each response
+ # * the call blocks until no more responses are provided
+ #
+ # @return [Enumerator] if no block was given
+ def each_remote_read
+ return enum_for(:each_remote_read) unless block_given?
+ loop do
+ resp = remote_read
+ break if resp.nil? # the last response was received
+ yield resp
+ end
+ end
+
+ # each_remote_read_then_finish passes each response to the given block or
+ # returns an enumerator of the responses if no block is given.
+ #
+ # It is like each_remote_read, but it blocks on finishing on detecting
+ # the final message.
+ #
+ # == Enumerator ==
+ #
+ # * #next blocks until the remote endpoint sends a READ or FINISHED
+ # * for each read, enumerator#next yields the response
+ # * on status
+ # * if it's is OK, enumerator#next raises StopException
+ # * if is not OK, enumerator#next raises RuntimeException
+ #
+ # == Block ==
+ #
+ # * if provided it is executed for each response
+ # * the call blocks until no more responses are provided
+ #
+ # @return [Enumerator] if no block was given
+ def each_remote_read_then_finish
+ return enum_for(:each_remote_read_then_finish) unless block_given?
+ loop do
+ resp = remote_read
+ break if resp.is_a? Struct::Status # is an OK status
+ if resp.nil? # the last response was received, but not finished yet
+ finished
+ break
+ end
+ yield resp
+ end
+ end
+
+ # request_response sends a request to a GRPC server, and returns the
+ # response.
+ #
+ # == Keyword Arguments ==
+ # any keyword arguments are treated as metadata to be sent to the server
+ # if a keyword value is a list, multiple metadata for it's key are sent
+ #
+ # @param req [Object] the request sent to the server
+ # @return [Object] the response received from the server
+ def request_response(req, **kw)
+ start_call(**kw) unless @started
+ remote_send(req)
+ writes_done(false)
+ response = remote_read
+ finished unless response.is_a? Struct::Status
+ response
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
+ end
+
+ # client_streamer sends a stream of requests to a GRPC server, and
+ # returns a single response.
+ #
+ # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
+ # #each enumeration protocol. In the simplest case, requests will be an
+ # array of marshallable objects; in typical case it will be an Enumerable
+ # that allows dynamic construction of the marshallable objects.
+ #
+ # == Keyword Arguments ==
+ # any keyword arguments are treated as metadata to be sent to the server
+ # if a keyword value is a list, multiple metadata for it's key are sent
+ #
+ # @param requests [Object] an Enumerable of requests to send
+ # @return [Object] the response received from the server
+ def client_streamer(requests, **kw)
+ start_call(**kw) unless @started
+ requests.each { |r| remote_send(r) }
+ writes_done(false)
+ response = remote_read
+ finished unless response.is_a? Struct::Status
+ response
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
+ end
+
+ # server_streamer sends one request to the GRPC server, which yields a
+ # stream of responses.
+ #
+ # responses provides an enumerator over the streamed responses, i.e. it
+ # follows Ruby's #each iteration protocol. The enumerator blocks while
+ # waiting for each response, stops when the server signals that no
+ # further responses will be supplied. If the implicit block is provided,
+ # it is executed with each response as the argument and no result is
+ # returned.
+ #
+ # == Keyword Arguments ==
+ # any keyword arguments are treated as metadata to be sent to the server
+ # if a keyword value is a list, multiple metadata for it's key are sent
+ # any keyword arguments are treated as metadata to be sent to the server.
+ #
+ # @param req [Object] the request sent to the server
+ # @return [Enumerator|nil] a response Enumerator
+ def server_streamer(req, **kw)
+ start_call(**kw) unless @started
+ remote_send(req)
+ writes_done(false)
+ replies = enum_for(:each_remote_read_then_finish)
+ return replies unless block_given?
+ replies.each { |r| yield r }
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
+ end
+
+ # bidi_streamer sends a stream of requests to the GRPC server, and yields
+ # a stream of responses.
+ #
+ # This method takes an Enumerable of requests, and returns and enumerable
+ # of responses.
+ #
+ # == requests ==
+ #
+ # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
+ # #each enumeration protocol. In the simplest case, requests will be an
+ # array of marshallable objects; in typical case it will be an
+ # Enumerable that allows dynamic construction of the marshallable
+ # objects.
+ #
+ # == responses ==
+ #
+ # This is an enumerator of responses. I.e, its #next method blocks
+ # waiting for the next response. Also, if at any point the block needs
+ # to consume all the remaining responses, this can be done using #each or
+ # #collect. Calling #each or #collect should only be done if
+ # the_call#writes_done has been called, otherwise the block will loop
+ # forever.
+ #
+ # == Keyword Arguments ==
+ # any keyword arguments are treated as metadata to be sent to the server
+ # if a keyword value is a list, multiple metadata for it's key are sent
+ #
+ # @param requests [Object] an Enumerable of requests to send
+ # @return [Enumerator, nil] a response Enumerator
+ def bidi_streamer(requests, **kw, &blk)
+ start_call(**kw) unless @started
+ bd = BidiCall.new(@call, @cq, @marshal, @unmarshal,
+ metadata_tag: @metadata_tag)
+ @metadata_tag = nil # run_on_client ensures metadata is read
+ bd.run_on_client(requests, @op_notifier, &blk)
+ end
+
+ # run_server_bidi orchestrates a BiDi stream processing on a server.
+ #
+ # N.B. gen_each_reply is a func(Enumerable<Requests>)
+ #
+ # It takes an enumerable of requests as an arg, in case there is a
+ # relationship between the stream of requests and the stream of replies.
+ #
+ # This does not mean that must necessarily be one. E.g, the replies
+ # produced by gen_each_reply could ignore the received_msgs
+ #
+ # @param gen_each_reply [Proc] generates the BiDi stream replies
+ def run_server_bidi(gen_each_reply)
+ bd = BidiCall.new(@call, @cq, @marshal, @unmarshal)
+ bd.run_on_server(gen_each_reply)
+ end
+
+ # Waits till an operation completes
+ def wait
+ return if @op_notifier.nil?
+ GRPC.logger.debug("active_call.wait: on #{@op_notifier}")
+ @op_notifier.wait
+ end
+
+ # Signals that an operation is done
+ def op_is_done
+ return if @op_notifier.nil?
+ @op_notifier.notify(self)
+ end
+
+ private
+
+ # Starts the call if not already started
+ def start_call(**kw)
+ return if @started
+ @metadata_tag = ActiveCall.client_invoke(@call, @cq, **kw)
+ @started = true
+ end
+
+ def self.view_class(*visible_methods)
+ Class.new do
+ extend ::Forwardable
+ def_delegators :@wrapped, *visible_methods
+
+ # @param wrapped [ActiveCall] the call whose methods are shielded
+ def initialize(wrapped)
+ @wrapped = wrapped
+ end
+ end
+ end
+
+ # SingleReqView limits access to an ActiveCall's methods for use in server
+ # handlers that receive just one request.
+ SingleReqView = view_class(:cancelled, :deadline, :metadata,
+ :output_metadata)
+
+ # MultiReqView limits access to an ActiveCall's methods for use in
+ # server client_streamer handlers.
+ MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg,
+ :each_remote_read, :metadata, :output_metadata)
+
+ # Operation limits access to an ActiveCall's methods for use as
+ # a Operation on the client.
+ Operation = view_class(:cancel, :cancelled, :deadline, :execute,
+ :metadata, :status, :start_call, :wait, :write_flag,
+ :write_flag=)
+ end
+end
« no previous file with comments | « third_party/grpc/src/ruby/lib/grpc/errors.rb ('k') | third_party/grpc/src/ruby/lib/grpc/generic/bidi_call.rb » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698