diff options
author | Sree Kuchibhotla <sreek@google.com> | 2017-07-20 23:49:54 -0700 |
---|---|---|
committer | Sree Kuchibhotla <sreek@google.com> | 2017-07-20 23:49:54 -0700 |
commit | 471aa62716063091d48dc08cbe05b7550910da81 (patch) | |
tree | 20c4acf2761ddc8153b924355a93f23f0ef6607d /src | |
parent | 949d075812cb617eaf4d66a880869cb400f4440b (diff) | |
parent | 37e06cfc4d7e6473b000d4b64c7305da8e98e456 (diff) |
Merge branch 'master' into sreek-epoll1
Diffstat (limited to 'src')
26 files changed, 881 insertions, 181 deletions
diff --git a/src/compiler/php_generator.cc b/src/compiler/php_generator.cc index 6d34761fdf..38ec46e656 100644 --- a/src/compiler/php_generator.cc +++ b/src/compiler/php_generator.cc @@ -97,13 +97,14 @@ void PrintMethod(const MethodDescriptor *method, Printer *out) { } // Prints out the service descriptor object -void PrintService(const ServiceDescriptor *service, Printer *out) { +void PrintService(const ServiceDescriptor *service, + const grpc::string ¶meter, Printer *out) { map<grpc::string, grpc::string> vars; out->Print("/**\n"); out->Print(GetPHPComments(service, " *").c_str()); out->Print(" */\n"); - vars["name"] = service->name(); - out->Print(vars, "class $name$Client extends \\Grpc\\BaseStub {\n\n"); + vars["name"] = GetPHPServiceClassname(service, parameter); + out->Print(vars, "class $name$ extends \\Grpc\\BaseStub {\n\n"); out->Indent(); out->Indent(); out->Print( @@ -131,7 +132,8 @@ void PrintService(const ServiceDescriptor *service, Printer *out) { } grpc::string GenerateFile(const FileDescriptor *file, - const ServiceDescriptor *service) { + const ServiceDescriptor *service, + const grpc::string ¶meter) { grpc::string output; { StringOutputStream output_stream(&output); @@ -150,7 +152,7 @@ grpc::string GenerateFile(const FileDescriptor *file, vars["package"] = MessageIdentifierName(file->package()); out.Print(vars, "namespace $package$;\n\n"); - PrintService(service, &out); + PrintService(service, parameter, &out); } return output; } diff --git a/src/compiler/php_generator.h b/src/compiler/php_generator.h index 4518bc24f9..9a04bd33d7 100644 --- a/src/compiler/php_generator.h +++ b/src/compiler/php_generator.h @@ -24,7 +24,8 @@ namespace grpc_php_generator { grpc::string GenerateFile(const grpc::protobuf::FileDescriptor *file, - const grpc::protobuf::ServiceDescriptor *service); + const grpc::protobuf::ServiceDescriptor *service, + const grpc::string ¶meter); } // namespace grpc_php_generator diff --git a/src/compiler/php_generator_helpers.h b/src/compiler/php_generator_helpers.h index 3a5c08b3e6..5edebf6290 100644 --- a/src/compiler/php_generator_helpers.h +++ b/src/compiler/php_generator_helpers.h @@ -26,9 +26,22 @@ namespace grpc_php_generator { +inline grpc::string GetPHPServiceClassname( + const grpc::protobuf::ServiceDescriptor *service, + const grpc::string ¶meter) { + grpc::string suffix; + if (parameter == "") { + suffix = "Client"; + } else { + suffix = parameter; + } + return service->name() + suffix; +} + inline grpc::string GetPHPServiceFilename( const grpc::protobuf::FileDescriptor *file, - const grpc::protobuf::ServiceDescriptor *service) { + const grpc::protobuf::ServiceDescriptor *service, + const grpc::string ¶meter) { std::vector<grpc::string> tokens = grpc_generator::tokenize(file->package(), "."); std::ostringstream oss; @@ -36,7 +49,7 @@ inline grpc::string GetPHPServiceFilename( oss << (i == 0 ? "" : "/") << grpc_generator::CapitalizeFirstLetter(tokens[i]); } - return oss.str() + "/" + service->name() + "Client.php"; + return oss.str() + "/" + GetPHPServiceClassname(service, parameter) + ".php"; } // ReplaceAll replaces all instances of search with replace in s. diff --git a/src/compiler/php_plugin.cc b/src/compiler/php_plugin.cc index 7a581fd7bc..bbe91656d5 100644 --- a/src/compiler/php_plugin.cc +++ b/src/compiler/php_plugin.cc @@ -41,10 +41,11 @@ class PHPGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { } for (int i = 0; i < file->service_count(); i++) { - grpc::string code = GenerateFile(file, file->service(i)); + grpc::string code = GenerateFile(file, file->service(i), parameter); // Get output file name - grpc::string file_name = GetPHPServiceFilename(file, file->service(i)); + grpc::string file_name = + GetPHPServiceFilename(file, file->service(i), parameter); std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output( context->Open(file_name)); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index cccc3e871a..fdb18f687f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -1705,7 +1705,6 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_args *args) { glb_lb_policy *glb_policy = (glb_lb_policy *)policy; - if (glb_policy->updating_lb_channel) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, @@ -1813,9 +1812,11 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, // lb_on_server_status_received will pick up the cancel and reinit // lb_call. if (glb_policy->pending_update_args != NULL) { - const grpc_lb_policy_args *args = glb_policy->pending_update_args; + grpc_lb_policy_args *args = glb_policy->pending_update_args; glb_policy->pending_update_args = NULL; glb_update_locked(exec_ctx, &glb_policy->base, args); + grpc_channel_args_destroy(exec_ctx, args->args); + gpr_free(args); } } else if (glb_policy->started_picking && !glb_policy->shutting_down) { if (glb_policy->retry_timer_active) { diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c index 4df64d81e2..14498021eb 100644 --- a/src/core/ext/transport/inproc/inproc_transport.c +++ b/src/core/ext/transport/inproc/inproc_transport.c @@ -190,8 +190,11 @@ typedef struct inproc_stream { static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *bs, size_t max, grpc_closure *on_complete) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - return (stream->le->sb.count != 0); + // Because inproc transport always provides the entire message atomically, + // the byte stream always has data available when this function is called. + // Thus, this function always returns true (unlike other transports) and + // there is never any need to schedule a closure + return true; } static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c index 99dc2f1c78..3f4145d104 100644 --- a/src/core/lib/iomgr/sockaddr_utils.c +++ b/src/core/lib/iomgr/sockaddr_utils.c @@ -220,6 +220,11 @@ const char *grpc_sockaddr_get_uri_scheme( return NULL; } +int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr) { + const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; + return addr->sa_family; +} + int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr) { const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; switch (addr->sa_family) { diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h index 7692b969f2..a589a19705 100644 --- a/src/core/lib/iomgr/sockaddr_utils.h +++ b/src/core/lib/iomgr/sockaddr_utils.h @@ -75,4 +75,6 @@ char *grpc_sockaddr_to_uri(const grpc_resolved_address *addr); /* Returns the URI scheme corresponding to \a addr */ const char *grpc_sockaddr_get_uri_scheme(const grpc_resolved_address *addr); +int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr); + #endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 2ab836cc34..079c913579 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -316,6 +316,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, unsigned port_index = 0; int status; grpc_error *error = GRPC_ERROR_NONE; + int family; if (s->tail != NULL) { port_index = s->tail->port_index + 1; @@ -353,7 +354,18 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, } handle = gpr_malloc(sizeof(uv_tcp_t)); - status = uv_tcp_init(uv_default_loop(), handle); + + family = grpc_sockaddr_get_family(addr); + status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family); +#if defined(GPR_LINUX) && defined(SO_REUSEPORT) + if (family == AF_INET || family == AF_INET6) { + int fd; + uv_fileno((uv_handle_t *)handle, &fd); + int enable = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)); + } +#endif /* GPR_LINUX && SO_REUSEPORT */ + if (status == 0) { error = add_socket_to_server(s, handle, addr, port_index, &sp); } else { diff --git a/src/core/lib/support/env.h b/src/core/lib/support/env.h index 18bc08ac62..e2c012a728 100644 --- a/src/core/lib/support/env.h +++ b/src/core/lib/support/env.h @@ -36,6 +36,12 @@ char *gpr_getenv(const char *name); /* Sets the the environment with the specified name to the specified value. */ void gpr_setenv(const char *name, const char *value); +/* This is a version of gpr_getenv that does not produce any output if it has to + use an insecure version of the function. It is ONLY to be used to solve the + problem in which we need to check an env variable to configure the verbosity + level of logging. So DO NOT USE THIS. */ +const char *gpr_getenv_silent(const char *name, char **dst); + #ifdef __cplusplus } #endif diff --git a/src/core/lib/support/env_linux.c b/src/core/lib/support/env_linux.c index 0c79a2c401..4c45a977ca 100644 --- a/src/core/lib/support/env_linux.c +++ b/src/core/lib/support/env_linux.c @@ -38,7 +38,9 @@ #include "src/core/lib/support/string.h" -char *gpr_getenv(const char *name) { +const char *gpr_getenv_silent(const char *name, char **dst) { + const char *insecure_func_used = NULL; + char *result = NULL; #if defined(GPR_BACKWARDS_COMPATIBILITY_MODE) typedef char *(*getenv_type)(const char *); static getenv_type getenv_func = NULL; @@ -48,22 +50,28 @@ char *gpr_getenv(const char *name) { for (size_t i = 0; getenv_func == NULL && i < GPR_ARRAY_SIZE(names); i++) { getenv_func = (getenv_type)dlsym(RTLD_DEFAULT, names[i]); if (getenv_func != NULL && strstr(names[i], "secure") == NULL) { - gpr_log(GPR_DEBUG, - "Warning: insecure environment read function '%s' used", - names[i]); + insecure_func_used = names[i]; } } - char *result = getenv_func(name); - return result == NULL ? result : gpr_strdup(result); + result = getenv_func(name); #elif __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 17) - char *result = secure_getenv(name); - return result == NULL ? result : gpr_strdup(result); + result = secure_getenv(name); #else - gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used", - "getenv"); - char *result = getenv(name); - return result == NULL ? result : gpr_strdup(result); + result = getenv(name); + insecure_func_used = "getenv"; #endif + *dst = result == NULL ? result : gpr_strdup(result); + return insecure_func_used; +} + +char *gpr_getenv(const char *name) { + char *result = NULL; + const char *insecure_func_used = gpr_getenv_silent(name, &result); + if (insecure_func_used != NULL) { + gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used", + insecure_func_used); + } + return result; } void gpr_setenv(const char *name, const char *value) { diff --git a/src/core/lib/support/env_posix.c b/src/core/lib/support/env_posix.c index bdbc4da95a..b88822ca02 100644 --- a/src/core/lib/support/env_posix.c +++ b/src/core/lib/support/env_posix.c @@ -29,6 +29,11 @@ #include <grpc/support/string_util.h> #include "src/core/lib/support/string.h" +const char *gpr_getenv_silent(const char *name, char **dst) { + *dst = gpr_getenv(name); + return NULL; +} + char *gpr_getenv(const char *name) { char *result = getenv(name); return result == NULL ? result : gpr_strdup(result); diff --git a/src/core/lib/support/env_windows.c b/src/core/lib/support/env_windows.c index c1d557e219..652eeb61c6 100644 --- a/src/core/lib/support/env_windows.c +++ b/src/core/lib/support/env_windows.c @@ -30,6 +30,11 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +const char *gpr_getenv_silent(const char *name, char **dst) { + *dst = gpr_getenv(name); + return NULL; +} + char *gpr_getenv(const char *name) { char *result = NULL; DWORD size; diff --git a/src/core/lib/support/log.c b/src/core/lib/support/log.c index bcc336b8ae..fadb4d9a2c 100644 --- a/src/core/lib/support/log.c +++ b/src/core/lib/support/log.c @@ -64,7 +64,8 @@ void gpr_set_log_verbosity(gpr_log_severity min_severity_to_print) { } void gpr_log_verbosity_init() { - char *verbosity = gpr_getenv("GRPC_VERBOSITY"); + char *verbosity = NULL; + const char *insecure_getenv = gpr_getenv_silent("GRPC_VERBOSITY", &verbosity); gpr_atm min_severity_to_print = GPR_LOG_SEVERITY_ERROR; if (verbosity != NULL) { @@ -81,6 +82,11 @@ void gpr_log_verbosity_init() { GPR_LOG_VERBOSITY_UNSET) { gpr_atm_no_barrier_store(&g_min_severity_to_print, min_severity_to_print); } + + if (insecure_getenv != NULL) { + gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used", + insecure_getenv); + } } void gpr_set_log_function(gpr_log_func f) { diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index c90f96c0b7..200e477822 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -250,14 +250,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { has_sync_methods && num_frequently_polled_cqs > 0; if (has_sync_methods) { - // This is a Sync server - gpr_log(GPR_INFO, - "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: " - "%d, CQ timeout (msec): %d", - sync_server_settings_.num_cqs, sync_server_settings_.min_pollers, - sync_server_settings_.max_pollers, - sync_server_settings_.cq_timeout_msec); - grpc_cq_polling_type polling_type = is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING; @@ -272,6 +264,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec)); + if (has_sync_methods) { + // This is a Sync server + gpr_log(GPR_INFO, + "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: " + "%d, CQ timeout (msec): %d", + sync_server_settings_.num_cqs, sync_server_settings_.min_pollers, + sync_server_settings_.max_pollers, + sync_server_settings_.cq_timeout_msec); + } + ServerInitializer* initializer = server->initializer(); // Register all the completion queues with the server. i.e diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 67c984ab49..87b29c26ea 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -40,13 +40,13 @@ end module GRPC # The ActiveCall class provides simple methods for sending marshallable # data to a call - class ActiveCall + class ActiveCall # rubocop:disable Metrics/ClassLength include Core::TimeConsts include Core::CallOps extend Forwardable - attr_reader :deadline, :metadata_sent, :metadata_to_send + attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=, - :peer, :peer_cert, :trailing_metadata + :trailing_metadata, :status # client_invoke begins a client invocation. # @@ -100,6 +100,18 @@ module GRPC fail(ArgumentError, 'Already sent md') if started && metadata_to_send @metadata_to_send = metadata_to_send || {} unless started @send_initial_md_mutex = Mutex.new + + @output_stream_done = false + @input_stream_done = false + @call_finished = false + @call_finished_mu = Mutex.new + + @client_call_executed = false + @client_call_executed_mu = Mutex.new + + # set the peer now so that the accessor can still function + # after the server closes the call + @peer = call.peer end # Sends the initial metadata that has yet to be sent. @@ -142,11 +154,9 @@ module GRPC Operation.new(self) end - # finished waits until a client call is completed. - # - # It blocks until the remote endpoint acknowledges by sending a status. - def finished + def receive_and_check_status batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) + set_input_stream_done attach_status_results_and_complete_call(batch_result) end @@ -155,8 +165,6 @@ module GRPC @call.trailing_metadata = recv_status_batch_result.status.metadata end @call.status = recv_status_batch_result.status - @call.close - op_is_done # The RECV_STATUS in run_batch always succeeds # Check the status for a bad status or failed run batch @@ -193,9 +201,19 @@ module GRPC } ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished @call.run_batch(ops) + set_output_stream_done + nil end + # Intended for use on server-side calls when a single request from + # the client is expected (i.e., unary and server-streaming RPC types). + def read_unary_request + req = remote_read + set_input_stream_done + req + end + def server_unary_response(req, trailing_metadata: {}, code: Core::StatusCodes::OK, details: 'OK') ops = {} @@ -211,6 +229,7 @@ module GRPC ops[RECV_CLOSE_ON_SERVER] = nil @call.run_batch(ops) + set_output_stream_done end # remote_read reads a response from the remote endpoint. @@ -241,6 +260,8 @@ module GRPC # each_remote_read passes each response to the given block or returns an # enumerator the responses if no block is given. + # Used to generate the request enumerable for + # server-side client-streaming RPC's. # # == Enumerator == # @@ -258,10 +279,14 @@ module GRPC # @return [Enumerator] if no block was given def each_remote_read return enum_for(:each_remote_read) unless block_given? - loop do - resp = remote_read - break if resp.nil? # the last response was received - yield resp + begin + loop do + resp = remote_read + break if resp.nil? # the last response was received + yield resp + end + ensure + set_input_stream_done end end @@ -287,13 +312,17 @@ module GRPC # @return [Enumerator] if no block was given def each_remote_read_then_finish return enum_for(:each_remote_read_then_finish) unless block_given? - loop do - resp = remote_read - if resp.nil? # the last response was received, but not finished yet - finished - break + begin + loop do + resp = remote_read + if resp.nil? # the last response was received + receive_and_check_status + break + end + yield resp end - yield resp + ensure + set_input_stream_done end end @@ -305,6 +334,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def request_response(req, metadata: {}) + raise_error_if_already_executed ops = { SEND_MESSAGE => @marshal.call(req), SEND_CLOSE_FROM_CLIENT => nil, @@ -319,7 +349,15 @@ module GRPC end @metadata_sent = true end - batch_result = @call.run_batch(ops) + + begin + batch_result = @call.run_batch(ops) + # no need to check for cancellation after a CallError because this + # batch contains a RECV_STATUS op + ensure + set_input_stream_done + set_output_stream_done + end @call.metadata = batch_result.metadata attach_status_results_and_complete_call(batch_result) @@ -339,10 +377,20 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def client_streamer(requests, metadata: {}) - # Metadata might have already been sent if this is an operation view - merge_metadata_and_send_if_not_already_sent(metadata) + raise_error_if_already_executed + begin + merge_metadata_and_send_if_not_already_sent(metadata) + requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } + rescue GRPC::Core::CallError => e + receive_and_check_status # check for Cancelled + raise e + rescue => e + set_input_stream_done + raise e + ensure + set_output_stream_done + end - requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } batch_result = @call.run_batch( SEND_CLOSE_FROM_CLIENT => nil, RECV_INITIAL_METADATA => nil, @@ -350,12 +398,11 @@ module GRPC RECV_STATUS_ON_CLIENT => nil ) + set_input_stream_done + @call.metadata = batch_result.metadata attach_status_results_and_complete_call(batch_result) get_message_from_batch_result(batch_result) - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e end # server_streamer sends one request to the GRPC server, which yields a @@ -373,6 +420,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator|nil] a response Enumerator def server_streamer(req, metadata: {}) + raise_error_if_already_executed ops = { SEND_MESSAGE => @marshal.call(req), SEND_CLOSE_FROM_CLIENT => nil @@ -384,13 +432,22 @@ module GRPC end @metadata_sent = true end - @call.run_batch(ops) + + begin + @call.run_batch(ops) + rescue GRPC::Core::CallError => e + receive_and_check_status # checks for Cancelled + raise e + rescue => e + set_input_stream_done + raise e + ensure + set_output_stream_done + end + replies = enum_for(:each_remote_read_then_finish) return replies unless block_given? replies.each { |r| yield r } - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e end # bidi_streamer sends a stream of requests to the GRPC server, and yields @@ -421,6 +478,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, metadata: {}, &blk) + raise_error_if_already_executed # Metadata might have already been sent if this is an operation view merge_metadata_and_send_if_not_already_sent(metadata) bd = BidiCall.new(@call, @@ -428,7 +486,10 @@ module GRPC @unmarshal, metadata_received: @metadata_received) - bd.run_on_client(requests, @op_notifier, &blk) + bd.run_on_client(requests, + proc { set_input_stream_done }, + proc { set_output_stream_done }, + &blk) end # run_server_bidi orchestrates a BiDi stream processing on a server. @@ -449,7 +510,7 @@ module GRPC metadata_received: @metadata_received, req_view: MultiReqView.new(self)) - bd.run_on_server(gen_each_reply) + bd.run_on_server(gen_each_reply, proc { set_input_stream_done }) end # Waits till an operation completes @@ -459,7 +520,8 @@ module GRPC @op_notifier.wait end - # Signals that an operation is done + # Signals that an operation is done. + # Only relevant on the client-side (this is a no-op on the server-side) def op_is_done return if @op_notifier.nil? @op_notifier.notify(self) @@ -484,8 +546,40 @@ module GRPC end end + def attach_peer_cert(peer_cert) + @peer_cert = peer_cert + end + private + # To be called once the "input stream" has been completelly + # read through (i.e, done reading from client or received status) + # note this is idempotent + def set_input_stream_done + @call_finished_mu.synchronize do + @input_stream_done = true + maybe_finish_and_close_call_locked + end + end + + # To be called once the "output stream" has been completelly + # sent through (i.e, done sending from client or sent status) + # note this is idempotent + def set_output_stream_done + @call_finished_mu.synchronize do + @output_stream_done = true + maybe_finish_and_close_call_locked + end + end + + def maybe_finish_and_close_call_locked + return unless @output_stream_done && @input_stream_done + return if @call_finished + @call_finished = true + op_is_done + @call.close + end + # Starts the call if not already started # @param metadata [Hash] metadata to be sent to the server. If a value is # a list, multiple metadata for its key are sent @@ -493,6 +587,15 @@ module GRPC merge_metadata_to_send(metadata) && send_initial_metadata end + def raise_error_if_already_executed + @client_call_executed_mu.synchronize do + if @client_call_executed + fail GRPC::Core::CallError, 'attempting to re-run a call' + end + @client_call_executed = true + end + end + def self.view_class(*visible_methods) Class.new do extend ::Forwardable @@ -518,6 +621,7 @@ module GRPC # server client_streamer handlers. MultiReqView = view_class(:cancelled?, :deadline, :each_remote_read, :metadata, :output_metadata, + :peer, :peer_cert, :send_initial_metadata, :metadata_to_send, :merge_metadata_to_send, diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index e54cf78969..9e125cd986 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -62,12 +62,19 @@ module GRPC # block that can be invoked with each response. # # @param requests the Enumerable of requests to send - # @param op_notifier a Notifier used to signal completion + # @param set_input_stream_done [Proc] called back when we're done + # reading the input stream + # @param set_input_stream_done [Proc] called back when we're done + # sending data on the output stream # @return an Enumerator of requests to yield - def run_on_client(requests, op_notifier, &blk) - @op_notifier = op_notifier - @enq_th = Thread.new { write_loop(requests) } - read_loop(&blk) + def run_on_client(requests, + set_input_stream_done, + set_output_stream_done, + &blk) + @enq_th = Thread.new do + write_loop(requests, set_output_stream_done: set_output_stream_done) + end + read_loop(set_input_stream_done, &blk) end # Begins orchestration of the Bidi stream for a server generating replies. @@ -81,12 +88,17 @@ module GRPC # produced by gen_each_reply could ignore the received_msgs # # @param gen_each_reply [Proc] generates the BiDi stream replies. - def run_on_server(gen_each_reply) + # @param set_input_steam_done [Proc] call back to call when + # the reads have been completely read through. + def run_on_server(gen_each_reply, set_input_stream_done) # Pass in the optional call object parameter if possible if gen_each_reply.arity == 1 - replys = gen_each_reply.call(read_loop(is_client: false)) + replys = gen_each_reply.call( + read_loop(set_input_stream_done, is_client: false)) elsif gen_each_reply.arity == 2 - replys = gen_each_reply.call(read_loop(is_client: false), @req_view) + replys = gen_each_reply.call( + read_loop(set_input_stream_done, is_client: false), + @req_view) else fail 'Illegal arity of reply generator' end @@ -99,22 +111,6 @@ module GRPC END_OF_READS = :end_of_reads END_OF_WRITES = :end_of_writes - # signals that bidi operation is complete - def notify_done - return unless @op_notifier - GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}") - @op_notifier.notify(self) - end - - # signals that a bidi operation is complete (read + write) - def finished - @done_mutex.synchronize do - return unless @reads_complete && @writes_complete && !@complete - @call.close - @complete = true - end - end - # performs a read using @call.run_batch, ensures metadata is set up def read_using_run_batch ops = { RECV_MESSAGE => nil } @@ -127,7 +123,8 @@ module GRPC batch_result end - def write_loop(requests, is_client: true) + # set_output_stream_done is relevant on client-side + def write_loop(requests, is_client: true, set_output_stream_done: nil) GRPC.logger.debug('bidi-write-loop: starting') count = 0 requests.each do |req| @@ -151,23 +148,20 @@ module GRPC GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil) GRPC.logger.debug('bidi-write-loop: done') - notify_done - @writes_complete = true - finished end GRPC.logger.debug('bidi-write-loop: finished') rescue StandardError => e GRPC.logger.warn('bidi-write-loop: failed') GRPC.logger.warn(e) - notify_done - @writes_complete = true - finished raise e + ensure + set_output_stream_done.call if is_client end # Provides an enumerator that yields results of remote reads - def read_loop(is_client: true) + def read_loop(set_input_stream_done, is_client: true) return enum_for(:read_loop, + set_input_stream_done, is_client: is_client) unless block_given? GRPC.logger.debug('bidi-read-loop: starting') begin @@ -201,10 +195,10 @@ module GRPC GRPC.logger.warn('bidi: read-loop failed') GRPC.logger.warn(e) raise e + ensure + set_input_stream_done.call end GRPC.logger.debug('bidi-read-loop: finished') - @reads_complete = true - finished # Make sure that the write loop is done done before finishing the call. # Note that blocking is ok at this point because we've already received # a status diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index ce0097573a..89cf8ff6a0 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -48,7 +48,7 @@ module GRPC end def handle_request_response(active_call, mth) - req = active_call.remote_read + req = active_call.read_unary_request resp = mth.call(req, active_call.single_req_view) active_call.server_unary_response( resp, trailing_metadata: active_call.output_metadata) @@ -61,7 +61,7 @@ module GRPC end def handle_server_streamer(active_call, mth) - req = active_call.remote_read + req = active_call.read_unary_request replys = mth.call(req, active_call.single_req_view) replys.each { |r| active_call.remote_send(r) } send_status(active_call, OK, 'OK', active_call.output_metadata) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index ef2cc0ce91..33b3cea1fc 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -418,6 +418,7 @@ module GRPC metadata_received: true, started: false, metadata_to_send: connect_md) + c.attach_peer_cert(an_rpc.call.peer_cert) mth = an_rpc.method.to_sym [c, mth] end diff --git a/src/ruby/spec/client_auth_spec.rb b/src/ruby/spec/client_auth_spec.rb new file mode 100644 index 0000000000..79c9192aa5 --- /dev/null +++ b/src/ruby/spec/client_auth_spec.rb @@ -0,0 +1,137 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'grpc' + +def create_channel_creds + test_root = File.join(File.dirname(__FILE__), 'testdata') + files = ['ca.pem', 'client.key', 'client.pem'] + creds = files.map { |f| File.open(File.join(test_root, f)).read } + GRPC::Core::ChannelCredentials.new(creds[0], creds[1], creds[2]) +end + +def client_cert + test_root = File.join(File.dirname(__FILE__), 'testdata') + cert = File.open(File.join(test_root, 'client.pem')).read + fail unless cert.is_a?(String) + cert +end + +def create_server_creds + test_root = File.join(File.dirname(__FILE__), 'testdata') + p "test root: #{test_root}" + files = ['ca.pem', 'server1.key', 'server1.pem'] + creds = files.map { |f| File.open(File.join(test_root, f)).read } + GRPC::Core::ServerCredentials.new( + creds[0], + [{ private_key: creds[1], cert_chain: creds[2] }], + true) # force client auth +end + +# A test message +class EchoMsg + def self.marshal(_o) + '' + end + + def self.unmarshal(_o) + EchoMsg.new + end +end + +# a test service that checks the cert of its peer +class SslTestService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg + rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg) + rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg) + + def check_peer_cert(call) + error_msg = "want:\n#{client_cert}\n\ngot:\n#{call.peer_cert}" + fail(error_msg) unless call.peer_cert == client_cert + end + + def an_rpc(req, call) + check_peer_cert(call) + req + end + + def a_client_streaming_rpc(call) + check_peer_cert(call) + call.each_remote_read.each { |r| p r } + EchoMsg.new + end + + def a_server_streaming_rpc(_, call) + check_peer_cert(call) + [EchoMsg.new, EchoMsg.new] + end + + def a_bidi_rpc(requests, call) + check_peer_cert(call) + requests.each { |r| p r } + [EchoMsg.new, EchoMsg.new] + end +end + +SslTestServiceStub = SslTestService.rpc_stub_class + +describe 'client-server auth' do + RpcServer = GRPC::RpcServer + + before(:all) do + server_opts = { + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + port = @srv.add_http2_port('0.0.0.0:0', create_server_creds) + @srv.handle(SslTestService) + @srv_thd = Thread.new { @srv.run } + @srv.wait_till_running + + client_opts = { + channel_args: { + GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr' + } + } + @stub = SslTestServiceStub.new("localhost:#{port}", + create_channel_creds, + **client_opts) + end + + after(:all) do + expect(@srv.stopped?).to be(false) + @srv.stop + @srv_thd.join + end + + it 'client-server auth with unary RPCs' do + @stub.an_rpc(EchoMsg.new) + end + + it 'client-server auth with client streaming RPCs' do + @stub.a_client_streaming_rpc([EchoMsg.new, EchoMsg.new]) + end + + it 'client-server auth with server streaming RPCs' do + responses = @stub.a_server_streaming_rpc(EchoMsg.new) + responses.each { |r| p r } + end + + it 'client-server auth with bidi RPCs' do + responses = @stub.a_bidi_rpc([EchoMsg.new, EchoMsg.new]) + responses.each { |r| p r } + end +end diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 72e55ebcce..ec0c294174 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -473,7 +473,7 @@ describe GRPC::ActiveCall do server_call.remote_send('server_response') expect(client_call.remote_read).to eq('server_response') server_call.send_status(OK, 'status code is OK') - expect { client_call.finished }.to_not raise_error + expect { client_call.receive_and_check_status }.to_not raise_error end it 'finishes ok if the server sends an early status response' do @@ -490,7 +490,7 @@ describe GRPC::ActiveCall do expect do call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) end.to_not raise_error - expect { client_call.finished }.to_not raise_error + expect { client_call.receive_and_check_status }.to_not raise_error end it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 09b88c7cef..a8653e73cf 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -36,6 +36,53 @@ include GRPC::Core::StatusCodes include GRPC::Core::TimeConsts include GRPC::Core::CallOps +# check that methods on a finished/closed call t crash +def check_op_view_of_finished_client_call(op_view, + expected_metadata, + expected_trailing_metadata) + # use read_response_stream to try to iterate through + # possible response stream + fail('need something to attempt reads') unless block_given? + expect do + resp = op_view.execute + yield resp + end.to raise_error(GRPC::Core::CallError) + + expect { op_view.start_call }.to raise_error(RuntimeError) + + sanity_check_values_of_accessors(op_view, + expected_metadata, + expected_trailing_metadata) + + expect do + op_view.wait + op_view.cancel + op_view.write_flag = 1 + end.to_not raise_error +end + +def sanity_check_values_of_accessors(op_view, + expected_metadata, + expected_trailing_metadata) + expected_status = Struct::Status.new + expected_status.code = 0 + expected_status.details = 'OK' + expected_status.metadata = expected_trailing_metadata + + expect(op_view.status).to eq(expected_status) + expect(op_view.metadata).to eq(expected_metadata) + expect(op_view.trailing_metadata).to eq(expected_trailing_metadata) + + expect(op_view.cancelled?).to be(false) + expect(op_view.write_flag).to be(nil) + + # The deadline attribute of a call can be either + # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive. + # TODO: fix so that the accessor always returns the same type. + expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) || + op_view.deadline.is_a?(Time)).to be(true) +end + describe 'ClientStub' do let(:noop) { proc { |x| x } } @@ -45,6 +92,7 @@ describe 'ClientStub' do @method = 'an_rpc_method' @pass = OK @fail = INTERNAL + @metadata = { k1: 'v1', k2: 'v2' } end after(:each) do @@ -107,7 +155,7 @@ describe 'ClientStub' do end end - describe '#request_response' do + describe '#request_response', request_response: true do before(:each) do @sent_msg, @resp = 'a_msg', 'a_reply' end @@ -126,7 +174,7 @@ describe 'ClientStub' do server_port = create_test_server host = "localhost:#{server_port}" th = run_request_response(@sent_msg, @resp, @pass, - k1: 'v1', k2: 'v2') + expected_metadata: { k1: 'v1', k2: 'v2' }) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) expect(get_response(stub)).to eq(@resp) th.join @@ -187,13 +235,24 @@ describe 'ClientStub' do # Kill the server thread so tests can complete th.kill end + + it 'should raise ArgumentError if metadata contains invalid values' do + @metadata.merge!(k3: 3) + server_port = create_test_server + host = "localhost:#{server_port}" + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + expect do + get_response(stub) + end.to raise_error(ArgumentError, + /Header values must be of type string or array/) + end end describe 'without a call operation' do def get_response(stub, credentials: nil) puts credentials.inspect stub.request_response(@method, @sent_msg, noop, noop, - metadata: { k1: 'v1', k2: 'v2' }, + metadata: @metadata, credentials: credentials) end @@ -201,40 +260,62 @@ describe 'ClientStub' do end describe 'via a call operation' do + after(:each) do + # make sure op.wait doesn't hang, even if there's a bad status + @op.wait + end def get_response(stub, run_start_call_first: false, credentials: nil) - op = stub.request_response(@method, @sent_msg, noop, noop, - return_op: true, - metadata: { k1: 'v1', k2: 'v2' }, - deadline: from_relative_time(2), - credentials: credentials) - expect(op).to be_a(GRPC::ActiveCall::Operation) - op.start_call if run_start_call_first - result = op.execute - op.wait # make sure wait doesn't hang + @op = stub.request_response(@method, @sent_msg, noop, noop, + return_op: true, + metadata: @metadata, + deadline: from_relative_time(2), + credentials: credentials) + expect(@op).to be_a(GRPC::ActiveCall::Operation) + @op.start_call if run_start_call_first + result = @op.execute result end it_behaves_like 'request response' - it 'sends metadata to the server ok when running start_call first' do + def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" - th = run_request_response(@sent_msg, @resp, @pass, - k1: 'v1', k2: 'v2') + + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_request_response( + @sent_msg, @resp, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - expect(get_response(stub)).to eq(@resp) + expect( + get_response(stub, + run_start_call_first: run_start_call_first)).to eq(@resp) th.join end + + it 'sends metadata to the server ok when running start_call first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } + end + + it 'does not crash when used after the call has been finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } + end end end - describe '#client_streamer' do + describe '#client_streamer', client_streamer: true do before(:each) do Thread.abort_on_exception = true server_port = create_test_server host = "localhost:#{server_port}" @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - @metadata = { k1: 'v1', k2: 'v2' } @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @resp = 'a_reply' end @@ -247,7 +328,8 @@ describe 'ClientStub' do end it 'should send metadata to the server ok' do - th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) + th = run_client_streamer(@sent_msgs, @resp, @pass, + expected_metadata: @metadata) expect(get_response(@stub)).to eq(@resp) th.join end @@ -278,27 +360,50 @@ describe 'ClientStub' do end describe 'via a call operation' do + after(:each) do + # make sure op.wait doesn't hang, even if there's a bad status + @op.wait + end def get_response(stub, run_start_call_first: false) - op = stub.client_streamer(@method, @sent_msgs, noop, noop, - return_op: true, metadata: @metadata) - expect(op).to be_a(GRPC::ActiveCall::Operation) - op.start_call if run_start_call_first - result = op.execute - op.wait # make sure wait doesn't hang + @op = stub.client_streamer(@method, @sent_msgs, noop, noop, + return_op: true, metadata: @metadata) + expect(@op).to be_a(GRPC::ActiveCall::Operation) + @op.start_call if run_start_call_first + result = @op.execute result end it_behaves_like 'client streaming' - it 'sends metadata to the server ok when running start_call first' do - th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) - expect(get_response(@stub, run_start_call_first: true)).to eq(@resp) + def run_op_view_metadata_test(run_start_call_first) + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_client_streamer( + @sent_msgs, @resp, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) + expect( + get_response(@stub, + run_start_call_first: run_start_call_first)).to eq(@resp) th.join end + + it 'sends metadata to the server ok when running start_call first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } + end + + it 'does not crash when used after the call has been finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } + end end end - describe '#server_streamer' do + describe '#server_streamer', server_streamer: true do before(:each) do @sent_msg = 'a_msg' @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } @@ -328,18 +433,42 @@ describe 'ClientStub' do server_port = create_test_server host = "localhost:#{server_port}" th = run_server_streamer(@sent_msg, @replys, @fail, - k1: 'v1', k2: 'v2') + expected_metadata: { k1: 'v1', k2: 'v2' }) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) e = get_responses(stub) expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) th.join end + + it 'should raise ArgumentError if metadata contains invalid values' do + @metadata.merge!(k3: 3) + server_port = create_test_server + host = "localhost:#{server_port}" + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + expect do + get_responses(stub) + end.to raise_error(ArgumentError, + /Header values must be of type string or array/) + end + + it 'the call terminates when there is an unmarshalling error' do + server_port = create_test_server + host = "localhost:#{server_port}" + th = run_server_streamer(@sent_msg, @replys, @pass) + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + + unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') } + expect do + get_responses(stub, unmarshal: unmarshal).collect { |r| r } + end.to raise_error(ArgumentError, 'test unmarshalling error') + th.join + end end describe 'without a call operation' do - def get_responses(stub) - e = stub.server_streamer(@method, @sent_msg, noop, noop, - metadata: { k1: 'v1', k2: 'v2' }) + def get_responses(stub, unmarshal: noop) + e = stub.server_streamer(@method, @sent_msg, noop, unmarshal, + metadata: @metadata) expect(e).to be_a(Enumerator) e end @@ -351,10 +480,10 @@ describe 'ClientStub' do after(:each) do @op.wait # make sure wait doesn't hang end - def get_responses(stub, run_start_call_first: false) - @op = stub.server_streamer(@method, @sent_msg, noop, noop, + def get_responses(stub, run_start_call_first: false, unmarshal: noop) + @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal, return_op: true, - metadata: { k1: 'v1', k2: 'v2' }) + metadata: @metadata) expect(@op).to be_a(GRPC::ActiveCall::Operation) @op.start_call if run_start_call_first e = @op.execute @@ -364,20 +493,41 @@ describe 'ClientStub' do it_behaves_like 'server streaming' - it 'should send metadata to the server ok when start_call is run first' do + def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" - th = run_server_streamer(@sent_msg, @replys, @fail, - k1: 'v1', k2: 'v2') + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_server_streamer( + @sent_msg, @replys, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - e = get_responses(stub, run_start_call_first: true) - expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) + e = get_responses(stub, run_start_call_first: run_start_call_first) + expect(e.collect { |r| r }).to eq(@replys) th.join end + + it 'should send metadata to the server ok when start_call is run first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| + responses.each { |r| p r } + end + end + + it 'does not crash when used after the call has been finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| + responses.each { |r| p r } + end + end end end - describe '#bidi_streamer' do + describe '#bidi_streamer', bidi: true do before(:each) do @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } @@ -386,7 +536,7 @@ describe 'ClientStub' do end shared_examples 'bidi streaming' do - it 'supports sending all the requests first', bidi: true do + it 'supports sending all the requests first' do th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, @pass) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) @@ -395,7 +545,7 @@ describe 'ClientStub' do th.join end - it 'supports client-initiated ping pong', bidi: true do + it 'supports client-initiated ping pong' do th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) e = get_responses(stub) @@ -403,18 +553,39 @@ describe 'ClientStub' do th.join end - it 'supports a server-initiated ping pong', bidi: true do + it 'supports a server-initiated ping pong' do th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) e = get_responses(stub) expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end + + it 'should raise an error if the status is not ok' do + th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false) + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + e = get_responses(stub) + expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) + th.join + end + + # TODO: add test for metadata-related ArgumentError in a bidi call once + # issue mentioned in https://github.com/grpc/grpc/issues/10526 is fixed + + it 'should send metadata to the server ok' do + th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true, + expected_metadata: @metadata) + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + e = get_responses(stub) + expect(e.collect { |r| r }).to eq(@sent_msgs) + th.join + end end describe 'without a call operation' do def get_responses(stub) - e = stub.bidi_streamer(@method, @sent_msgs, noop, noop) + e = stub.bidi_streamer(@method, @sent_msgs, noop, noop, + metadata: @metadata) expect(e).to be_a(Enumerator) e end @@ -428,7 +599,8 @@ describe 'ClientStub' do end def get_responses(stub, run_start_call_first: false) @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop, - return_op: true) + return_op: true, + metadata: @metadata) expect(@op).to be_a(GRPC::ActiveCall::Operation) @op.start_call if run_start_call_first e = @op.execute @@ -438,27 +610,53 @@ describe 'ClientStub' do it_behaves_like 'bidi streaming' - it 'can run start_call before executing the call' do - th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, - @pass) + def run_op_view_metadata_test(run_start_call_first) + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_bidi_streamer_echo_ping_pong( + @sent_msgs, @pass, true, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) - e = get_responses(stub, run_start_call_first: true) - expect(e.collect { |r| r }).to eq(@replys) + e = get_responses(stub, run_start_call_first: run_start_call_first) + expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end + + it 'can run start_call before executing the call' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| + responses.each { |r| p r } + end + end + + it 'doesnt crash when op_view used after call has finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| + responses.each { |r| p r } + end + end end end - def run_server_streamer(expected_input, replys, status, **kw) - wanted_metadata = kw.clone + def run_server_streamer(expected_input, replys, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end expect(c.remote_read).to eq(expected_input) replys.each { |r| c.remote_send(r) } - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end @@ -472,9 +670,17 @@ describe 'ClientStub' do end end - def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts) + def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) + wanted_metadata.each do |k, v| + expect(c.metadata[k.to_s]).to eq(v) + end expected_inputs.each do |i| if client_starts expect(c.remote_read).to eq(i) @@ -484,33 +690,44 @@ describe 'ClientStub' do expect(c.remote_read).to eq(i) end end - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end - def run_client_streamer(expected_inputs, resp, status, **kw) - wanted_metadata = kw.clone + def run_client_streamer(expected_inputs, resp, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end - def run_request_response(expected_input, resp, status, **kw) - wanted_metadata = kw.clone + def run_request_response(expected_input, resp, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) expect(c.remote_read).to eq(expected_input) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end @@ -528,13 +745,13 @@ describe 'ClientStub' do @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure) end - def expect_server_to_be_invoked(notifier) + def expect_server_to_be_invoked(notifier, metadata_to_send: nil) @server.start notifier.notify(nil) recvd_rpc = @server.request_call recvd_call = recvd_rpc.call recvd_call.metadata = recvd_rpc.metadata - recvd_call.run_batch(SEND_INITIAL_METADATA => nil) + recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send) GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE, metadata_received: true) end diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index 100e9e8487..be578c40d3 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -38,14 +38,14 @@ describe GRPC::RpcDesc do shared_examples 'it handles errors' do it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:read_unary_request).once.and_return(Object.new) expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, metadata: {}) this_desc.run_server_method(@call, method(:bad_status)) end it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:read_unary_request).once.and_return(Object.new) expect(@call).to receive(:send_status).once.with(UNKNOWN, arg_error_msg, false, metadata: {}) @@ -53,7 +53,7 @@ describe GRPC::RpcDesc do end it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(CallError) + expect(@call).to receive(:read_unary_request).once.and_raise(CallError) blk = proc do this_desc.run_server_method(@call, method(:fake_reqresp)) end @@ -75,7 +75,7 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do req = Object.new - expect(@call).to receive(:remote_read).once.and_return(req) + expect(@call).to receive(:read_unary_request).once.and_return(req) expect(@call).to receive(:output_metadata).once.and_return(fake_md) expect(@call).to receive(:server_unary_response).once .with(@ok_response, trailing_metadata: fake_md) @@ -133,7 +133,7 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do req = Object.new - expect(@call).to receive(:remote_read).once.and_return(req) + expect(@call).to receive(:read_unary_request).once.and_return(req) expect(@call).to receive(:remote_send).twice.with(@ok_response) expect(@call).to receive(:output_metadata).and_return(fake_md) expect(@call).to receive(:send_status).once.with(OK, 'OK', true, diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 9633a828a2..e0646f4599 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -111,6 +111,47 @@ end SlowStub = SlowService.rpc_stub_class +# a test service that hangs onto call objects +# and uses them after the server-side call has been +# finished +class CheckCallAfterFinishedService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg + rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg) + rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg) + attr_reader :server_side_call + + def an_rpc(req, call) + fail 'shouldnt reuse service' unless @server_side_call.nil? + @server_side_call = call + req + end + + def a_client_streaming_rpc(call) + fail 'shouldnt reuse service' unless @server_side_call.nil? + @server_side_call = call + # iterate through requests so call can complete + call.each_remote_read.each { |r| p r } + EchoMsg.new + end + + def a_server_streaming_rpc(_, call) + fail 'shouldnt reuse service' unless @server_side_call.nil? + @server_side_call = call + [EchoMsg.new, EchoMsg.new] + end + + def a_bidi_rpc(requests, call) + fail 'shouldnt reuse service' unless @server_side_call.nil? + @server_side_call = call + requests.each { |r| p r } + [EchoMsg.new, EchoMsg.new] + end +end + +CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class + describe GRPC::RpcServer do RpcServer = GRPC::RpcServer StatusCodes = GRPC::Core::StatusCodes @@ -505,5 +546,109 @@ describe GRPC::RpcServer do t.join end end + + context 'when call objects are used after calls have completed' do + before(:each) do + server_opts = { + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) + @alt_host = "0.0.0.0:#{alt_port}" + + @service = CheckCallAfterFinishedService.new + @srv.handle(@service) + @srv_thd = Thread.new { @srv.run } + @srv.wait_till_running + end + + # check that the server-side call is still in a usable state even + # after it has finished + def check_single_req_view_of_finished_call(call) + common_check_of_finished_server_call(call) + + expect(call.peer).to be_a(String) + expect(call.peer_cert).to be(nil) + end + + def check_multi_req_view_of_finished_call(call) + common_check_of_finished_server_call(call) + + expect do + call.each_remote_read.each { |r| p r } + end.to raise_error(GRPC::Core::CallError) + end + + def common_check_of_finished_server_call(call) + expect do + call.merge_metadata_to_send({}) + end.to raise_error(RuntimeError) + + expect do + call.send_initial_metadata + end.to_not raise_error + + expect(call.cancelled?).to be(false) + expect(call.metadata).to be_a(Hash) + expect(call.metadata['user-agent']).to be_a(String) + + expect(call.metadata_sent).to be(true) + expect(call.output_metadata).to eq({}) + expect(call.metadata_to_send).to eq({}) + expect(call.deadline.is_a?(Time)).to be(true) + end + + it 'should not crash when call used after an unary call is finished' do + req = EchoMsg.new + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + resp = stub.an_rpc(req) + expect(resp).to be_a(EchoMsg) + @srv.stop + @srv_thd.join + + check_single_req_view_of_finished_call(@service.server_side_call) + end + + it 'should not crash when call used after client streaming finished' do + requests = [EchoMsg.new, EchoMsg.new] + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + resp = stub.a_client_streaming_rpc(requests) + expect(resp).to be_a(EchoMsg) + @srv.stop + @srv_thd.join + + check_multi_req_view_of_finished_call(@service.server_side_call) + end + + it 'should not crash when call used after server streaming finished' do + req = EchoMsg.new + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + responses = stub.a_server_streaming_rpc(req) + responses.each do |r| + expect(r).to be_a(EchoMsg) + end + @srv.stop + @srv_thd.join + + check_single_req_view_of_finished_call(@service.server_side_call) + end + + it 'should not crash when call used after a bidi call is finished' do + requests = [EchoMsg.new, EchoMsg.new] + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + responses = stub.a_bidi_rpc(requests) + responses.each do |r| + expect(r).to be_a(EchoMsg) + end + @srv.stop + @srv_thd.join + + check_multi_req_view_of_finished_call(@service.server_side_call) + end + end end end diff --git a/src/ruby/spec/testdata/client.key b/src/ruby/spec/testdata/client.key new file mode 100644 index 0000000000..f48d0735d9 --- /dev/null +++ b/src/ruby/spec/testdata/client.key @@ -0,0 +1,16 @@ +-----BEGIN PRIVATE KEY----- +MIICeQIBADANBgkqhkiG9w0BAQEFAASCAmMwggJfAgEAAoGBAOxUR9uhvhbeVUIM +s5WbH0px0mehl2+6sZpNjzvE2KimZpHzMJHukVH0Ffkvhs0b8+S5Ut9VNUAqd3IM +JCCAEGtRNoQhM1t9Yr2zAckSvbRacp+FL/Cj9eDmyo00KsVGaeefA4Dh4OW+ZhkT +NKcldXqkSuj1sEf244JZYuqZp6/tAgMBAAECgYEAi2NSVqpZMafE5YYUTcMGe6QS +k2jtpsqYgggI2RnLJ/2tNZwYI5pwP8QVSbnMaiF4gokD5hGdrNDfTnb2v+yIwYEH +0w8+oG7Z81KodsiZSIDJfTGsAZhVNwOz9y0VD8BBZZ1/274Zh52AUKLjZS/ZwIbS +W2ywya855dPnH/wj+0ECQQD9X8D920kByTNHhBG18biAEZ4pxs9f0OAG8333eVcI +w2lJDLsYDZrCB2ocgA3lUdozlzPC7YDYw8reg0tkiRY5AkEA7sdNzOeQsQRn7++5 +0bP9DtT/iON1gbfxRzCfCfXdoOtfQWIzTePWtURt9X/5D9NofI0Rg5W2oGy/MLe5 +/sXHVQJBAIup5XrJDkQywNZyAUU2ecn2bCWBFjwtqd+LBmuMciI9fOKsZtEKZrz/ +U0lkeMRoSwvXE8wmGLjjrAbdfohrXFkCQQDZEx/LtIl6JINJQiswVe0tWr6k+ASP +1WXoTm+HYpoF/XUvv9LccNF1IazFj34hwRQwhx7w/V52Ieb+p0jUMYGxAkEAjDhd +9pBO1fKXWiXzi9ZKfoyTNcUq3eBSVKwPG2nItg5ycXengjT5sgcWDnciIzW7BIVI +JiqOszq9GWESErAatg== +-----END PRIVATE KEY----- diff --git a/src/ruby/spec/testdata/client.pem b/src/ruby/spec/testdata/client.pem new file mode 100644 index 0000000000..e332091019 --- /dev/null +++ b/src/ruby/spec/testdata/client.pem @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICHzCCAYgCAQEwDQYJKoZIhvcNAQEFBQAwVjELMAkGA1UEBhMCQVUxEzARBgNV +BAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0 +ZDEPMA0GA1UEAwwGdGVzdGNhMB4XDTE0MDcxNzIzNTYwMloXDTI0MDcxNDIzNTYw +MlowWjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDETMBEGA1UEAwwKdGVzdGNsaWVudDCB +nzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA7FRH26G+Ft5VQgyzlZsfSnHSZ6GX +b7qxmk2PO8TYqKZmkfMwke6RUfQV+S+GzRvz5LlS31U1QCp3cgwkIIAQa1E2hCEz +W31ivbMByRK9tFpyn4Uv8KP14ObKjTQqxUZp558DgOHg5b5mGRM0pyV1eqRK6PWw +R/bjglli6pmnr+0CAwEAATANBgkqhkiG9w0BAQUFAAOBgQAStSm5PM7ubROiKK6/ +T2FkKlhiTOx+Ryenm3Eio59emq+jXl+1nhPySX5G2PQzSR5vd1dIhwgZSR4Gyttk +tRZ57k/NI1brUW8joiEOMJA/Mr7H7asx7wIRYDE91Fs8GkKWd5LhoPAQj+qdG35C +OO+svdkmqH0KZo320ZUqdl2ooQ== +-----END CERTIFICATE----- |