Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(10)

Unified Diff: third_party/grpc/src/ruby/lib/grpc/generic/rpc_server.rb

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/grpc/src/ruby/lib/grpc/generic/rpc_server.rb
diff --git a/third_party/grpc/src/ruby/lib/grpc/generic/rpc_server.rb b/third_party/grpc/src/ruby/lib/grpc/generic/rpc_server.rb
new file mode 100644
index 0000000000000000000000000000000000000000..b30d19dd2bbb676ef989b460f500b3eeaad25754
--- /dev/null
+++ b/third_party/grpc/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -0,0 +1,523 @@
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+require 'grpc/grpc'
+require 'grpc/generic/active_call'
+require 'grpc/generic/service'
+require 'thread'
+
+# A global that contains signals the gRPC servers should respond to.
+$grpc_signals = []
+
+# GRPC contains the General RPC module.
+module GRPC
+ # Handles the signals in $grpc_signals.
+ #
+ # @return false if the server should exit, true if not.
+ def handle_signals
+ loop do
+ sig = $grpc_signals.shift
+ case sig
+ when 'INT'
+ return false
+ when 'TERM'
+ return false
+ when nil
+ return true
+ end
+ end
+ true
+ end
+ module_function :handle_signals
+
+ # Sets up a signal handler that adds signals to the signal handling global.
+ #
+ # Signal handlers should do as little as humanly possible.
+ # Here, they just add themselves to $grpc_signals
+ #
+ # RpcServer (and later other parts of gRPC) monitors the signals
+ # $grpc_signals in its own non-signal context.
+ def trap_signals
+ %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } }
+ end
+ module_function :trap_signals
+
+ # Pool is a simple thread pool.
+ class Pool
+ # Default keep alive period is 1s
+ DEFAULT_KEEP_ALIVE = 1
+
+ def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
+ fail 'pool size must be positive' unless size > 0
+ @jobs = Queue.new
+ @size = size
+ @stopped = false
+ @stop_mutex = Mutex.new # needs to be held when accessing @stopped
+ @stop_cond = ConditionVariable.new
+ @workers = []
+ @keep_alive = keep_alive
+ end
+
+ # Returns the number of jobs waiting
+ def jobs_waiting
+ @jobs.size
+ end
+
+ # Runs the given block on the queue with the provided args.
+ #
+ # @param args the args passed blk when it is called
+ # @param blk the block to call
+ def schedule(*args, &blk)
+ return if blk.nil?
+ @stop_mutex.synchronize do
+ if @stopped
+ GRPC.logger.warn('did not schedule job, already stopped')
+ return
+ end
+ GRPC.logger.info('schedule another job')
+ @jobs << [blk, args]
+ end
+ end
+
+ # Starts running the jobs in the thread pool.
+ def start
+ @stop_mutex.synchronize do
+ fail 'already stopped' if @stopped
+ end
+ until @workers.size == @size.to_i
+ next_thread = Thread.new do
+ catch(:exit) do # allows { throw :exit } to kill a thread
+ loop_execute_jobs
+ end
+ remove_current_thread
+ end
+ @workers << next_thread
+ end
+ end
+
+ # Stops the jobs in the pool
+ def stop
+ GRPC.logger.info('stopping, will wait for all the workers to exit')
+ @workers.size.times { schedule { throw :exit } }
+ @stop_mutex.synchronize do # wait @keep_alive for works to stop
+ @stopped = true
+ @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
+ end
+ forcibly_stop_workers
+ GRPC.logger.info('stopped, all workers are shutdown')
+ end
+
+ protected
+
+ # Forcibly shutdown any threads that are still alive.
+ def forcibly_stop_workers
+ return unless @workers.size > 0
+ GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
+ @workers.each do |t|
+ next unless t.alive?
+ begin
+ t.exit
+ rescue StandardError => e
+ GRPC.logger.warn('error while terminating a worker')
+ GRPC.logger.warn(e)
+ end
+ end
+ end
+
+ # removes the threads from workers, and signal when all the
+ # threads are complete.
+ def remove_current_thread
+ @stop_mutex.synchronize do
+ @workers.delete(Thread.current)
+ @stop_cond.signal if @workers.size.zero?
+ end
+ end
+
+ def loop_execute_jobs
+ loop do
+ begin
+ blk, args = @jobs.pop
+ blk.call(*args)
+ rescue StandardError => e
+ GRPC.logger.warn('Error in worker thread')
+ GRPC.logger.warn(e)
+ end
+ end
+ end
+ end
+
+ # RpcServer hosts a number of services and makes them available on the
+ # network.
+ class RpcServer
+ include Core::CallOps
+ include Core::TimeConsts
+ extend ::Forwardable
+
+ def_delegators :@server, :add_http2_port
+
+ # Default thread pool size is 3
+ DEFAULT_POOL_SIZE = 3
+
+ # Default max_waiting_requests size is 20
+ DEFAULT_MAX_WAITING_REQUESTS = 20
+
+ # Default poll period is 1s
+ DEFAULT_POLL_PERIOD = 1
+
+ # Signal check period is 0.25s
+ SIGNAL_CHECK_PERIOD = 0.25
+
+ # setup_cq is used by #initialize to constuct a Core::CompletionQueue from
+ # its arguments.
+ def self.setup_cq(alt_cq)
+ return Core::CompletionQueue.new if alt_cq.nil?
+ unless alt_cq.is_a? Core::CompletionQueue
+ fail(TypeError, '!CompletionQueue')
+ end
+ alt_cq
+ end
+
+ # setup_srv is used by #initialize to constuct a Core::Server from its
+ # arguments.
+ def self.setup_srv(alt_srv, cq, **kw)
+ return Core::Server.new(cq, kw) if alt_srv.nil?
+ fail(TypeError, '!Server') unless alt_srv.is_a? Core::Server
+ alt_srv
+ end
+
+ # setup_connect_md_proc is used by #initialize to validate the
+ # connect_md_proc.
+ def self.setup_connect_md_proc(a_proc)
+ return nil if a_proc.nil?
+ fail(TypeError, '!Proc') unless a_proc.is_a? Proc
+ a_proc
+ end
+
+ # Creates a new RpcServer.
+ #
+ # The RPC server is configured using keyword arguments.
+ #
+ # There are some specific keyword args used to configure the RpcServer
+ # instance, however other arbitrary are allowed and when present are used
+ # to configure the listeninng connection set up by the RpcServer.
+ #
+ # * server_override: which if passed must be a [GRPC::Core::Server]. When
+ # present.
+ #
+ # * poll_period: when present, the server polls for new events with this
+ # period
+ #
+ # * pool_size: the size of the thread pool the server uses to run its
+ # threads
+ #
+ # * completion_queue_override: when supplied, this will be used as the
+ # completion_queue that the server uses to receive network events,
+ # otherwise its creates a new instance itself
+ #
+ # * creds: [GRPC::Core::ServerCredentials]
+ # the credentials used to secure the server
+ #
+ # * max_waiting_requests: the maximum number of requests that are not
+ # being handled to allow. When this limit is exceeded, the server responds
+ # with not available to new requests
+ #
+ # * connect_md_proc:
+ # when non-nil is a proc for determining metadata to to send back the client
+ # on receiving an invocation req. The proc signature is:
+ # {key: val, ..} func(method_name, {key: val, ...})
+ def initialize(pool_size:DEFAULT_POOL_SIZE,
+ max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
+ poll_period:DEFAULT_POLL_PERIOD,
+ completion_queue_override:nil,
+ server_override:nil,
+ connect_md_proc:nil,
+ **kw)
+ @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
+ @cq = RpcServer.setup_cq(completion_queue_override)
+ @max_waiting_requests = max_waiting_requests
+ @poll_period = poll_period
+ @pool_size = pool_size
+ @pool = Pool.new(@pool_size)
+ @run_cond = ConditionVariable.new
+ @run_mutex = Mutex.new
+ # running_state can take 4 values: :not_started, :running, :stopping, and
+ # :stopped. State transitions can only proceed in that order.
+ @running_state = :not_started
+ @server = RpcServer.setup_srv(server_override, @cq, **kw)
+ end
+
+ # stops a running server
+ #
+ # the call has no impact if the server is already stopped, otherwise
+ # server's current call loop is it's last.
+ def stop
+ @run_mutex.synchronize do
+ fail 'Cannot stop before starting' if @running_state == :not_started
+ return if @running_state != :running
+ transition_running_state(:stopping)
+ end
+ deadline = from_relative_time(@poll_period)
+ @server.close(@cq, deadline)
+ @pool.stop
+ end
+
+ def running_state
+ @run_mutex.synchronize do
+ return @running_state
+ end
+ end
+
+ # Can only be called while holding @run_mutex
+ def transition_running_state(target_state)
+ state_transitions = {
+ not_started: :running,
+ running: :stopping,
+ stopping: :stopped
+ }
+ if state_transitions[@running_state] == target_state
+ @running_state = target_state
+ else
+ fail "Bad server state transition: #{@running_state}->#{target_state}"
+ end
+ end
+
+ def running?
+ running_state == :running
+ end
+
+ def stopped?
+ running_state == :stopped
+ end
+
+ # Is called from other threads to wait for #run to start up the server.
+ #
+ # If run has not been called, this returns immediately.
+ #
+ # @param timeout [Numeric] number of seconds to wait
+ # @result [true, false] true if the server is running, false otherwise
+ def wait_till_running(timeout = nil)
+ @run_mutex.synchronize do
+ @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
+ return @running_state == :running
+ end
+ end
+
+ # Runs the server in its own thread, then waits for signal INT or TERM on
+ # the current thread to terminate it.
+ def run_till_terminated
+ GRPC.trap_signals
+ t = Thread.new { run }
+ wait_till_running
+ loop do
+ sleep SIGNAL_CHECK_PERIOD
+ break unless GRPC.handle_signals
+ end
+ stop
+ t.join
+ end
+
+ # handle registration of classes
+ #
+ # service is either a class that includes GRPC::GenericService and whose
+ # #new function can be called without argument or any instance of such a
+ # class.
+ #
+ # E.g, after
+ #
+ # class Divider
+ # include GRPC::GenericService
+ # rpc :div DivArgs, DivReply # single request, single response
+ # def initialize(optional_arg='default option') # no args
+ # ...
+ # end
+ #
+ # srv = GRPC::RpcServer.new(...)
+ #
+ # # Either of these works
+ #
+ # srv.handle(Divider)
+ #
+ # # or
+ #
+ # srv.handle(Divider.new('replace optional arg'))
+ #
+ # It raises RuntimeError:
+ # - if service is not valid service class or object
+ # - its handler methods are already registered
+ # - if the server is already running
+ #
+ # @param service [Object|Class] a service class or object as described
+ # above
+ def handle(service)
+ @run_mutex.synchronize do
+ unless @running_state == :not_started
+ fail 'cannot add services if the server has been started'
+ end
+ cls = service.is_a?(Class) ? service : service.class
+ assert_valid_service_class(cls)
+ add_rpc_descs_for(service)
+ end
+ end
+
+ # runs the server
+ #
+ # - if no rpc_descs are registered, this exits immediately, otherwise it
+ # continues running permanently and does not return until program exit.
+ #
+ # - #running? returns true after this is called, until #stop cause the
+ # the server to stop.
+ def run
+ @run_mutex.synchronize do
+ fail 'cannot run without registering services' if rpc_descs.size.zero?
+ @pool.start
+ @server.start
+ transition_running_state(:running)
+ @run_cond.broadcast
+ end
+ loop_handle_server_calls
+ end
+
+ # Sends UNAVAILABLE if there are too many unprocessed jobs
+ def available?(an_rpc)
+ jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
+ GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
+ return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
+ GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
+ noop = proc { |x| x }
+ c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
+ c.send_status(StatusCodes::UNAVAILABLE, '')
+ nil
+ end
+
+ # Sends UNIMPLEMENTED if the method is not implemented by this server
+ def implemented?(an_rpc)
+ mth = an_rpc.method.to_sym
+ return an_rpc if rpc_descs.key?(mth)
+ GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
+ noop = proc { |x| x }
+ c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
+ c.send_status(StatusCodes::UNIMPLEMENTED, '')
+ nil
+ end
+
+ # handles calls to the server
+ def loop_handle_server_calls
+ fail 'not started' if running_state == :not_started
+ loop_tag = Object.new
+ while running_state == :running
+ begin
+ an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE)
+ break if (!an_rpc.nil?) && an_rpc.call.nil?
+
+ active_call = new_active_server_call(an_rpc)
+ unless active_call.nil?
+ @pool.schedule(active_call) do |ac|
+ c, mth = ac
+ rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
+ end
+ end
+ rescue Core::CallError, RuntimeError => e
+ # these might happen for various reasonse. The correct behaviour of
+ # the server is to log them and continue, if it's not shutting down.
+ if running_state == :running
+ GRPC.logger.warn("server call failed: #{e}")
+ end
+ next
+ end
+ end
+ # @running_state should be :stopping here
+ @run_mutex.synchronize { transition_running_state(:stopped) }
+ GRPC.logger.info("stopped: #{self}")
+ end
+
+ def new_active_server_call(an_rpc)
+ return nil if an_rpc.nil? || an_rpc.call.nil?
+
+ # allow the metadata to be accessed from the call
+ handle_call_tag = Object.new
+ an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
+ GRPC.logger.debug("call md is #{an_rpc.metadata}")
+ connect_md = nil
+ unless @connect_md_proc.nil?
+ connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
+ end
+ an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE,
+ SEND_INITIAL_METADATA => connect_md)
+ return nil unless available?(an_rpc)
+ return nil unless implemented?(an_rpc)
+
+ # Create the ActiveCall
+ GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
+ rpc_desc = rpc_descs[an_rpc.method.to_sym]
+ c = ActiveCall.new(an_rpc.call, @cq,
+ rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
+ an_rpc.deadline)
+ mth = an_rpc.method.to_sym
+ [c, mth]
+ end
+
+ protected
+
+ def rpc_descs
+ @rpc_descs ||= {}
+ end
+
+ def rpc_handlers
+ @rpc_handlers ||= {}
+ end
+
+ def assert_valid_service_class(cls)
+ unless cls.include?(GenericService)
+ fail "#{cls} must 'include GenericService'"
+ end
+ if cls.rpc_descs.size.zero?
+ fail "#{cls} should specify some rpc descriptions"
+ end
+ cls.assert_rpc_descs_have_methods
+ end
+
+ # This should be called while holding @run_mutex
+ def add_rpc_descs_for(service)
+ cls = service.is_a?(Class) ? service : service.class
+ specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
+ cls.rpc_descs.each_pair do |name, spec|
+ route = "/#{cls.service_name}/#{name}".to_sym
+ fail "already registered: rpc #{route} from #{spec}" if specs.key? route
+ specs[route] = spec
+ rpc_name = GenericService.underscore(name.to_s).to_sym
+ if service.is_a?(Class)
+ handlers[route] = cls.new.method(rpc_name)
+ else
+ handlers[route] = service.method(rpc_name)
+ end
+ GRPC.logger.info("handling #{route} with #{handlers[route]}")
+ end
+ end
+ end
+end
« no previous file with comments | « third_party/grpc/src/ruby/lib/grpc/generic/rpc_desc.rb ('k') | third_party/grpc/src/ruby/lib/grpc/generic/service.rb » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698