# Copyright 2015 gRPC authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. require_relative '../grpc' # GRPC contains the General RPC module. module GRPC # RpcDesc is a Descriptor of an RPC method. class RpcDesc < Struct.new(:name, :input, :output, :marshal_method, :unmarshal_method) include Core::StatusCodes # Used to wrap a message class to indicate that it needs to be streamed. class Stream attr_accessor :type def initialize(type) @type = type end end # @return [Proc] { |instance| marshalled(instance) } def marshal_proc proc { |o| o.class.send(marshal_method, o).to_s } end # @param [:input, :output] target determines whether to produce the an # unmarshal Proc for the rpc input parameter or # its output parameter # # @return [Proc] An unmarshal proc { |marshalled(instance)| instance } def unmarshal_proc(target) fail ArgumentError unless [:input, :output].include?(target) unmarshal_class = send(target) unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream proc { |o| unmarshal_class.send(unmarshal_method, o) } end def handle_request_response(active_call, mth, inter_ctx) req = active_call.read_unary_request call = active_call.single_req_view inter_ctx.intercept!( :request_response, method: mth, call: call, request: req ) do resp = mth.call(req, call) active_call.server_unary_response( resp, trailing_metadata: active_call.output_metadata ) end end def handle_client_streamer(active_call, mth, inter_ctx) call = active_call.multi_req_view inter_ctx.intercept!( :client_streamer, method: mth, call: call ) do resp = mth.call(call) active_call.server_unary_response( resp, trailing_metadata: active_call.output_metadata ) end end def handle_server_streamer(active_call, mth, inter_ctx) req = active_call.read_unary_request call = active_call.single_req_view inter_ctx.intercept!( :server_streamer, method: mth, call: call, request: req ) do replies = mth.call(req, call) replies.each { |r| active_call.remote_send(r) } send_status(active_call, OK, 'OK', active_call.output_metadata) end end ## # @param [GRPC::ActiveCall] active_call # @param [Method] mth # @param [Array] inter_ctx # def handle_bidi_streamer(active_call, mth, inter_ctx) active_call.run_server_bidi(mth, inter_ctx) send_status(active_call, OK, 'OK', active_call.output_metadata) end ## # @param [GRPC::ActiveCall] active_call The current active call object # for the request # @param [Method] mth The current RPC method being called # @param [GRPC::InterceptionContext] inter_ctx The interception context # being executed # def run_server_method(active_call, mth, inter_ctx = InterceptionContext.new) # While a server method is running, it might be cancelled, its deadline # might be reached, the handler could throw an unknown error, or a # well-behaved handler could throw a StatusError. if request_response? handle_request_response(active_call, mth, inter_ctx) elsif client_streamer? handle_client_streamer(active_call, mth, inter_ctx) elsif server_streamer? handle_server_streamer(active_call, mth, inter_ctx) else # is a bidi_stream handle_bidi_streamer(active_call, mth, inter_ctx) end rescue BadStatus => e # this is raised by handlers that want GRPC to send an application error # code and detail message and some additional app-specific metadata. GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}") send_status(active_call, e.code, e.details, e.metadata) rescue Core::CallError => e # This is raised by GRPC internals but should rarely, if ever happen. # Log it, but don't notify the other endpoint.. GRPC.logger.warn("failed call: #{active_call}\n#{e}") rescue Core::OutOfTime # This is raised when active_call#method.call exceeds the deadline # event. Send a status of deadline exceeded GRPC.logger.warn("late call: #{active_call}") send_status(active_call, DEADLINE_EXCEEDED, 'late') rescue StandardError, NotImplementedError => e # This will usuaally be an unhandled error in the handling code. # Send back a UNKNOWN status to the client # # Note: this intentionally does not map NotImplementedError to # UNIMPLEMENTED because NotImplementedError is intended for low-level # OS interaction (e.g. syscalls) not supported by the current OS. GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN") GRPC.logger.warn(e) send_status(active_call, UNKNOWN, "#{e.class}: #{e.message}") end def assert_arity_matches(mth) # A bidi handler function can optionally be passed a second # call object parameter for access to metadata, cancelling, etc. if bidi_streamer? if mth.arity != 2 && mth.arity != 1 fail arity_error(mth, 2, "should be #{mth.name}(req, call) or " \ "#{mth.name}(req)") end elsif request_response? || server_streamer? if mth.arity != 2 fail arity_error(mth, 2, "should be #{mth.name}(req, call)") end else if mth.arity != 1 fail arity_error(mth, 1, "should be #{mth.name}(call)") end end end def request_response? !input.is_a?(Stream) && !output.is_a?(Stream) end def client_streamer? input.is_a?(Stream) && !output.is_a?(Stream) end def server_streamer? !input.is_a?(Stream) && output.is_a?(Stream) end def bidi_streamer? input.is_a?(Stream) && output.is_a?(Stream) end def arity_error(mth, want, msg) "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}" end def send_status(active_client, code, details, metadata = {}) details = 'Not sure why' if details.nil? GRPC.logger.debug("Sending status #{code}:#{details}") active_client.send_status(code, details, code == OK, metadata: metadata) rescue StandardError => e GRPC.logger.warn("Could not send status #{code}:#{details}") GRPC.logger.warn(e) end end end