diff options
author | David Garcia Quintas <dgq@google.com> | 2018-03-23 22:39:35 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2018-03-23 22:39:35 -0700 |
commit | 811169d62f5a79a58a218dc992064205032aff83 (patch) | |
tree | 6b522d8ba2965377394f56616d381f4d2bf8a2af /src | |
parent | 02879244669f40bef06b38e4b6ad8a320529c924 (diff) | |
parent | cd0723291760ee7221279214ecf15613b71a1ddc (diff) |
Merge branch 'master' of github.com:grpc/grpc into authority_header
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 82 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/resolver.h | 4 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc | 30 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h | 5 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 4 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/parsing.cc | 4 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/writing.cc | 14 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 57 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Channel.cs | 13 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi | 5 | ||||
-rw-r--r-- | src/ruby/ext/grpc/extconf.rb | 1 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_compression_options.c | 2 |
12 files changed, 149 insertions, 72 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index bf3911e5ee..67dd3a1fa7 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -303,11 +303,16 @@ static void request_reresolution_locked(void* arg, grpc_error* error) { chand->lb_policy->SetReresolutionClosureLocked(&args->closure); } +// TODO(roth): The logic in this function is very hard to follow. We +// should refactor this so that it's easier to understand, perhaps as +// part of changing the resolver API to more clearly differentiate +// between transient failures and shutdown. static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { channel_data* chand = static_cast<channel_data*>(arg); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand, - grpc_error_string(error)); + gpr_log(GPR_DEBUG, + "chand=%p: got resolver result: resolver_result=%p error=%s", chand, + chand->resolver_result, grpc_error_string(error)); } // Extract the following fields from the resolver result, if non-nullptr. bool lb_policy_updated = false; @@ -423,8 +428,6 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { } } } - grpc_channel_args_destroy(chand->resolver_result); - chand->resolver_result = nullptr; } if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, @@ -497,6 +500,8 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { "Channel disconnected", &error, 1)); GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver"); + grpc_channel_args_destroy(chand->resolver_result); + chand->resolver_result = nullptr; } else { // Not shutting down. grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_error* state_error = @@ -515,11 +520,16 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { chand->exit_idle_when_lb_policy_arrives = false; } watch_lb_policy_locked(chand, chand->lb_policy.get(), state); + } else if (chand->resolver_result == nullptr) { + // Transient failure. + GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); } if (!lb_policy_updated) { set_channel_connectivity_state_locked( chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver"); } + grpc_channel_args_destroy(chand->resolver_result); + chand->resolver_result = nullptr; chand->resolver->NextLocked(&chand->resolver_result, &chand->on_resolver_result_changed); GRPC_ERROR_UNREF(state_error); @@ -2753,7 +2763,45 @@ static void pick_after_resolver_result_done_locked(void* arg, chand, calld); } async_pick_done_locked(elem, GRPC_ERROR_REF(error)); - } else if (chand->lb_policy != nullptr) { + } else if (chand->resolver == nullptr) { + // Shutting down. + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand, + calld); + } + async_pick_done_locked( + elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); + } else if (chand->lb_policy == nullptr) { + // Transient resolver failure. + // If call has wait_for_ready=true, try again; otherwise, fail. + uint32_t send_initial_metadata_flags = + calld->seen_send_initial_metadata + ? calld->send_initial_metadata_flags + : calld->pending_batches[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; + if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: resolver returned but no LB policy; " + "wait_for_ready=true; trying again", + chand, calld); + } + pick_after_resolver_result_start_locked(elem); + } else { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: resolver returned but no LB policy; " + "wait_for_ready=false; failing", + chand, calld); + } + async_pick_done_locked( + elem, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + } + } else { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick", chand, calld); @@ -2767,30 +2815,6 @@ static void pick_after_resolver_result_done_locked(void* arg, async_pick_done_locked(elem, GRPC_ERROR_NONE); } } - // TODO(roth): It should be impossible for chand->lb_policy to be nullptr - // here, so the rest of this code should never actually be executed. - // However, we have reports of a crash on iOS that triggers this case, - // so we are temporarily adding this to restore branches that were - // removed in https://github.com/grpc/grpc/pull/12297. Need to figure - // out what is actually causing this to occur and then figure out the - // right way to deal with it. - else if (chand->resolver != nullptr) { - // No LB policy, so try again. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, - "chand=%p calld=%p: resolver returned but no LB policy, " - "trying again", - chand, calld); - } - pick_after_resolver_result_start_locked(elem); - } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand, - calld); - } - async_pick_done_locked( - elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); - } } static void pick_after_resolver_result_start_locked(grpc_call_element* elem) { diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h index 1685a6c803..cdb5a20ea3 100644 --- a/src/core/ext/filters/client_channel/resolver.h +++ b/src/core/ext/filters/client_channel/resolver.h @@ -53,8 +53,12 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> { /// Requests a callback when a new result becomes available. /// When the new result is available, sets \a *result to the new result /// and schedules \a on_complete for execution. + /// Upon transient failure, sets \a *result to nullptr and schedules + /// \a on_complete with no error. /// If resolution is fatally broken, sets \a *result to nullptr and /// schedules \a on_complete with an error. + /// TODO(roth): When we have time, improve the way this API represents + /// transient failure vs. shutdown. /// /// Note that the client channel will almost always have a request /// to \a NextLocked() pending. When it gets the callback, it will diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index 4d8958f519..99a33f2277 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -82,6 +82,8 @@ class FakeResolver : public Resolver { grpc_closure* next_completion_ = nullptr; // target result address for next completion grpc_channel_args** target_result_ = nullptr; + // if true, return failure + bool return_failure_ = false; }; FakeResolver::FakeResolver(const ResolverArgs& args) : Resolver(args.combiner) { @@ -121,12 +123,16 @@ void FakeResolver::RequestReresolutionLocked() { } void FakeResolver::MaybeFinishNextLocked() { - if (next_completion_ != nullptr && next_results_ != nullptr) { - *target_result_ = grpc_channel_args_union(next_results_, channel_args_); + if (next_completion_ != nullptr && + (next_results_ != nullptr || return_failure_)) { + *target_result_ = + return_failure_ ? nullptr + : grpc_channel_args_union(next_results_, channel_args_); grpc_channel_args_destroy(next_results_); next_results_ = nullptr; GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE); next_completion_ = nullptr; + return_failure_ = false; } } @@ -197,6 +203,26 @@ void FakeResolverResponseGenerator::SetReresolutionResponse( GRPC_ERROR_NONE); } +void FakeResolverResponseGenerator::SetFailureLocked(void* arg, + grpc_error* error) { + SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg); + FakeResolver* resolver = closure_arg->generator->resolver_; + resolver->return_failure_ = true; + resolver->MaybeFinishNextLocked(); + Delete(closure_arg); +} + +void FakeResolverResponseGenerator::SetFailure() { + GPR_ASSERT(resolver_ != nullptr); + SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>(); + closure_arg->generator = this; + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetFailureLocked, + closure_arg, + grpc_combiner_scheduler(resolver_->combiner())), + GRPC_ERROR_NONE); +} + namespace { static void* response_generator_arg_copy(void* p) { diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h index 858f35851d..e5175f9b7b 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -56,6 +56,10 @@ class FakeResolverResponseGenerator // resolver will return the last value set via \a SetResponse(). void SetReresolutionResponse(grpc_channel_args* response); + // Tells the resolver to return a transient failure (signalled by + // returning a null result with no error). + void SetFailure(); + // Returns a channel arg containing \a generator. static grpc_arg MakeChannelArg(FakeResolverResponseGenerator* generator); @@ -68,6 +72,7 @@ class FakeResolverResponseGenerator static void SetResponseLocked(void* arg, grpc_error* error); static void SetReresolutionResponseLocked(void* arg, grpc_error* error); + static void SetFailureLocked(void* arg, grpc_error* error); FakeResolver* resolver_ = nullptr; // Do not own. }; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index a4d616d778..ab69cecf3a 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -68,7 +68,7 @@ #define DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */ #define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */ -#define DEFAULT_MAX_PINGS_BETWEEN_DATA 0 /* unlimited */ +#define DEFAULT_MAX_PINGS_BETWEEN_DATA 2 #define DEFAULT_MAX_PING_STRIKES 2 static int g_default_client_keepalive_time_ms = @@ -2630,7 +2630,7 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg); GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); grpc_timer_init(&t->keepalive_watchdog_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, &t->keepalive_watchdog_fired_locked); } diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 988380b76c..a10c9ada46 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -373,8 +373,6 @@ error_handler: /* t->parser = grpc_chttp2_data_parser_parse;*/ t->parser = grpc_chttp2_data_parser_parse; t->parser_data = &s->data_parser; - t->ping_state.pings_before_data_required = - t->ping_policy.max_pings_without_data; t->ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST; return GRPC_ERROR_NONE; } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, nullptr)) { @@ -547,8 +545,6 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0; } - t->ping_state.pings_before_data_required = - t->ping_policy.max_pings_without_data; t->ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST; /* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */ diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 7471d88aa1..6f32397a3a 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -224,7 +224,7 @@ class WriteContext { grpc_slice_buffer_add( &t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce, &throwaway_stats)); - ResetPingRecvClock(); + ResetPingClock(); } } @@ -269,11 +269,13 @@ class WriteContext { return s; } - void ResetPingRecvClock() { + void ResetPingClock() { if (!t_->is_client) { t_->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; t_->ping_recv_state.ping_strikes = 0; } + t_->ping_state.pings_before_data_required = + t_->ping_policy.max_pings_without_data; } void IncInitialMetadataWrites() { ++initial_metadata_writes_; } @@ -435,7 +437,7 @@ class StreamWriteContext { }; grpc_chttp2_encode_header(&t_->hpack_compressor, nullptr, 0, s_->send_initial_metadata, &hopt, &t_->outbuf); - write_context_->ResetPingRecvClock(); + write_context_->ResetPingClock(); write_context_->IncInitialMetadataWrites(); } @@ -455,7 +457,7 @@ class StreamWriteContext { grpc_slice_buffer_add( &t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce, &s_->stats.outgoing)); - write_context_->ResetPingRecvClock(); + write_context_->ResetPingClock(); write_context_->IncWindowUpdateWrites(); } @@ -489,7 +491,7 @@ class StreamWriteContext { data_send_context.CompressMoreBytes(); } } - write_context_->ResetPingRecvClock(); + write_context_->ResetPingClock(); if (data_send_context.is_last_frame()) { SentLastFrame(); } @@ -530,7 +532,7 @@ class StreamWriteContext { s_->send_trailing_metadata, &hopt, &t_->outbuf); } write_context_->IncTrailingMetadataWrites(); - write_context_->ResetPingRecvClock(); + write_context_->ResetPingClock(); SentLastFrame(); write_context_->NoteScheduledResults(); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 760aaa4b4d..391ca44962 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -45,6 +45,7 @@ #include "src/cpp/thread_manager/thread_manager.h" namespace grpc { +namespace { class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { public: @@ -53,16 +54,29 @@ class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { void PostSynchronousRequest(ServerContext* context) override {} }; -static std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr; -static gpr_once g_once_init_callbacks = GPR_ONCE_INIT; +std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr; +gpr_once g_once_init_callbacks = GPR_ONCE_INIT; -static void InitGlobalCallbacks() { +void InitGlobalCallbacks() { if (!g_callbacks) { g_callbacks.reset(new DefaultGlobalCallbacks()); } } -class Server::UnimplementedAsyncRequestContext { +class ShutdownTag : public internal::CompletionQueueTag { + public: + bool FinalizeResult(void** tag, bool* status) { return false; } +}; + +class DummyTag : public internal::CompletionQueueTag { + public: + bool FinalizeResult(void** tag, bool* status) { + *status = true; + return true; + } +}; + +class UnimplementedAsyncRequestContext { protected: UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {} @@ -70,8 +84,14 @@ class Server::UnimplementedAsyncRequestContext { GenericServerAsyncReaderWriter generic_stream_; }; +} // namespace + +/// Use private inheritance rather than composition only to establish order +/// of construction, since the public base class should be constructed after the +/// elements belonging to the private base class are constructed. This is not +/// possible using true composition. class Server::UnimplementedAsyncRequest final - : public UnimplementedAsyncRequestContext, + : private UnimplementedAsyncRequestContext, public GenericAsyncRequest { public: UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq) @@ -90,38 +110,27 @@ class Server::UnimplementedAsyncRequest final ServerCompletionQueue* const cq_; }; -typedef internal::SneakyCallOpSet<internal::CallOpSendInitialMetadata, - internal::CallOpServerSendStatus> - UnimplementedAsyncResponseOp; +/// UnimplementedAsyncResponse should not post user-visible completions to the +/// C++ completion queue, but is generated as a CQ event by the core class Server::UnimplementedAsyncResponse final - : public UnimplementedAsyncResponseOp { + : public internal::CallOpSet<internal::CallOpSendInitialMetadata, + internal::CallOpServerSendStatus> { public: UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); ~UnimplementedAsyncResponse() { delete request_; } bool FinalizeResult(void** tag, bool* status) override { - bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status); + internal::CallOpSet< + internal::CallOpSendInitialMetadata, + internal::CallOpServerSendStatus>::FinalizeResult(tag, status); delete this; - return r; + return false; } private: UnimplementedAsyncRequest* const request_; }; -class ShutdownTag : public internal::CompletionQueueTag { - public: - bool FinalizeResult(void** tag, bool* status) { return false; } -}; - -class DummyTag : public internal::CompletionQueueTag { - public: - bool FinalizeResult(void** tag, bool* status) { - *status = true; - return true; - } -}; - class Server::SyncRequest final : public internal::CompletionQueueTag { public: SyncRequest(internal::RpcServiceMethod* method, void* tag) diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index e7b30cd1e9..abe19a6fc6 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -160,8 +160,17 @@ namespace Grpc.Core var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture; lock (myLock) { - // pass "tcs" as "state" for WatchConnectivityStateHandler. - handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, WatchConnectivityStateHandler, tcs); + if (handle.IsClosed) + { + // If channel has been already shutdown and handle was disposed, we would end up with + // an abandoned completion added to the completion registry. Instead, we make sure we fail early. + throw new ObjectDisposedException(nameof(handle), "Channel handle has already been disposed."); + } + else + { + // pass "tcs" as "state" for WatchConnectivityStateHandler. + handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, WatchConnectivityStateHandler, tcs); + } } return tcs.Task; } diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index 0892215b6d..2e02111ddd 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -30,9 +30,12 @@ cdef class Call: tag, operations, self if retain_self else None) batch_operation_tag.prepare() cpython.Py_INCREF(batch_operation_tag) - return grpc_call_start_batch( + cdef grpc_call_error error + with nogil: + error = grpc_call_start_batch( self.c_call, batch_operation_tag.c_ops, batch_operation_tag.c_nops, <cpython.PyObject *>batch_operation_tag, NULL) + return error def start_client_batch(self, operations, tag): # We don't reference this call in the operations tag because diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb index c1a0c56841..e8e87e4f15 100644 --- a/src/ruby/ext/grpc/extconf.rb +++ b/src/ruby/ext/grpc/extconf.rb @@ -94,7 +94,6 @@ $CFLAGS << ' -std=c99 ' $CFLAGS << ' -Wall ' $CFLAGS << ' -Wextra ' $CFLAGS << ' -pedantic ' -$CFLAGS << ' -Wno-format ' output = File.join('grpc', 'grpc_c') puts 'Generating Makefile for ' + output diff --git a/src/ruby/ext/grpc/rb_compression_options.c b/src/ruby/ext/grpc/rb_compression_options.c index 7fdec2ee8b..4ba6991ef6 100644 --- a/src/ruby/ext/grpc/rb_compression_options.c +++ b/src/ruby/ext/grpc/rb_compression_options.c @@ -186,7 +186,7 @@ void grpc_rb_compression_options_algorithm_name_to_value_internal( error_message_ruby_str = rb_str_new(error_message_str, strlen(error_message_str)); gpr_free(error_message_str); - rb_raise(rb_eNameError, StringValueCStr(error_message_ruby_str)); + rb_raise(rb_eNameError, "%s", StringValueCStr(error_message_ruby_str)); } grpc_slice_unref(name_slice); |