aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/lib/grpc/generic/service.rb
blob: 84f1ce75203b30e2bf0d926f4ab2c6fcec98c37c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

require_relative 'client_stub'
require_relative 'rpc_desc'

# GRPC contains the General RPC module.
module GRPC
  # Provides behaviour used to implement schema-derived service classes.
  #
  # Is intended to be used to support both client and server
  # IDL-schema-derived servers.
  module GenericService
    # creates a new string that is the underscore separate version of s.
    #
    # E.g,
    # PrintHTML -> print_html
    # AMethod -> a_method
    # AnRpc -> an_rpc
    #
    # @param s [String] the string to be converted.
    def self.underscore(s)
      s.gsub!(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
      s.gsub!(/([a-z\d])([A-Z])/, '\1_\2')
      s.tr!('-', '_')
      s.downcase!
      s
    end

    # Used to indicate that a name has already been specified
    class DuplicateRpcName < StandardError
      def initialize(name)
        super("rpc (#{name}) is already defined")
      end
    end

    # Provides a simple DSL to describe RPC services.
    #
    # E.g, a Maths service that uses the serializable messages DivArgs,
    # DivReply and Num might define its endpoint uses the following way:
    #
    # rpc :div DivArgs, DivReply    # single request, single response
    # rpc :sum stream(Num), Num     # streamed input, single response
    # rpc :fib FibArgs, stream(Num) # single request, streamed response
    # rpc :div_many stream(DivArgs), stream(DivReply)
    #                               # streamed req and resp
    #
    # Each 'rpc' adds an RpcDesc to classes including this module, and
    # #assert_rpc_descs_have_methods is used to ensure the including class
    # provides methods with signatures that support all the descriptors.
    module Dsl
      # This configures the method names that the serializable message
      # implementation uses to marshal and unmarshal messages.
      #
      # - unmarshal_class method must be a class method on the serializable
      # message type that takes a string (byte stream) and produces and object
      #
      # - marshal_class_method is called on a serializable message instance
      # and produces a serialized string.
      #
      # The Dsl verifies that the types in the descriptor have both the
      # unmarshal and marshal methods.
      attr_writer(:marshal_class_method, :unmarshal_class_method)

      # This allows configuration of the service name.
      attr_accessor(:service_name)

      # Adds an RPC spec.
      #
      # Takes the RPC name and the classes representing the types to be
      # serialized, and adds them to the including classes rpc_desc hash.
      #
      # input and output should both have the methods #marshal and #unmarshal
      # that are responsible for writing and reading an object instance from a
      # byte buffer respectively.
      #
      # @param name [String] the name of the rpc
      # @param input [Object] the input parameter's class
      # @param output [Object] the output parameter's class
      def rpc(name, input, output)
        fail(DuplicateRpcName, name) if rpc_descs.key? name
        assert_can_marshal(input)
        assert_can_marshal(output)
        rpc_descs[name] = RpcDesc.new(name, input, output,
                                      marshal_class_method,
                                      unmarshal_class_method)
        define_method(GenericService.underscore(name.to_s).to_sym) do
          fail GRPC::BadStatus.new_status_exception(
            GRPC::Core::StatusCodes::UNIMPLEMENTED)
        end
      end

      def inherited(subclass)
        # Each subclass should have a distinct class variable with its own
        # rpc_descs
        subclass.rpc_descs.merge!(rpc_descs)
        subclass.service_name = service_name
      end

      # the name of the instance method used to marshal events to a byte
      # stream.
      def marshal_class_method
        @marshal_class_method ||= :marshal
      end

      # the name of the class method used to unmarshal from a byte stream.
      def unmarshal_class_method
        @unmarshal_class_method ||= :unmarshal
      end

      def assert_can_marshal(cls)
        cls = cls.type if cls.is_a? RpcDesc::Stream
        mth = unmarshal_class_method
        unless cls.methods.include? mth
          fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
        end
        mth = marshal_class_method
        return if cls.methods.include? mth
        fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
      end

      # @param cls [Class] the class of a serializable type
      # @return cls wrapped in a RpcDesc::Stream
      def stream(cls)
        assert_can_marshal(cls)
        RpcDesc::Stream.new(cls)
      end

      # the RpcDescs defined for this GenericService, keyed by name.
      def rpc_descs
        @rpc_descs ||= {}
      end

      # Creates a rpc client class with methods for accessing the methods
      # currently in rpc_descs.
      def rpc_stub_class
        descs = rpc_descs
        route_prefix = service_name
        Class.new(ClientStub) do
          # @param host [String] the host the stub connects to
          # @param creds [Core::ChannelCredentials|Symbol] The channel
          #     credentials to use, or :this_channel_is_insecure otherwise
          # @param kw [KeywordArgs] the channel arguments, plus any optional
          #                         args for configuring the client's channel
          def initialize(host, creds, **kw)
            super(host, creds, **kw)
          end

          # Used define_method to add a method for each rpc_desc.  Each method
          # calls the base class method for the given descriptor.
          descs.each_pair do |name, desc|
            mth_name = GenericService.underscore(name.to_s).to_sym
            marshal = desc.marshal_proc
            unmarshal = desc.unmarshal_proc(:output)
            route = "/#{route_prefix}/#{name}"
            if desc.request_response?
              define_method(mth_name) do |req, metadata = {}|
                GRPC.logger.debug("calling #{@host}:#{route}")
                request_response(route, req, marshal, unmarshal, metadata)
              end
            elsif desc.client_streamer?
              define_method(mth_name) do |reqs, metadata = {}|
                GRPC.logger.debug("calling #{@host}:#{route}")
                client_streamer(route, reqs, marshal, unmarshal, metadata)
              end
            elsif desc.server_streamer?
              define_method(mth_name) do |req, metadata = {}, &blk|
                GRPC.logger.debug("calling #{@host}:#{route}")
                server_streamer(route, req, marshal, unmarshal, metadata, &blk)
              end
            else  # is a bidi_stream
              define_method(mth_name) do |reqs, metadata = {}, &blk|
                GRPC.logger.debug("calling #{@host}:#{route}")
                bidi_streamer(route, reqs, marshal, unmarshal, metadata, &blk)
              end
            end
          end
        end
      end
    end

    def self.included(o)
      o.extend(Dsl)
      # Update to the use the service name including module. Provide a default
      # that can be nil e.g. when modules are declared dynamically.
      return unless o.service_name.nil?
      if o.name.nil?
        o.service_name = 'GenericService'
      else
        modules = o.name.split('::')
        if modules.length > 2
          o.service_name = modules[modules.length - 2]
        else
          o.service_name = modules.first
        end
      end
    end
  end
end