class GRPC::RpcServer
RpcServer
hosts a number of services and makes them available on the network.
Constants
- DEFAULT_MAX_WAITING_REQUESTS
Deprecated due to internal changes to the thread pool
- DEFAULT_POLL_PERIOD
Default poll period is 1s
- DEFAULT_POOL_SIZE
Default thread pool size is 30
- SIGNAL_CHECK_PERIOD
Signal check period is 0.25s
Private Class Methods
Creates a new RpcServer
.
The RPC server is configured using keyword arguments.
There are some specific keyword args used to configure the RpcServer
instance.
-
pool_size: the size of the thread pool the server uses to run its
threads. No more concurrent requests can be made than the size of the thread pool
-
max_waiting_requests: Deprecated due to internal changes to the thread
pool. This is still an argument for compatibility but is ignored.
-
poll_period: The amount of time in seconds to wait for
currently-serviced RPC's to finish before cancelling them when shutting down the server.
-
pool_keep_alive: The amount of time in seconds to wait
for currently busy thread-pool threads to finish before forcing an abrupt exit to each thread.
-
connect_md_proc:
when non-nil, is a proc that determines the metadata to send back to the client on receiving an invocation request. The proc signature is:
{key: val, ..} func(method_name, {key: val, ...})
-
server_args:
A server arguments hash to be passed down to the underlying core server
-
interceptors:
An array of GRPC::ServerInterceptor
objects that will be used for intercepting server handlers to provide extra functionality. Interceptors are an EXPERIMENTAL API.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 217
# Builds a configured, not-yet-started server.
#
# @param pool_size [Integer] size handed to Pool.new
# @param max_waiting_requests [Integer] stored only; nothing in this method
#   acts on it
# @param poll_period [Numeric] stored for later use when stopping
# @param pool_keep_alive [Numeric] forwarded to Pool.new as keep_alive
# @param connect_md_proc [Proc, nil] validated via setup_connect_md_proc
# @param server_args [Hash] forwarded to Core::Server.new
# @param interceptors [Array] forwarded to InterceptorRegistry.new
# @raise [TypeError] if connect_md_proc is neither nil nor a Proc
def initialize(pool_size: DEFAULT_POOL_SIZE,
               max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
               poll_period: DEFAULT_POLL_PERIOD,
               pool_keep_alive: Pool::DEFAULT_KEEP_ALIVE,
               connect_md_proc: nil,
               server_args: {},
               interceptors: [])
  # Validate first so a bad proc raises before any resource is created.
  @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)

  # Plain configuration values.
  @pool_size = pool_size
  @poll_period = poll_period
  @max_waiting_requests = max_waiting_requests

  @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive)

  # Lifecycle bookkeeping. The state takes the values :not_started,
  # :running, :stopping and :stopped, and only ever advances in that order.
  @run_mutex = Mutex.new
  @run_cond = ConditionVariable.new
  @running_state = :not_started

  @server = Core::Server.new(server_args)
  @interceptors = InterceptorRegistry.new(interceptors)
end
setup_connect_md_proc
is used by initialize to validate the connect_md_proc.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 176
# Type-checks the connect_md_proc supplied to initialize.
#
# @param a_proc [Proc, nil] the candidate proc
# @return [Proc, nil] a_proc unchanged, or nil when none was given
# @raise [TypeError] when a_proc is neither nil nor a Proc
def self.setup_connect_md_proc(a_proc)
  case a_proc
  when nil then nil
  when Proc then a_proc
  else fail(TypeError, '!Proc')
  end
end
Private Instance Methods
This should be called while holding @run_mutex
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 535
# Records the rpc descriptions and handler methods of a service.
#
# Must be called while holding @run_mutex.
#
# When +service+ is a Class it is instantiated exactly once and every
# handler is bound to that single instance, so registering a class behaves
# the same as registering one instance of it. The instance is only created
# once the first route has passed the duplicate check.
#
# @param service [Object, Class] a service object, or a service class whose
#   +new+ can be called without arguments
# @raise [RuntimeError] if one of the service's routes is already registered
def add_rpc_descs_for(service)
  cls = service.is_a?(Class) ? service : service.class
  specs = (@rpc_descs ||= {})
  handlers = (@rpc_handlers ||= {})
  target = nil # the object all handlers of this service are bound to
  cls.rpc_descs.each_pair do |name, spec|
    route = "/#{cls.service_name}/#{name}".to_sym
    fail "already registered: rpc #{route} from #{spec}" if specs.key? route
    target = (service.is_a?(Class) ? cls.new : service) if target.nil?
    specs[route] = spec
    rpc_name = GenericService.underscore(name.to_s).to_sym
    handlers[route] = target.method(rpc_name)
    GRPC.logger.info("handling #{route} with #{handlers[route]}")
  end
end
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 526
# Checks that a class can be registered as a service.
#
# @param cls [Class] the candidate service class
# @return [nil] when the class is acceptable
# @raise [RuntimeError] if cls does not include GenericService, or if its
#   rpc_descs collection is empty
def assert_valid_service_class(cls)
  fail "#{cls} must 'include GenericService'" unless cls.include?(GenericService)
  return unless cls.rpc_descs.size.zero?

  fail "#{cls} should specify some rpc descriptions"
end
Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 419
# Rejects an incoming rpc with RESOURCE_EXHAUSTED when the pool reports it
# is not ready for work.
#
# @param an_rpc the incoming rpc (its call and deadline are read here)
# @return an_rpc when the pool can take it, nil after it has been rejected
def available?(an_rpc)
  unless @pool.ready_for_work?
    GRPC.logger.warn('no free worker threads currently')
    passthrough = proc { |x| x }
    # A throwaway active call that knows metadata hasn't been sent yet; it
    # exists only to deliver the status.
    rejection = ActiveCall.new(an_rpc.call, passthrough, passthrough,
                               an_rpc.deadline,
                               metadata_received: true, started: false)
    rejection.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED,
                          'No free threads in thread pool')
    return nil
  end
  an_rpc
end
handle registration of classes
service is either a class that includes GRPC::GenericService
and whose new function can be called without argument or any instance of such a class.
E.g., after
class Divider
  include GRPC::GenericService
  rpc :div, DivArgs, DivReply # single request, single response
  def initialize(optional_arg='default option') # no args
    ...
  end
end
srv = GRPC::RpcServer.new(...)
# Either of these works
srv.handle(Divider)
# or
srv.handle(Divider.new('replace optional arg'))
It raises RuntimeError:
-
if service is not valid service class or object
-
its handler methods are already registered
-
if the server is already running
@param service [Object|Class] a service class or object as described
above
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 333
# Registers a service (class or instance) with a server that has not yet
# been started.
#
# @param service [Object, Class] a service class or service object
# @raise [RuntimeError] if the server has left the :not_started state, if
#   the service class is invalid, or if its handlers are already registered
def handle(service)
  @run_mutex.synchronize do
    if @running_state != :not_started
      fail 'cannot add services if the server has been started'
    end
    service_class = if service.is_a?(Class)
                      service
                    else
                      service.class
                    end
    assert_valid_service_class(service_class)
    add_rpc_descs_for(service)
  end
end
Sends UNIMPLEMENTED if the method is not implemented by this server
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 434
# Rejects an incoming rpc with UNIMPLEMENTED when no description is
# registered for its method.
#
# @param an_rpc the incoming rpc (its method, call and deadline are read)
# @return an_rpc when its method is registered, nil after it was rejected
def implemented?(an_rpc)
  route = an_rpc.method.to_sym
  unless rpc_descs.key?(route)
    GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
    passthrough = proc { |x| x }
    # A throwaway active call that knows metadata hasn't been sent yet; it
    # exists only to deliver the status.
    rejection = ActiveCall.new(an_rpc.call, passthrough, passthrough,
                               an_rpc.deadline,
                               metadata_received: true, started: false)
    rejection.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
    return nil
  end
  an_rpc
end
handles calls to the server
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 449 def loop_handle_server_calls fail 'not started' if running_state == :not_started while running_state == :running begin an_rpc = @server.request_call break if (!an_rpc.nil?) && an_rpc.call.nil? active_call = new_active_server_call(an_rpc) unless active_call.nil? @pool.schedule(active_call) do |ac| c, mth = ac begin rpc_descs[mth].run_server_method( c, rpc_handlers[mth], @interceptors.build_context ) rescue StandardError c.send_status(GRPC::Core::StatusCodes::INTERNAL, 'Server handler failed') end end end rescue Core::CallError, RuntimeError => e # these might happen for various reasons. The correct behavior of # the server is to log them and continue, if it's not shutting down. if running_state == :running GRPC.logger.warn("server call failed: #{e}") end next end end # @running_state should be :stopping here @run_mutex.synchronize do transition_running_state(:stopped) GRPC.logger.info("stopped: #{self}") @server.close end end
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 488
# Turns an incoming rpc into the [active_call, method_symbol] pair that the
# pool workers consume.
#
# @param an_rpc the incoming rpc, possibly nil
# @return [Array, nil] [ActiveCall, Symbol], or nil when the rpc is missing,
#   has no call, or was rejected by available? / implemented?
def new_active_server_call(an_rpc)
  return nil if an_rpc.nil? || an_rpc.call.nil?

  # Attach the metadata to the call so that handlers can read it.
  an_rpc.call.metadata = an_rpc.metadata
  connect_md = if @connect_md_proc.nil?
                 nil
               else
                 @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
               end

  # Each check sends its own status to the client before returning nil.
  return nil unless available?(an_rpc)
  return nil unless implemented?(an_rpc)

  # Build the ActiveCall, flagging that metadata has not been sent yet.
  GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
  route = an_rpc.method.to_sym
  desc = rpc_descs[route]
  active_call = ActiveCall.new(an_rpc.call,
                               desc.marshal_proc,
                               desc.unmarshal_proc(:input),
                               an_rpc.deadline,
                               metadata_received: true,
                               started: false,
                               metadata_to_send: connect_md)
  active_call.attach_peer_cert(an_rpc.call.peer_cert)
  [active_call, route]
end
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 518
# Registered rpc descriptions, created as an empty Hash on first access.
#
# @return [Hash] the same Hash object on every call
def rpc_descs
  @rpc_descs = {} unless @rpc_descs
  @rpc_descs
end
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 522
# Registered rpc handlers, created as an empty Hash on first access.
#
# @return [Hash] the same Hash object on every call
def rpc_handlers
  @rpc_handlers = {} unless @rpc_handlers
  @rpc_handlers
end
runs the server
-
if no rpc_descs are registered, this fails immediately with a RuntimeError; otherwise it keeps handling calls and does not return until the server is stopped.
-
running? returns true after this is called, until stop causes the server to stop.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 351
# Starts the pool and the server, announces the :running state to waiters
# on @run_cond, then hands control to the accept loop.
#
# @raise [RuntimeError] if no rpc descriptions have been registered
def run
  @run_mutex.synchronize do
    nothing_registered = rpc_descs.size.zero?
    fail 'cannot run without registering services' if nothing_registered

    @pool.start
    @server.start
    transition_running_state(:running)
    # Wake any thread blocked on @run_cond waiting for start-up.
    @run_cond.broadcast
  end
  # Runs outside the mutex: the loop takes the mutex itself as needed.
  loop_handle_server_calls
end
runs the server with signal handlers @param signals
List of String, Integer or both representing signals that the user would like to send to the server for graceful shutdown
@param wait_interval (optional)
Integer seconds that user would like stop_server_thread to poll stop_server
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 371
# Runs the server and stops it when one of the given signals arrives.
#
# Signal names are normalised on a copy (upper-cased, optional 'SIG' prefix
# removed), so the caller's strings are never modified and frozen strings
# are accepted. All signals are validated before any thread is started or
# any trap is installed, so an invalid entry leaves no side effects behind.
#
# @param signals [Array<String, Integer>] signals that should trigger a
#   graceful shutdown
# @param wait_interval [Numeric] seconds the stop thread waits on its
#   condition variable between checks of the stop flag
# @raise [RuntimeError] if any entry is not a signal known to Signal.list
def run_till_terminated_or_interrupted(signals, wait_interval = 60)
  known_signals = Signal.list
  trap_targets = signals.map do |sig|
    target = sig
    if sig.is_a?(String)
      target = sig.upcase
      # cut out the SIG prefix to see if valid signal
      target = target[3..-1] if target.start_with?('SIG')
    end
    unless known_signals.value?(target) || known_signals.key?(target)
      fail "#{target} not a valid signal"
    end
    target
  end

  @stop_server = false
  @stop_server_mu = Mutex.new
  @stop_server_cv = ConditionVariable.new

  @stop_server_thread = Thread.new do
    loop do
      break if @stop_server
      @stop_server_mu.synchronize do
        # The timeout bounds how long a missed broadcast can delay shutdown.
        @stop_server_cv.wait(@stop_server_mu, wait_interval)
      end
    end

    # stop is surrounded by mutex, should handle multiple calls to stop
    # correctly
    stop
  end

  # register signal traps for all (already validated) signals
  trap_targets.each do |target|
    Signal.trap(target) do
      @stop_server = true
      @stop_server_cv.broadcast
    end
  end

  run

  @stop_server_thread.join
end
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 280
# @return [Boolean] true only while the server state is :running
def running?
  current = running_state
  current == :running
end
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 260
# Reads the lifecycle state while holding @run_mutex.
#
# @return [Symbol] the current value of @running_state
def running_state
  @run_mutex.synchronize { @running_state }
end
stops a running server
The call has no impact if the server is already stopped; otherwise the server's current call loop is its last.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 242
# Stops a running server.
#
# @return nil when the call only signalled the stop thread or when the
#   server was not running; otherwise whatever @pool.stop returns
# @raise [RuntimeError] if the server has never been started
def stop
  # When started via run_till_terminated_or_interrupted, just flag the stop
  # thread and let it call back in. The comparison is deliberately strict:
  # nil (flag never set) and true (stop already requested) both fall through.
  if @stop_server == false
    @stop_server = true
    @stop_server_cv.broadcast
    return
  end

  @run_mutex.synchronize do
    case @running_state
    when :not_started
      fail 'Cannot stop before starting'
    when :running
      transition_running_state(:stopping)
      @server.shutdown_and_notify(from_relative_time(@poll_period))
    else
      # Already stopping or stopped: leave the method without touching the pool.
      return
    end
  end
  @pool.stop
end
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 284
# @return [Boolean] true only once the server state is :stopped
def stopped?
  current = running_state
  current == :stopped
end
Can only be called while holding @run_mutex
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 267
# Advances @running_state to target_state when that is the single legal
# successor of the current state. Call only while holding @run_mutex.
#
# @param target_state [Symbol] the state to move to
# @return [Symbol] target_state once applied
# @raise [RuntimeError] if target_state is not the successor of the
#   current state
def transition_running_state(target_state)
  successor = case @running_state
              when :not_started then :running
              when :running then :stopping
              when :stopping then :stopped
              end
  unless successor == target_state
    fail "Bad server state transition: #{@running_state}->#{target_state}"
  end
  @running_state = target_state
end
Is called from other threads to wait for run
to start up the server.
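If the server has already left the not-started state, this returns immediately; otherwise it waits until run starts the server or the timeout elapses (indefinitely when no timeout is given).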
@param timeout [Numeric] number of seconds to wait @return [true, false] true if the server is running, false otherwise
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 294
# Blocks the calling thread until the server has left the :not_started
# state or the timeout has elapsed.
#
# The state is re-checked after every wake-up, so a wake-up that is not
# accompanied by a state change (a spurious wake-up or an unrelated
# broadcast) does not end the wait early. With a timeout, the remaining
# time is tracked against a monotonic deadline across such wake-ups.
#
# @param timeout [Numeric, nil] maximum number of seconds to wait; nil
#   waits without limit
# @return [true, false] true if the server is running, false otherwise
def wait_till_running(timeout = nil)
  deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)
  @run_mutex.synchronize do
    while @running_state == :not_started
      if deadline
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break unless remaining > 0

        @run_cond.wait(@run_mutex, remaining)
      else
        @run_cond.wait(@run_mutex)
      end
    end
    @running_state == :running
  end
end