| Index: third_party/grpc/src/ruby/lib/grpc/generic/rpc_server.rb
|
| diff --git a/third_party/grpc/src/ruby/lib/grpc/generic/rpc_server.rb b/third_party/grpc/src/ruby/lib/grpc/generic/rpc_server.rb
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..b30d19dd2bbb676ef989b460f500b3eeaad25754
|
| --- /dev/null
|
| +++ b/third_party/grpc/src/ruby/lib/grpc/generic/rpc_server.rb
|
| @@ -0,0 +1,523 @@
|
| +# Copyright 2015-2016, Google Inc.
|
| +# All rights reserved.
|
| +#
|
| +# Redistribution and use in source and binary forms, with or without
|
| +# modification, are permitted provided that the following conditions are
|
| +# met:
|
| +#
|
| +# * Redistributions of source code must retain the above copyright
|
| +# notice, this list of conditions and the following disclaimer.
|
| +# * Redistributions in binary form must reproduce the above
|
| +# copyright notice, this list of conditions and the following disclaimer
|
| +# in the documentation and/or other materials provided with the
|
| +# distribution.
|
| +# * Neither the name of Google Inc. nor the names of its
|
| +# contributors may be used to endorse or promote products derived from
|
| +# this software without specific prior written permission.
|
| +#
|
| +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| +
|
| +require 'grpc/grpc'
|
| +require 'grpc/generic/active_call'
|
| +require 'grpc/generic/service'
|
| +require 'thread'
|
| +
|
| +# A global that contains signals the gRPC servers should respond to.
|
| +$grpc_signals = []
|
| +
|
| +# GRPC contains the General RPC module.
|
| +module GRPC
|
| + # Handles the signals in $grpc_signals.
|
| + #
|
| + # @return false if the server should exit, true if not.
|
| + def handle_signals
|
| + loop do
|
| + sig = $grpc_signals.shift
|
| + case sig
|
| + when 'INT'
|
| + return false
|
| + when 'TERM'
|
| + return false
|
| + when nil
|
| + return true
|
| + end
|
| + end
|
| + true
|
| + end
|
| + module_function :handle_signals
|
| +
|
| + # Sets up a signal handler that adds signals to the signal handling global.
|
| + #
|
| + # Signal handlers should do as little as humanly possible.
|
| + # Here, they just append the received signal to $grpc_signals
|
| + #
|
| + # RpcServer (and later other parts of gRPC) monitors the signals in
|
| + # $grpc_signals from its own non-signal context.
|
| + def trap_signals
|
| + %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } }
|
| + end
|
| + module_function :trap_signals
|
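| +
|
| + # For example (an illustrative sketch that mirrors RpcServer#run_till_terminated
|
| + # further below), a caller polls $grpc_signals from a normal, non-signal context:
|
| + #
|
| + # GRPC.trap_signals
|
| + # loop do
|
| + # sleep 0.25
|
| + # break unless GRPC.handle_signals
|
| + # end
|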
| +
|
| + # Pool is a simple thread pool.
|
| + class Pool
|
| + # Default keep alive period is 1s
|
| + DEFAULT_KEEP_ALIVE = 1
|
| +
|
| + def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
|
| + fail 'pool size must be positive' unless size > 0
|
| + @jobs = Queue.new
|
| + @size = size
|
| + @stopped = false
|
| + @stop_mutex = Mutex.new # needs to be held when accessing @stopped
|
| + @stop_cond = ConditionVariable.new
|
| + @workers = []
|
| + @keep_alive = keep_alive
|
| + end
|
| +
|
| + # Returns the number of jobs waiting
|
| + def jobs_waiting
|
| + @jobs.size
|
| + end
|
| +
|
| + # Enqueues the given block to be run with the provided args.
|
| + #
|
| + # @param args the args passed to blk when it is called
|
| + # @param blk the block to call
|
| + def schedule(*args, &blk)
|
| + return if blk.nil?
|
| + @stop_mutex.synchronize do
|
| + if @stopped
|
| + GRPC.logger.warn('did not schedule job, already stopped')
|
| + return
|
| + end
|
| + GRPC.logger.info('schedule another job')
|
| + @jobs << [blk, args]
|
| + end
|
| + end
|
| +
|
| + # Starts running the jobs in the thread pool.
|
| + def start
|
| + @stop_mutex.synchronize do
|
| + fail 'already stopped' if @stopped
|
| + end
|
| + until @workers.size == @size.to_i
|
| + next_thread = Thread.new do
|
| + catch(:exit) do # allows { throw :exit } to kill a thread
|
| + loop_execute_jobs
|
| + end
|
| + remove_current_thread
|
| + end
|
| + @workers << next_thread
|
| + end
|
| + end
|
| +
|
| + # Stops the jobs in the pool
|
| + def stop
|
| + GRPC.logger.info('stopping, will wait for all the workers to exit')
|
| + @workers.size.times { schedule { throw :exit } }
|
| + @stop_mutex.synchronize do # wait @keep_alive for workers to stop
|
| + @stopped = true
|
| + @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
|
| + end
|
| + forcibly_stop_workers
|
| + GRPC.logger.info('stopped, all workers are shutdown')
|
| + end
|
| +
|
| + protected
|
| +
|
| + # Forcibly shut down any threads that are still alive.
|
| + def forcibly_stop_workers
|
| + return unless @workers.size > 0
|
| + GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
|
| + @workers.each do |t|
|
| + next unless t.alive?
|
| + begin
|
| + t.exit
|
| + rescue StandardError => e
|
| + GRPC.logger.warn('error while terminating a worker')
|
| + GRPC.logger.warn(e)
|
| + end
|
| + end
|
| + end
|
| +
|
| + # removes the current thread from @workers, and signals when all the
|
| + # threads are complete.
|
| + def remove_current_thread
|
| + @stop_mutex.synchronize do
|
| + @workers.delete(Thread.current)
|
| + @stop_cond.signal if @workers.size.zero?
|
| + end
|
| + end
|
| +
|
| + def loop_execute_jobs
|
| + loop do
|
| + begin
|
| + blk, args = @jobs.pop
|
| + blk.call(*args)
|
| + rescue StandardError => e
|
| + GRPC.logger.warn('Error in worker thread')
|
| + GRPC.logger.warn(e)
|
| + end
|
| + end
|
| + end
|
| + end
|
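| +
|
| + # An illustrative sketch of how Pool is used (Pool is internal to this
|
| + # file; the pool size and the job below are arbitrary examples):
|
| + #
|
| + # pool = Pool.new(2)
|
| + # pool.start
|
| + # pool.schedule('ping') { |msg| GRPC.logger.info("job got #{msg}") }
|
| + # pool.stop
|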
| +
|
| + # RpcServer hosts a number of services and makes them available on the
|
| + # network.
|
| + class RpcServer
|
| + include Core::CallOps
|
| + include Core::TimeConsts
|
| + extend ::Forwardable
|
| +
|
| + def_delegators :@server, :add_http2_port
|
| +
|
| + # Default thread pool size is 3
|
| + DEFAULT_POOL_SIZE = 3
|
| +
|
| + # Default max_waiting_requests size is 20
|
| + DEFAULT_MAX_WAITING_REQUESTS = 20
|
| +
|
| + # Default poll period is 1s
|
| + DEFAULT_POLL_PERIOD = 1
|
| +
|
| + # Signal check period is 0.25s
|
| + SIGNAL_CHECK_PERIOD = 0.25
|
| +
|
| + # setup_cq is used by #initialize to construct a Core::CompletionQueue from
|
| + # its arguments.
|
| + def self.setup_cq(alt_cq)
|
| + return Core::CompletionQueue.new if alt_cq.nil?
|
| + unless alt_cq.is_a? Core::CompletionQueue
|
| + fail(TypeError, '!CompletionQueue')
|
| + end
|
| + alt_cq
|
| + end
|
| +
|
| + # setup_srv is used by #initialize to construct a Core::Server from its
|
| + # arguments.
|
| + def self.setup_srv(alt_srv, cq, **kw)
|
| + return Core::Server.new(cq, kw) if alt_srv.nil?
|
| + fail(TypeError, '!Server') unless alt_srv.is_a? Core::Server
|
| + alt_srv
|
| + end
|
| +
|
| + # setup_connect_md_proc is used by #initialize to validate the
|
| + # connect_md_proc.
|
| + def self.setup_connect_md_proc(a_proc)
|
| + return nil if a_proc.nil?
|
| + fail(TypeError, '!Proc') unless a_proc.is_a? Proc
|
| + a_proc
|
| + end
|
| +
|
| + # Creates a new RpcServer.
|
| + #
|
| + # The RPC server is configured using keyword arguments.
|
| + #
|
| + # There are some specific keyword args used to configure the RpcServer
|
| + # instance; however, other arbitrary keyword args are allowed and, when
|
| + # present, are used to configure the listening connection set up by the
|
| + # RpcServer.
|
| + #
|
| + # * server_override: which if passed must be a [GRPC::Core::Server]. When
|
| + # present, it is used as the server instead of constructing a new one.
|
| + #
|
| + # * poll_period: when present, the server polls for new events with this
|
| + # period
|
| + #
|
| + # * pool_size: the size of the thread pool the server uses to run its
|
| + # threads
|
| + #
|
| + # * completion_queue_override: when supplied, this will be used as the
|
| + # completion_queue that the server uses to receive network events,
|
| + # otherwise it creates a new instance itself
|
| + #
|
| + # * creds: [GRPC::Core::ServerCredentials]
|
| + # the credentials used to secure the server
|
| + #
|
| + # * max_waiting_requests: the maximum number of requests that are not
|
| + # being handled to allow. When this limit is exceeded, the server responds
|
| + # to new requests with UNAVAILABLE
|
| + #
|
| + # * connect_md_proc:
|
| + # when non-nil, is a proc for determining metadata to send back to the client
|
| + # on receiving an invocation req. The proc signature is:
|
| + # {key: val, ..} func(method_name, {key: val, ...})
|
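| + #
|
| + # A construction sketch (hedged example: EchoService, the address, and the
|
| + # insecure-port form of #add_http2_port are placeholders; the exact
|
| + # add_http2_port arguments depend on the gRPC release in use):
|
| + #
|
| + # md_proc = proc do |_method, _md|
|
| + # { 'x-served-by' => 'example' }
|
| + # end
|
| + # srv = GRPC::RpcServer.new(pool_size: 5, connect_md_proc: md_proc)
|
| + # srv.add_http2_port('0.0.0.0:50051', :this_port_is_insecure)
|
| + # srv.handle(EchoService)
|
| + # srv.run_till_terminated
|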
| + def initialize(pool_size:DEFAULT_POOL_SIZE,
|
| + max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
|
| + poll_period:DEFAULT_POLL_PERIOD,
|
| + completion_queue_override:nil,
|
| + server_override:nil,
|
| + connect_md_proc:nil,
|
| + **kw)
|
| + @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
|
| + @cq = RpcServer.setup_cq(completion_queue_override)
|
| + @max_waiting_requests = max_waiting_requests
|
| + @poll_period = poll_period
|
| + @pool_size = pool_size
|
| + @pool = Pool.new(@pool_size)
|
| + @run_cond = ConditionVariable.new
|
| + @run_mutex = Mutex.new
|
| + # running_state can take 4 values: :not_started, :running, :stopping, and
|
| + # :stopped. State transitions can only proceed in that order.
|
| + @running_state = :not_started
|
| + @server = RpcServer.setup_srv(server_override, @cq, **kw)
|
| + end
|
| +
|
| + # stops a running server
|
| + #
|
| + # the call has no impact if the server is already stopped, otherwise
|
| + # the server's current call loop is its last.
|
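| + #
|
| + # For example, to shut a running server down from another thread after a
|
| + # fixed interval (illustrative only; srv is an RpcServer running elsewhere):
|
| + #
|
| + # Thread.new { sleep 60; srv.stop }
|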
| + def stop
|
| + @run_mutex.synchronize do
|
| + fail 'Cannot stop before starting' if @running_state == :not_started
|
| + return if @running_state != :running
|
| + transition_running_state(:stopping)
|
| + end
|
| + deadline = from_relative_time(@poll_period)
|
| + @server.close(@cq, deadline)
|
| + @pool.stop
|
| + end
|
| +
|
| + def running_state
|
| + @run_mutex.synchronize do
|
| + return @running_state
|
| + end
|
| + end
|
| +
|
| + # Can only be called while holding @run_mutex
|
| + def transition_running_state(target_state)
|
| + state_transitions = {
|
| + not_started: :running,
|
| + running: :stopping,
|
| + stopping: :stopped
|
| + }
|
| + if state_transitions[@running_state] == target_state
|
| + @running_state = target_state
|
| + else
|
| + fail "Bad server state transition: #{@running_state}->#{target_state}"
|
| + end
|
| + end
|
| +
|
| + def running?
|
| + running_state == :running
|
| + end
|
| +
|
| + def stopped?
|
| + running_state == :stopped
|
| + end
|
| +
|
| + # Is called from other threads to wait for #run to start up the server.
|
| + #
|
| + # If run has not been called, this blocks until #run starts the server or
|
| + # the timeout, if given, expires.
|
| + #
|
| + # @param timeout [Numeric] number of seconds to wait
|
| + # @return [true, false] true if the server is running, false otherwise
|
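| + #
|
| + # Usage sketch (assumes srv is an already-configured RpcServer, e.g. as in
|
| + # the constructor example above; stop the server and join t later, as
|
| + # #run_till_terminated does):
|
| + #
|
| + # t = Thread.new { srv.run }
|
| + # srv.wait_till_running(10) # true if the server started within 10s
|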
| + def wait_till_running(timeout = nil)
|
| + @run_mutex.synchronize do
|
| + @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
|
| + return @running_state == :running
|
| + end
|
| + end
|
| +
|
| + # Runs the server in its own thread, then waits for signal INT or TERM on
|
| + # the current thread to terminate it.
|
| + def run_till_terminated
|
| + GRPC.trap_signals
|
| + t = Thread.new { run }
|
| + wait_till_running
|
| + loop do
|
| + sleep SIGNAL_CHECK_PERIOD
|
| + break unless GRPC.handle_signals
|
| + end
|
| + stop
|
| + t.join
|
| + end
|
| +
|
| + # handle registration of classes
|
| + #
|
| + # service is either a class that includes GRPC::GenericService and whose
|
| + # #new method can be called without arguments, or any instance of such a
|
| + # class.
|
| + #
|
| + # E.g., after
|
| + #
|
| + # class Divider
|
| + # include GRPC::GenericService
|
| + # rpc :div, DivArgs, DivReply # single request, single response
|
| + # def initialize(optional_arg='default option') # no args
|
| + # ...
|
| + # end
|
| + # end
|
| + #
|
| + # srv = GRPC::RpcServer.new(...)
|
| + #
|
| + # # Either of these works
|
| + #
|
| + # srv.handle(Divider)
|
| + #
|
| + # # or
|
| + #
|
| + # srv.handle(Divider.new('replace optional arg'))
|
| + #
|
| + # It raises RuntimeError:
|
| + # - if service is not a valid service class or object
|
| + # - if its handler methods are already registered
|
| + # - if the server is already running
|
| + #
|
| + # @param service [Object|Class] a service class or object as described
|
| + # above
|
| + def handle(service)
|
| + @run_mutex.synchronize do
|
| + unless @running_state == :not_started
|
| + fail 'cannot add services if the server has been started'
|
| + end
|
| + cls = service.is_a?(Class) ? service : service.class
|
| + assert_valid_service_class(cls)
|
| + add_rpc_descs_for(service)
|
| + end
|
| + end
|
| +
|
| + # runs the server
|
| + #
|
| + # - if no rpc_descs are registered, this fails immediately, otherwise it
|
| + # continues running permanently and does not return until program exit.
|
| + #
|
| + # - #running? returns true after this is called, until #stop causes
|
| + # the server to stop.
|
| + def run
|
| + @run_mutex.synchronize do
|
| + fail 'cannot run without registering services' if rpc_descs.size.zero?
|
| + @pool.start
|
| + @server.start
|
| + transition_running_state(:running)
|
| + @run_cond.broadcast
|
| + end
|
| + loop_handle_server_calls
|
| + end
|
| +
|
| + # Sends UNAVAILABLE if there are too many unprocessed jobs
|
| + def available?(an_rpc)
|
| + jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
|
| + GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
|
| + return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
|
| + GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
|
| + noop = proc { |x| x }
|
| + c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
|
| + c.send_status(StatusCodes::UNAVAILABLE, '')
|
| + nil
|
| + end
|
| +
|
| + # Sends UNIMPLEMENTED if the method is not implemented by this server
|
| + def implemented?(an_rpc)
|
| + mth = an_rpc.method.to_sym
|
| + return an_rpc if rpc_descs.key?(mth)
|
| + GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
|
| + noop = proc { |x| x }
|
| + c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
|
| + c.send_status(StatusCodes::UNIMPLEMENTED, '')
|
| + nil
|
| + end
|
| +
|
| + # handles calls to the server
|
| + def loop_handle_server_calls
|
| + fail 'not started' if running_state == :not_started
|
| + loop_tag = Object.new
|
| + while running_state == :running
|
| + begin
|
| + an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE)
|
| + break if (!an_rpc.nil?) && an_rpc.call.nil?
|
| +
|
| + active_call = new_active_server_call(an_rpc)
|
| + unless active_call.nil?
|
| + @pool.schedule(active_call) do |ac|
|
| + c, mth = ac
|
| + rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
|
| + end
|
| + end
|
| + rescue Core::CallError, RuntimeError => e
|
| + # these might happen for various reasons. The correct behaviour of
|
| + # the server is to log them and continue, if it's not shutting down.
|
| + if running_state == :running
|
| + GRPC.logger.warn("server call failed: #{e}")
|
| + end
|
| + next
|
| + end
|
| + end
|
| + # @running_state should be :stopping here
|
| + @run_mutex.synchronize { transition_running_state(:stopped) }
|
| + GRPC.logger.info("stopped: #{self}")
|
| + end
|
| +
|
| + def new_active_server_call(an_rpc)
|
| + return nil if an_rpc.nil? || an_rpc.call.nil?
|
| +
|
| + # allow the metadata to be accessed from the call
|
| + handle_call_tag = Object.new
|
| + an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
|
| + GRPC.logger.debug("call md is #{an_rpc.metadata}")
|
| + connect_md = nil
|
| + unless @connect_md_proc.nil?
|
| + connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
|
| + end
|
| + an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE,
|
| + SEND_INITIAL_METADATA => connect_md)
|
| + return nil unless available?(an_rpc)
|
| + return nil unless implemented?(an_rpc)
|
| +
|
| + # Create the ActiveCall
|
| + GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
|
| + rpc_desc = rpc_descs[an_rpc.method.to_sym]
|
| + c = ActiveCall.new(an_rpc.call, @cq,
|
| + rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
|
| + an_rpc.deadline)
|
| + mth = an_rpc.method.to_sym
|
| + [c, mth]
|
| + end
|
| +
|
| + protected
|
| +
|
| + def rpc_descs
|
| + @rpc_descs ||= {}
|
| + end
|
| +
|
| + def rpc_handlers
|
| + @rpc_handlers ||= {}
|
| + end
|
| +
|
| + def assert_valid_service_class(cls)
|
| + unless cls.include?(GenericService)
|
| + fail "#{cls} must 'include GenericService'"
|
| + end
|
| + if cls.rpc_descs.size.zero?
|
| + fail "#{cls} should specify some rpc descriptions"
|
| + end
|
| + cls.assert_rpc_descs_have_methods
|
| + end
|
| +
|
| + # This should be called while holding @run_mutex
|
| + def add_rpc_descs_for(service)
|
| + cls = service.is_a?(Class) ? service : service.class
|
| + specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
|
| + cls.rpc_descs.each_pair do |name, spec|
|
| + route = "/#{cls.service_name}/#{name}".to_sym
|
| + fail "already registered: rpc #{route} from #{spec}" if specs.key? route
|
| + specs[route] = spec
|
| + rpc_name = GenericService.underscore(name.to_s).to_sym
|
| + if service.is_a?(Class)
|
| + handlers[route] = cls.new.method(rpc_name)
|
| + else
|
| + handlers[route] = service.method(rpc_name)
|
| + end
|
| + GRPC.logger.info("handling #{route} with #{handlers[route]}")
|
| + end
|
| + end
|
| + end
|
| +end
|