diff options
Diffstat (limited to 'src')
54 files changed, 1651 insertions, 590 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index bbc5160bec..67dd3a1fa7 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -38,12 +38,12 @@ #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/filters/client_channel/retry_throttle.h" -#include "src/core/ext/filters/client_channel/status_util.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/deadline/deadline_filter.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/channel/status_util.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/manual_constructor.h" @@ -303,11 +303,16 @@ static void request_reresolution_locked(void* arg, grpc_error* error) { chand->lb_policy->SetReresolutionClosureLocked(&args->closure); } +// TODO(roth): The logic in this function is very hard to follow. We +// should refactor this so that it's easier to understand, perhaps as +// part of changing the resolver API to more clearly differentiate +// between transient failures and shutdown. static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { channel_data* chand = static_cast<channel_data*>(arg); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand, - grpc_error_string(error)); + gpr_log(GPR_DEBUG, + "chand=%p: got resolver result: resolver_result=%p error=%s", chand, + chand->resolver_result, grpc_error_string(error)); } // Extract the following fields from the resolver result, if non-nullptr. bool lb_policy_updated = false; @@ -423,8 +428,6 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { } } } - grpc_channel_args_destroy(chand->resolver_result); - chand->resolver_result = nullptr; } if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, @@ -497,6 +500,8 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { "Channel disconnected", &error, 1)); GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver"); + grpc_channel_args_destroy(chand->resolver_result); + chand->resolver_result = nullptr; } else { // Not shutting down. grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_error* state_error = @@ -515,11 +520,16 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { chand->exit_idle_when_lb_policy_arrives = false; } watch_lb_policy_locked(chand, chand->lb_policy.get(), state); + } else if (chand->resolver_result == nullptr) { + // Transient failure. + GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); } if (!lb_policy_updated) { set_channel_connectivity_state_locked( chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver"); } + grpc_channel_args_destroy(chand->resolver_result); + chand->resolver_result = nullptr; chand->resolver->NextLocked(&chand->resolver_result, &chand->on_resolver_result_changed); GRPC_ERROR_UNREF(state_error); @@ -2753,7 +2763,45 @@ static void pick_after_resolver_result_done_locked(void* arg, chand, calld); } async_pick_done_locked(elem, GRPC_ERROR_REF(error)); - } else if (chand->lb_policy != nullptr) { + } else if (chand->resolver == nullptr) { + // Shutting down. + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand, + calld); + } + async_pick_done_locked( + elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); + } else if (chand->lb_policy == nullptr) { + // Transient resolver failure. + // If call has wait_for_ready=true, try again; otherwise, fail. + uint32_t send_initial_metadata_flags = + calld->seen_send_initial_metadata + ? calld->send_initial_metadata_flags + : calld->pending_batches[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; + if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: resolver returned but no LB policy; " + "wait_for_ready=true; trying again", + chand, calld); + } + pick_after_resolver_result_start_locked(elem); + } else { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: resolver returned but no LB policy; " + "wait_for_ready=false; failing", + chand, calld); + } + async_pick_done_locked( + elem, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + } + } else { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick", chand, calld); @@ -2767,30 +2815,6 @@ static void pick_after_resolver_result_done_locked(void* arg, async_pick_done_locked(elem, GRPC_ERROR_NONE); } } - // TODO(roth): It should be impossible for chand->lb_policy to be nullptr - // here, so the rest of this code should never actually be executed. - // However, we have reports of a crash on iOS that triggers this case, - // so we are temporarily adding this to restore branches that were - // removed in https://github.com/grpc/grpc/pull/12297. Need to figure - // out what is actually causing this to occur and then figure out the - // right way to deal with it. - else if (chand->resolver != nullptr) { - // No LB policy, so try again. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, - "chand=%p calld=%p: resolver returned but no LB policy, " - "trying again", - chand, calld); - } - pick_after_resolver_result_start_locked(elem); - } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand, - calld); - } - async_pick_done_locked( - elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); - } } static void pick_after_resolver_result_start_locked(grpc_call_element* elem) { diff --git a/src/core/ext/filters/client_channel/method_params.cc b/src/core/ext/filters/client_channel/method_params.cc index 374b87e170..1f116bb67d 100644 --- a/src/core/ext/filters/client_channel/method_params.cc +++ b/src/core/ext/filters/client_channel/method_params.cc @@ -26,7 +26,7 @@ #include <grpc/support/string_util.h> #include "src/core/ext/filters/client_channel/method_params.h" -#include "src/core/ext/filters/client_channel/status_util.h" +#include "src/core/lib/channel/status_util.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/memory.h" diff --git a/src/core/ext/filters/client_channel/method_params.h b/src/core/ext/filters/client_channel/method_params.h index 48ece29867..099924edf3 100644 --- a/src/core/ext/filters/client_channel/method_params.h +++ b/src/core/ext/filters/client_channel/method_params.h @@ -21,7 +21,7 @@ #include <grpc/support/port_platform.h> -#include "src/core/ext/filters/client_channel/status_util.h" +#include "src/core/lib/channel/status_util.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/exec_ctx.h" // for grpc_millis diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h index 1685a6c803..cdb5a20ea3 100644 --- a/src/core/ext/filters/client_channel/resolver.h +++ b/src/core/ext/filters/client_channel/resolver.h @@ -53,8 +53,12 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> { /// Requests a callback when a new result becomes available. /// When the new result is available, sets \a *result to the new result /// and schedules \a on_complete for execution. + /// Upon transient failure, sets \a *result to nullptr and schedules + /// \a on_complete with no error. /// If resolution is fatally broken, sets \a *result to nullptr and /// schedules \a on_complete with an error. + /// TODO(roth): When we have time, improve the way this API represents + /// transient failure vs. shutdown. /// /// Note that the client channel will almost always have a request /// to \a NextLocked() pending. When it gets the callback, it will diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index c63de3c509..aca7307a2f 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -28,6 +28,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> +#include <address_sorting/address_sorting.h> + #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" @@ -458,6 +460,7 @@ void grpc_resolver_dns_ares_init() { /* TODO(zyc): Turn on c-ares based resolver by default after the address sorter and the CNAME support are added. */ if (resolver_env != nullptr && gpr_stricmp(resolver_env, "ares") == 0) { + address_sorting_init(); grpc_error* error = grpc_ares_init(); if (error != GRPC_ERROR_NONE) { GRPC_LOG_IF_ERROR("ares_library_init() failed", error); @@ -475,6 +478,7 @@ void grpc_resolver_dns_ares_init() { void grpc_resolver_dns_ares_shutdown() { char* resolver_env = gpr_getenv("GRPC_DNS_RESOLVER"); if (resolver_env != nullptr && gpr_stricmp(resolver_env, "ares") == 0) { + address_sorting_shutdown(); grpc_ares_cleanup(); } gpr_free(resolver_env); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 71b06eb87e..fb2435749d 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -33,6 +33,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> +#include <address_sorting/address_sorting.h> #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/lib/gpr/host_port.h" @@ -46,6 +47,9 @@ static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; +grpc_core::TraceFlag grpc_trace_cares_address_sorting(false, + "cares_address_sorting"); + struct grpc_ares_request { /** indicates the DNS server to use, if specified */ struct ares_addr_port_node dns_server_addr; @@ -96,11 +100,60 @@ static void grpc_ares_request_ref(grpc_ares_request* r) { gpr_ref(&r->pending_queries); } +static void log_address_sorting_list(grpc_lb_addresses* lb_addrs, + const char* input_output_str) { + for (size_t i = 0; i < lb_addrs->num_addresses; i++) { + char* addr_str; + if (grpc_sockaddr_to_string(&addr_str, &lb_addrs->addresses[i].address, + true)) { + gpr_log(GPR_DEBUG, "c-ares address sorting: %s[%" PRIuPTR "]=%s", + input_output_str, i, addr_str); + gpr_free(addr_str); + } else { + gpr_log(GPR_DEBUG, + "c-ares address sorting: %s[%" PRIuPTR "]=<unprintable>", + input_output_str, i); + } + } +} + +void grpc_cares_wrapper_address_sorting_sort(grpc_lb_addresses* lb_addrs) { + if (grpc_trace_cares_address_sorting.enabled()) { + log_address_sorting_list(lb_addrs, "input"); + } + address_sorting_sortable* sortables = (address_sorting_sortable*)gpr_zalloc( + sizeof(address_sorting_sortable) * lb_addrs->num_addresses); + for (size_t i = 0; i < lb_addrs->num_addresses; i++) { + sortables[i].user_data = &lb_addrs->addresses[i]; + memcpy(&sortables[i].dest_addr.addr, &lb_addrs->addresses[i].address.addr, + lb_addrs->addresses[i].address.len); + sortables[i].dest_addr.len = lb_addrs->addresses[i].address.len; + } + address_sorting_rfc_6724_sort(sortables, lb_addrs->num_addresses); + grpc_lb_address* sorted_lb_addrs = (grpc_lb_address*)gpr_zalloc( + sizeof(grpc_lb_address) * lb_addrs->num_addresses); + for (size_t i = 0; i < lb_addrs->num_addresses; i++) { + sorted_lb_addrs[i] = *(grpc_lb_address*)sortables[i].user_data; + } + gpr_free(sortables); + gpr_free(lb_addrs->addresses); + lb_addrs->addresses = sorted_lb_addrs; + if (grpc_trace_cares_address_sorting.enabled()) { + log_address_sorting_list(lb_addrs, "output"); + } +} + +/* Allow tests to access grpc_ares_wrapper_address_sorting_sort */ +void grpc_cares_wrapper_test_only_address_sorting_sort( + grpc_lb_addresses* lb_addrs) { + grpc_cares_wrapper_address_sorting_sort(lb_addrs); +} + static void grpc_ares_request_unref(grpc_ares_request* r) { /* If there are no pending queries, invoke on_done callback and destroy the request */ if (gpr_unref(&r->pending_queries)) { - /* TODO(zyc): Sort results with RFC6724 before invoking on_done. */ + grpc_cares_wrapper_address_sorting_sort(*(r->lb_addrs_out)); GRPC_CLOSURE_SCHED(r->on_done, r->error); gpr_mu_destroy(&r->mu); grpc_ares_ev_driver_destroy(r->ev_driver); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index 3e8ea01e12..2d84a038d6 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -26,6 +26,8 @@ #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/resolve_address.h" +extern grpc_core::TraceFlag grpc_trace_cares_address_sorting; + typedef struct grpc_ares_request grpc_ares_request; /* Asynchronously resolve \a name. Use \a default_port if a port isn't @@ -64,5 +66,9 @@ grpc_error* grpc_ares_init(void); it has been called the same number of times as grpc_ares_init(). */ void grpc_ares_cleanup(void); +/* Exposed only for testing */ +void grpc_cares_wrapper_test_only_address_sorting_sort( + grpc_lb_addresses* lb_addrs); + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_WRAPPER_H \ */ diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index 4d8958f519..99a33f2277 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -82,6 +82,8 @@ class FakeResolver : public Resolver { grpc_closure* next_completion_ = nullptr; // target result address for next completion grpc_channel_args** target_result_ = nullptr; + // if true, return failure + bool return_failure_ = false; }; FakeResolver::FakeResolver(const ResolverArgs& args) : Resolver(args.combiner) { @@ -121,12 +123,16 @@ void FakeResolver::RequestReresolutionLocked() { } void FakeResolver::MaybeFinishNextLocked() { - if (next_completion_ != nullptr && next_results_ != nullptr) { - *target_result_ = grpc_channel_args_union(next_results_, channel_args_); + if (next_completion_ != nullptr && + (next_results_ != nullptr || return_failure_)) { + *target_result_ = + return_failure_ ? nullptr + : grpc_channel_args_union(next_results_, channel_args_); grpc_channel_args_destroy(next_results_); next_results_ = nullptr; GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE); next_completion_ = nullptr; + return_failure_ = false; } } @@ -197,6 +203,26 @@ void FakeResolverResponseGenerator::SetReresolutionResponse( GRPC_ERROR_NONE); } +void FakeResolverResponseGenerator::SetFailureLocked(void* arg, + grpc_error* error) { + SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg); + FakeResolver* resolver = closure_arg->generator->resolver_; + resolver->return_failure_ = true; + resolver->MaybeFinishNextLocked(); + Delete(closure_arg); +} + +void FakeResolverResponseGenerator::SetFailure() { + GPR_ASSERT(resolver_ != nullptr); + SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>(); + closure_arg->generator = this; + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetFailureLocked, + closure_arg, + grpc_combiner_scheduler(resolver_->combiner())), + GRPC_ERROR_NONE); +} + namespace { static void* response_generator_arg_copy(void* p) { diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h index 858f35851d..e5175f9b7b 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -56,6 +56,10 @@ class FakeResolverResponseGenerator // resolver will return the last value set via \a SetResponse(). void SetReresolutionResponse(grpc_channel_args* response); + // Tells the resolver to return a transient failure (signalled by + // returning a null result with no error). + void SetFailure(); + // Returns a channel arg containing \a generator. static grpc_arg MakeChannelArg(FakeResolverResponseGenerator* generator); @@ -68,6 +72,7 @@ class FakeResolverResponseGenerator static void SetResponseLocked(void* arg, grpc_error* error); static void SetReresolutionResponseLocked(void* arg, grpc_error* error); + static void SetFailureLocked(void* arg, grpc_error* error); FakeResolver* resolver_ = nullptr; // Do not own. }; diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index cae7cc35e3..d7815fb7e1 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -40,6 +40,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index a4d616d778..dc4e002405 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -68,7 +68,7 @@ #define DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */ #define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */ -#define DEFAULT_MAX_PINGS_BETWEEN_DATA 0 /* unlimited */ +#define DEFAULT_MAX_PINGS_BETWEEN_DATA 2 #define DEFAULT_MAX_PING_STRIKES 2 static int g_default_client_keepalive_time_ms = @@ -669,6 +669,7 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs, GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s, grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer); + s->unprocessed_incoming_frames_buffer_cached_length = 0; grpc_slice_buffer_init(&s->frame_storage); grpc_slice_buffer_init(&s->compressed_data_buffer); grpc_slice_buffer_init(&s->decompressed_data_buffer); @@ -1577,20 +1578,27 @@ static void perform_stream_op_locked(void* stream_op, if (op->recv_message) { GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE(); - size_t already_received; + size_t before = 0; GPR_ASSERT(s->recv_message_ready == nullptr); GPR_ASSERT(!s->pending_byte_stream); s->recv_message_ready = op_payload->recv_message.recv_message_ready; s->recv_message = op_payload->recv_message.recv_message; if (s->id != 0) { if (!s->read_closed) { - already_received = s->frame_storage.length; + before = s->frame_storage.length + + s->unprocessed_incoming_frames_buffer.length; + } + } + grpc_chttp2_maybe_complete_recv_message(t, s); + if (s->id != 0) { + if (!s->read_closed && s->frame_storage.length == 0) { + size_t after = s->frame_storage.length + + s->unprocessed_incoming_frames_buffer_cached_length; s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES, - already_received); + before - after); grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s); } } - grpc_chttp2_maybe_complete_recv_message(t, s); } if (op->recv_trailing_metadata) { @@ -1867,6 +1875,10 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t, } } } + // save the length of the buffer before handing control back to application + // threads. Needed to support correct flow control bookkeeping + s->unprocessed_incoming_frames_buffer_cached_length = + s->unprocessed_incoming_frames_buffer.length; if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) { null_then_run_closure(&s->recv_message_ready, GRPC_ERROR_NONE); } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) { @@ -2630,7 +2642,7 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg); GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); grpc_timer_init(&t->keepalive_watchdog_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, &t->keepalive_watchdog_fired_locked); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 6d11e5aa31..ca6e715978 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -550,6 +550,11 @@ struct grpc_chttp2_stream { grpc_slice_buffer unprocessed_incoming_frames_buffer; grpc_closure* on_next; /* protected by t combiner */ bool pending_byte_stream; /* protected by t combiner */ + // cached length of buffer to be used by the transport thread in cases where + // stream->pending_byte_stream == true. The value is saved before + // application threads are allowed to modify + // unprocessed_incoming_frames_buffer + size_t unprocessed_incoming_frames_buffer_cached_length; grpc_closure reset_byte_stream; grpc_error* byte_stream_error; /* protected by t combiner */ bool received_last_frame; /* protected by t combiner */ diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 988380b76c..a10c9ada46 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -373,8 +373,6 @@ error_handler: /* t->parser = grpc_chttp2_data_parser_parse;*/ t->parser = grpc_chttp2_data_parser_parse; t->parser_data = &s->data_parser; - t->ping_state.pings_before_data_required = - t->ping_policy.max_pings_without_data; t->ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST; return GRPC_ERROR_NONE; } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, nullptr)) { @@ -547,8 +545,6 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0; } - t->ping_state.pings_before_data_required = - t->ping_policy.max_pings_without_data; t->ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST; /* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */ diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 7471d88aa1..6f32397a3a 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -224,7 +224,7 @@ class WriteContext { grpc_slice_buffer_add( &t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce, &throwaway_stats)); - ResetPingRecvClock(); + ResetPingClock(); } } @@ -269,11 +269,13 @@ class WriteContext { return s; } - void ResetPingRecvClock() { + void ResetPingClock() { if (!t_->is_client) { t_->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; t_->ping_recv_state.ping_strikes = 0; } + t_->ping_state.pings_before_data_required = + t_->ping_policy.max_pings_without_data; } void IncInitialMetadataWrites() { ++initial_metadata_writes_; } @@ -435,7 +437,7 @@ class StreamWriteContext { }; grpc_chttp2_encode_header(&t_->hpack_compressor, nullptr, 0, s_->send_initial_metadata, &hopt, &t_->outbuf); - write_context_->ResetPingRecvClock(); + write_context_->ResetPingClock(); write_context_->IncInitialMetadataWrites(); } @@ -455,7 +457,7 @@ class StreamWriteContext { grpc_slice_buffer_add( &t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce, &s_->stats.outgoing)); - write_context_->ResetPingRecvClock(); + write_context_->ResetPingClock(); write_context_->IncWindowUpdateWrites(); } @@ -489,7 +491,7 @@ class StreamWriteContext { data_send_context.CompressMoreBytes(); } } - write_context_->ResetPingRecvClock(); + write_context_->ResetPingClock(); if (data_send_context.is_last_frame()) { SentLastFrame(); } @@ -530,7 +532,7 @@ class StreamWriteContext { s_->send_trailing_metadata, &hopt, &t_->outbuf); } write_context_->IncTrailingMetadataWrites(); - write_context_->ResetPingRecvClock(); + write_context_->ResetPingClock(); SentLastFrame(); write_context_->NoteScheduledResults(); diff --git a/src/core/lib/channel/channel_trace.cc b/src/core/lib/channel/channel_trace.cc new file mode 100644 index 0000000000..654300cd32 --- /dev/null +++ b/src/core/lib/channel/channel_trace.cc @@ -0,0 +1,239 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/impl/codegen/port_platform.h> + +#include "src/core/lib/channel/channel_trace.h" + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "src/core/lib/channel/channel_trace_registry.h" +#include "src/core/lib/channel/status_util.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/error_utils.h" + +namespace grpc_core { + +ChannelTrace::TraceEvent::TraceEvent( + Severity severity, grpc_slice data, + RefCountedPtr<ChannelTrace> referenced_tracer, ReferencedType type) + : severity_(severity), + data_(data), + timestamp_(grpc_millis_to_timespec(grpc_core::ExecCtx::Get()->Now(), + GPR_CLOCK_REALTIME)), + next_(nullptr), + referenced_tracer_(std::move(referenced_tracer)), + referenced_type_(type) {} + +ChannelTrace::TraceEvent::TraceEvent(Severity severity, grpc_slice data) + : severity_(severity), + data_(data), + timestamp_(grpc_millis_to_timespec(grpc_core::ExecCtx::Get()->Now(), + GPR_CLOCK_REALTIME)), + next_(nullptr) {} + +ChannelTrace::TraceEvent::~TraceEvent() { grpc_slice_unref_internal(data_); } + +ChannelTrace::ChannelTrace(size_t max_events) + : channel_uuid_(-1), + num_events_logged_(0), + list_size_(0), + max_list_size_(max_events), + head_trace_(nullptr), + tail_trace_(nullptr) { + if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0 + gpr_mu_init(&tracer_mu_); + channel_uuid_ = grpc_channel_trace_registry_register_channel_trace(this); + time_created_ = grpc_millis_to_timespec(grpc_core::ExecCtx::Get()->Now(), + GPR_CLOCK_REALTIME); +} + +ChannelTrace::~ChannelTrace() { + if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0 + TraceEvent* it = head_trace_; + while (it != nullptr) { + TraceEvent* to_free = it; + it = it->next(); + Delete<TraceEvent>(to_free); + } + grpc_channel_trace_registry_unregister_channel_trace(channel_uuid_); + gpr_mu_destroy(&tracer_mu_); +} + +intptr_t ChannelTrace::GetUuid() const { return channel_uuid_; } + +void ChannelTrace::AddTraceEventHelper(TraceEvent* new_trace_event) { + ++num_events_logged_; + // first event case + if (head_trace_ == nullptr) { + head_trace_ = tail_trace_ = new_trace_event; + } + // regular event add case + else { + tail_trace_->set_next(new_trace_event); + tail_trace_ = tail_trace_->next(); + } + ++list_size_; + // maybe garbage collect the end + if (list_size_ > max_list_size_) { + TraceEvent* to_free = head_trace_; + head_trace_ = head_trace_->next(); + Delete<TraceEvent>(to_free); + --list_size_; + } +} + +void ChannelTrace::AddTraceEvent(Severity severity, grpc_slice data) { + if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0 + AddTraceEventHelper(New<TraceEvent>(severity, data)); +} + +void ChannelTrace::AddTraceEventReferencingChannel( + Severity severity, grpc_slice data, + RefCountedPtr<ChannelTrace> referenced_tracer) { + if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0 + // create and fill up the new event + AddTraceEventHelper( + New<TraceEvent>(severity, data, std::move(referenced_tracer), Channel)); +} + +void ChannelTrace::AddTraceEventReferencingSubchannel( + Severity severity, grpc_slice data, + RefCountedPtr<ChannelTrace> referenced_tracer) { + if (max_list_size_ == 0) return; // tracing is disabled if max_events == 0 + // create and fill up the new event + AddTraceEventHelper(New<TraceEvent>( + severity, data, std::move(referenced_tracer), Subchannel)); +} + +namespace { + +// returns an allocated string that represents tm according to RFC-3339, and, +// more specifically, follows: +// https://developers.google.com/protocol-buffers/docs/proto3#json +// +// "Uses RFC 3339, where generated output will always be Z-normalized and uses +// 0, 3, 6 or 9 fractional digits." +char* fmt_time(gpr_timespec tm) { + char time_buffer[35]; + char ns_buffer[11]; // '.' + 9 digits of precision + struct tm* tm_info = localtime((const time_t*)&tm.tv_sec); + strftime(time_buffer, sizeof(time_buffer), "%Y-%m-%dT%H:%M:%S", tm_info); + snprintf(ns_buffer, 11, ".%09d", tm.tv_nsec); + // This loop trims off trailing zeros by inserting a null character that the + // right point. We iterate in chunks of three because we want 0, 3, 6, or 9 + // fractional digits. + for (int i = 7; i >= 1; i -= 3) { + if (ns_buffer[i] == '0' && ns_buffer[i + 1] == '0' && + ns_buffer[i + 2] == '0') { + ns_buffer[i] = '\0'; + // Edge case in which all fractional digits were 0. + if (i == 1) { + ns_buffer[0] = '\0'; + } + } else { + break; + } + } + char* full_time_str; + gpr_asprintf(&full_time_str, "%s%sZ", time_buffer, ns_buffer); + return full_time_str; +} + +const char* severity_string(ChannelTrace::Severity severity) { + switch (severity) { + case ChannelTrace::Severity::Info: + return "CT_INFO"; + case ChannelTrace::Severity::Warning: + return "CT_WARNING"; + case ChannelTrace::Severity::Error: + return "CT_ERROR"; + default: + GPR_UNREACHABLE_CODE(return "CT_UNKNOWN"); + } +} + +} // anonymous namespace + +void ChannelTrace::TraceEvent::RenderTraceEvent(grpc_json* json) const { + grpc_json* json_iterator = nullptr; + json_iterator = grpc_json_create_child(json_iterator, json, "description", + grpc_slice_to_c_string(data_), + GRPC_JSON_STRING, true); + json_iterator = grpc_json_create_child(json_iterator, json, "severity", + severity_string(severity_), + GRPC_JSON_STRING, false); + json_iterator = + grpc_json_create_child(json_iterator, json, "timestamp", + fmt_time(timestamp_), GRPC_JSON_STRING, true); + if (referenced_tracer_ != nullptr) { + char* uuid_str; + gpr_asprintf(&uuid_str, "%" PRIdPTR, referenced_tracer_->channel_uuid_); + grpc_json* child_ref = grpc_json_create_child( + json_iterator, json, + (referenced_type_ == Channel) ? "channelRef" : "subchannelRef", nullptr, + GRPC_JSON_OBJECT, false); + json_iterator = grpc_json_create_child( + nullptr, child_ref, + (referenced_type_ == Channel) ? "channelId" : "subchannelId", uuid_str, + GRPC_JSON_STRING, true); + json_iterator = child_ref; + } +} + +char* ChannelTrace::RenderTrace() const { + if (!max_list_size_) + return nullptr; // tracing is disabled if max_events == 0 + grpc_json* json = grpc_json_create(GRPC_JSON_OBJECT); + char* num_events_logged_str; + gpr_asprintf(&num_events_logged_str, "%" PRId64, num_events_logged_); + grpc_json* json_iterator = nullptr; + json_iterator = + grpc_json_create_child(json_iterator, json, "numEventsLogged", + num_events_logged_str, GRPC_JSON_STRING, true); + json_iterator = + grpc_json_create_child(json_iterator, json, "creationTime", + fmt_time(time_created_), GRPC_JSON_STRING, true); + grpc_json* events = grpc_json_create_child(json_iterator, json, "events", + nullptr, GRPC_JSON_ARRAY, false); + json_iterator = nullptr; + TraceEvent* it = head_trace_; + while (it != nullptr) { + json_iterator = grpc_json_create_child(json_iterator, events, nullptr, + nullptr, GRPC_JSON_OBJECT, false); + it->RenderTraceEvent(json_iterator); + it = it->next(); + } + char* json_str = grpc_json_dump_to_string(json, 0); + grpc_json_destroy(json); + return json_str; +} + +} // namespace grpc_core diff --git a/src/core/lib/channel/channel_trace.h b/src/core/lib/channel/channel_trace.h new file mode 100644 index 0000000000..1df1e585f2 --- /dev/null +++ b/src/core/lib/channel/channel_trace.h @@ -0,0 +1,133 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_CHANNEL_TRACE_H +#define GRPC_CORE_LIB_CHANNEL_CHANNEL_TRACE_H + +#include <grpc/impl/codegen/port_platform.h> + +#include <grpc/grpc.h> +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/json/json.h" + +namespace grpc_core { + +// Object used to hold live data for a channel. This data is exposed via the +// channelz service: +// https://github.com/grpc/proposal/blob/master/A14-channelz.md +class ChannelTrace : public RefCounted<ChannelTrace> { + public: + ChannelTrace(size_t max_events); + ~ChannelTrace(); + + // returns the tracer's uuid + intptr_t GetUuid() const; + + enum Severity { + Unset = 0, // never to be used + Info, // we start at 1 to avoid using proto default values + Warning, + Error + }; + + // Adds a new trace event to the tracing object + // + // TODO(ncteisen): as this call is used more and more throughout the gRPC + // stack, determine if it makes more sense to accept a char* instead of a + // slice. + void AddTraceEvent(Severity severity, grpc_slice data); + + // Adds a new trace event to the tracing object. This trace event refers to a + // an event on a child of the channel. For example, if this channel has + // created a new subchannel, then it would record that with a TraceEvent + // referencing the new subchannel. + // + // TODO(ncteisen): Once channelz is implemented, the events should reference + // the overall channelz object, not just the ChannelTrace object. + // TODO(ncteisen): as this call is used more and more throughout the gRPC + // stack, determine if it makes more sense to accept a char* instead of a + // slice. + void AddTraceEventReferencingChannel( + Severity severity, grpc_slice data, + RefCountedPtr<ChannelTrace> referenced_tracer); + void AddTraceEventReferencingSubchannel( + Severity severity, grpc_slice data, + RefCountedPtr<ChannelTrace> referenced_tracer); + + // Returns the tracing data rendered as a grpc json string. + // The string is owned by the caller and must be freed. + char* RenderTrace() const; + + private: + // Types of objects that can be references by trace events. + enum ReferencedType { Channel, Subchannel }; + // Private class to encapsulate all the data and bookkeeping needed for a + // a trace event. + class TraceEvent { + public: + // Constructor for a TraceEvent that references a different channel. + // TODO(ncteisen): once channelz is implemented, this should reference the + // overall channelz object, not just the ChannelTrace object + TraceEvent(Severity severity, grpc_slice data, + RefCountedPtr<ChannelTrace> referenced_tracer, + ReferencedType type); + + // Constructor for a TraceEvent that does not reverence a different + // channel. + TraceEvent(Severity severity, grpc_slice data); + + ~TraceEvent(); + + // Renders the data inside of this TraceEvent into a json object. This is + // used by the ChannelTrace, when it is rendering itself. + void RenderTraceEvent(grpc_json* json) const; + + // set and get for the next_ pointer. + TraceEvent* next() const { return next_; } + void set_next(TraceEvent* next) { next_ = next; } + + private: + Severity severity_; + grpc_slice data_; + gpr_timespec timestamp_; + TraceEvent* next_; + // the tracer object for the (sub)channel that this trace event refers to. + RefCountedPtr<ChannelTrace> referenced_tracer_; + // the type that the referenced tracer points to. Unused if this trace + // does not point to any channel or subchannel + ReferencedType referenced_type_; + }; // TraceEvent + + // Internal helper to add and link in a trace event + void AddTraceEventHelper(TraceEvent* new_trace_event); + + gpr_mu tracer_mu_; + intptr_t channel_uuid_; + uint64_t num_events_logged_; + size_t list_size_; + size_t max_list_size_; + TraceEvent* head_trace_; + TraceEvent* tail_trace_; + gpr_timespec time_created_; +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_TRACE_H */ diff --git a/src/core/lib/channel/channel_trace_registry.cc b/src/core/lib/channel/channel_trace_registry.cc new file mode 100644 index 0000000000..6c82431467 --- /dev/null +++ b/src/core/lib/channel/channel_trace_registry.cc @@ -0,0 +1,80 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/impl/codegen/port_platform.h> + +#include "src/core/lib/avl/avl.h" +#include "src/core/lib/channel/channel_trace.h" +#include "src/core/lib/channel/channel_trace_registry.h" +#include "src/core/lib/gpr/useful.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +// file global lock and avl. +static gpr_mu g_mu; +static grpc_avl g_avl; +static gpr_atm g_uuid = 0; + +// avl vtable for uuid (intptr_t) -> ChannelTrace +// this table is only looking, it does not own anything. +static void destroy_intptr(void* not_used, void* user_data) {} +static void* copy_intptr(void* key, void* user_data) { return key; } +static long compare_intptr(void* key1, void* key2, void* user_data) { + return GPR_ICMP(key1, key2); +} + +static void destroy_channel_trace(void* trace, void* user_data) {} +static void* copy_channel_trace(void* trace, void* user_data) { return trace; } +static const grpc_avl_vtable avl_vtable = { + destroy_intptr, copy_intptr, compare_intptr, destroy_channel_trace, + copy_channel_trace}; + +void grpc_channel_trace_registry_init() { + gpr_mu_init(&g_mu); + g_avl = grpc_avl_create(&avl_vtable); +} + +void grpc_channel_trace_registry_shutdown() { + grpc_avl_unref(g_avl, nullptr); + gpr_mu_destroy(&g_mu); +} + +intptr_t grpc_channel_trace_registry_register_channel_trace( + grpc_core::ChannelTrace* channel_trace) { + intptr_t prior = gpr_atm_no_barrier_fetch_add(&g_uuid, 1); + gpr_mu_lock(&g_mu); + g_avl = grpc_avl_add(g_avl, (void*)prior, channel_trace, nullptr); + gpr_mu_unlock(&g_mu); + return prior; +} + +void grpc_channel_trace_registry_unregister_channel_trace(intptr_t uuid) { + gpr_mu_lock(&g_mu); + g_avl = grpc_avl_remove(g_avl, (void*)uuid, nullptr); + gpr_mu_unlock(&g_mu); +} + +grpc_core::ChannelTrace* grpc_channel_trace_registry_get_channel_trace( + intptr_t uuid) { + gpr_mu_lock(&g_mu); + grpc_core::ChannelTrace* ret = static_cast<grpc_core::ChannelTrace*>( + grpc_avl_get(g_avl, (void*)uuid, nullptr)); + gpr_mu_unlock(&g_mu); + return ret; +} diff --git a/src/core/lib/channel/channel_trace_registry.h b/src/core/lib/channel/channel_trace_registry.h new file mode 100644 index 0000000000..391ecba7de --- /dev/null +++ b/src/core/lib/channel/channel_trace_registry.h @@ -0,0 +1,43 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_CHANNEL_TRACE_REGISTRY_H +#define GRPC_CORE_LIB_CHANNEL_CHANNEL_TRACE_REGISTRY_H + +#include <grpc/impl/codegen/port_platform.h> + +#include "src/core/lib/channel/channel_trace.h" + +#include <stdint.h> + +// TODO(ncteisen): convert this file to C++ + +void grpc_channel_trace_registry_init(); +void grpc_channel_trace_registry_shutdown(); + +// globally registers a ChannelTrace. Returns its unique uuid +intptr_t grpc_channel_trace_registry_register_channel_trace( + grpc_core::ChannelTrace* channel_trace); +// globally unregisters the ChannelTrace that is associated to uuid. +void grpc_channel_trace_registry_unregister_channel_trace(intptr_t uuid); +// if object with uuid has previously been registered, returns the ChannelTrace +// associated with that uuid. Else returns nullptr. +grpc_core::ChannelTrace* grpc_channel_trace_registry_get_channel_trace( + intptr_t uuid); + +#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_TRACE_REGISTRY_H */ diff --git a/src/core/ext/filters/client_channel/status_util.cc b/src/core/lib/channel/status_util.cc index 11f732ab44..563db40846 100644 --- a/src/core/ext/filters/client_channel/status_util.cc +++ b/src/core/lib/channel/status_util.cc @@ -18,7 +18,7 @@ #include <grpc/support/port_platform.h> -#include "src/core/ext/filters/client_channel/status_util.h" +#include "src/core/lib/channel/status_util.h" #include "src/core/lib/gpr/useful.h" diff --git a/src/core/ext/filters/client_channel/status_util.h b/src/core/lib/channel/status_util.h index e018709730..5409de6b3c 100644 --- a/src/core/ext/filters/client_channel/status_util.h +++ b/src/core/lib/channel/status_util.h @@ -16,8 +16,8 @@ * */ -#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_STATUS_UTIL_H -#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_STATUS_UTIL_H +#ifndef GRPC_CORE_LIB_CHANNEL_STATUS_UTIL_H +#define GRPC_CORE_LIB_CHANNEL_STATUS_UTIL_H #include <grpc/support/port_platform.h> @@ -55,4 +55,4 @@ class StatusCodeSet { } // namespace internal } // namespace grpc_core -#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_STATUS_UTIL_H */ +#endif /* GRPC_CORE_LIB_CHANNEL_STATUS_UTIL_H */ diff --git a/src/core/lib/http/httpcli_security_connector.cc b/src/core/lib/http/httpcli_security_connector.cc index 886357fc00..0b53d63e77 100644 --- a/src/core/lib/http/httpcli_security_connector.cc +++ b/src/core/lib/http/httpcli_security_connector.cc @@ -102,8 +102,8 @@ static grpc_security_connector_vtable httpcli_ssl_vtable = { httpcli_ssl_destroy, httpcli_ssl_check_peer, httpcli_ssl_cmp}; static grpc_security_status httpcli_ssl_channel_security_connector_create( - const char* pem_root_certs, const char* secure_peer_name, - grpc_channel_security_connector** sc) { + const char* pem_root_certs, const tsi_ssl_root_certs_store* root_store, + const char* secure_peer_name, grpc_channel_security_connector** sc) { tsi_result result = TSI_OK; grpc_httpcli_ssl_channel_security_connector* c; @@ -124,6 +124,7 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create( tsi_ssl_client_handshaker_options options; memset(&options, 0, sizeof(options)); options.pem_root_certs = pem_root_certs; + options.root_store = root_store; result = tsi_create_ssl_client_handshaker_factory_with_options( &options, &c->handshaker_factory); if (result != TSI_OK) { @@ -172,8 +173,11 @@ static void ssl_handshake(void* arg, grpc_endpoint* tcp, const char* host, grpc_millis deadline, void (*on_done)(void* arg, grpc_endpoint* endpoint)) { on_done_closure* c = static_cast<on_done_closure*>(gpr_malloc(sizeof(*c))); - const char* pem_root_certs = grpc_get_default_ssl_roots(); - if (pem_root_certs == nullptr) { + const char* pem_root_certs = + grpc_core::DefaultSslRootStore::GetPemRootCerts(); + const tsi_ssl_root_certs_store* root_store = + grpc_core::DefaultSslRootStore::GetRootStore(); + if (root_store == nullptr) { gpr_log(GPR_ERROR, "Could not get default pem root certs."); on_done(arg, nullptr); gpr_free(c); @@ -183,7 +187,7 @@ static void ssl_handshake(void* arg, grpc_endpoint* tcp, const char* host, c->arg = arg; grpc_channel_security_connector* sc = nullptr; GPR_ASSERT(httpcli_ssl_channel_security_connector_create( - pem_root_certs, host, &sc) == GRPC_SECURITY_OK); + pem_root_certs, root_store, host, &sc) == GRPC_SECURITY_OK); grpc_arg channel_arg = grpc_security_connector_to_arg(&sc->base); grpc_channel_args args = {1, &channel_arg}; c->handshake_mgr = grpc_handshake_manager_create(); diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index ba943d302a..9f19b833da 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -293,7 +293,6 @@ void grpc_tcp_client_create_from_prepared_fd( int err; async_connect* ac; do { - GPR_ASSERT(addr->len < ~(socklen_t)0); err = connect(fd, reinterpret_cast<const grpc_sockaddr*>(addr->addr), addr->len); } while (err < 0 && errno == EINTR); diff --git a/src/core/lib/json/json.cc b/src/core/lib/json/json.cc index 2141db4c5b..816241bbf0 100644 --- a/src/core/lib/json/json.cc +++ b/src/core/lib/json/json.cc @@ -21,6 +21,7 @@ #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include "src/core/lib/json/json.h" @@ -46,5 +47,40 @@ void grpc_json_destroy(grpc_json* json) { json->parent->child = json->next; } + if (json->owns_value) { + gpr_free((void*)json->value); + } + gpr_free(json); } + +grpc_json* grpc_json_link_child(grpc_json* parent, grpc_json* child, + grpc_json* sibling) { + // first child case. + if (parent->child == nullptr) { + GPR_ASSERT(sibling == nullptr); + parent->child = child; + return child; + } + if (sibling == nullptr) { + sibling = parent->child; + } + // always find the right most sibling. + while (sibling->next != nullptr) { + sibling = sibling->next; + } + sibling->next = child; + return child; +} + +grpc_json* grpc_json_create_child(grpc_json* sibling, grpc_json* parent, + const char* key, const char* value, + grpc_json_type type, bool owns_value) { + grpc_json* child = grpc_json_create(type); + grpc_json_link_child(parent, child, sibling); + child->owns_value = owns_value; + child->parent = parent; + child->value = value; + child->key = key; + return child; +} diff --git a/src/core/lib/json/json.h b/src/core/lib/json/json.h index 3a62ef9cfb..f93b43048b 100644 --- a/src/core/lib/json/json.h +++ b/src/core/lib/json/json.h @@ -21,6 +21,7 @@ #include <grpc/support/port_platform.h> +#include <stdbool.h> #include <stdlib.h> #include "src/core/lib/json/json_common.h" @@ -37,6 +38,9 @@ typedef struct grpc_json { grpc_json_type type; const char* key; const char* value; + + /* if set, destructor will free value */ + bool owns_value; } grpc_json; /* The next two functions are going to parse the input string, and @@ -67,9 +71,24 @@ char* grpc_json_dump_to_string(grpc_json* json, int indent); /* Use these to create or delete a grpc_json object. * Deletion is recursive. We will not attempt to free any of the strings - * in any of the objects of that tree. + * in any of the objects of that tree, unless the boolean, owns_value, + * is true. */ grpc_json* grpc_json_create(grpc_json_type type); void grpc_json_destroy(grpc_json* json); +/* Links the child json object into the parent's json tree. If the parent + * already has children, then passing in the most recently added child as the + * sibling parameter is an optimization. For if sibling is NULL, this function + * will manually traverse the tree in order to find the right most sibling. + */ +grpc_json* grpc_json_link_child(grpc_json* parent, grpc_json* child, + grpc_json* sibling); + +/* Creates a child json object into the parent's json tree then links it in + * as described above. */ +grpc_json* grpc_json_create_child(grpc_json* sibling, grpc_json* parent, + const char* key, const char* value, + grpc_json_type type, bool owns_value); + #endif /* GRPC_CORE_LIB_JSON_JSON_H */ diff --git a/src/core/lib/profiling/basic_timers.cc b/src/core/lib/profiling/basic_timers.cc index 43384fd0ca..652a498b6e 100644 --- a/src/core/lib/profiling/basic_timers.cc +++ b/src/core/lib/profiling/basic_timers.cc @@ -26,11 +26,11 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include <inttypes.h> #include <stdio.h> #include <string.h> #include "src/core/lib/gpr/env.h" -#include "src/core/lib/gprpp/thd.h" typedef enum { BEGIN = '{', END = '}', MARK = '.' } marker_type; @@ -68,7 +68,7 @@ static pthread_cond_t g_cv; static gpr_timer_log_list g_in_progress_logs; static gpr_timer_log_list g_done_logs; static int g_shutdown; -static grpc_core::Thread* g_writing_thread; +static pthread_t g_writing_thread; static __thread int g_thread_id; static int g_next_thread_id; static int g_writing_enabled = 1; @@ -149,7 +149,7 @@ static void write_log(gpr_timer_log* log) { } } -static void writing_thread(void* unused) { +static void* writing_thread(void* unused) { gpr_timer_log* log; pthread_mutex_lock(&g_mu); for (;;) { @@ -164,7 +164,7 @@ static void writing_thread(void* unused) { } if (g_shutdown) { pthread_mutex_unlock(&g_mu); - return; + return NULL; } } } @@ -182,8 +182,7 @@ static void finish_writing(void) { g_shutdown = 1; pthread_cond_signal(&g_cv); pthread_mutex_unlock(&g_mu); - g_writing_thread->Join(); - grpc_core::Delete(g_writing_thread); + pthread_join(g_writing_thread, NULL); gpr_log(GPR_INFO, "flushing logs"); @@ -202,8 +201,12 @@ void gpr_timers_set_log_filename(const char* filename) { } static void init_output() { - g_writing_thread = grpc_core::New<grpc_core::Thread>("timer_output_thread", - writing_thread, nullptr); + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + pthread_create(&g_writing_thread, &attr, &writing_thread, NULL); + pthread_attr_destroy(&attr); + atexit(finish_writing); } diff --git a/src/core/lib/profiling/timers.h b/src/core/lib/profiling/timers.h index d0188b5054..7ff72783ec 100644 --- a/src/core/lib/profiling/timers.h +++ b/src/core/lib/profiling/timers.h @@ -82,9 +82,12 @@ class ProfileScope { }; } // namespace grpc -#define GPR_TIMER_SCOPE(tag, important) \ - ::grpc::ProfileScope _profile_scope_##__LINE__((tag), (important), __FILE__, \ - __LINE__) +#define GPR_TIMER_SCOPE_NAME_INTERNAL(prefix, line) prefix##line +#define GPR_TIMER_SCOPE_NAME(prefix, line) \ + GPR_TIMER_SCOPE_NAME_INTERNAL(prefix, line) +#define GPR_TIMER_SCOPE(tag, important) \ + ::grpc::ProfileScope GPR_TIMER_SCOPE_NAME(_profile_scope_, __LINE__)( \ + (tag), (important), __FILE__, __LINE__) #endif /* at least one profiler requested. */ diff --git a/src/core/lib/security/security_connector/security_connector.cc b/src/core/lib/security/security_connector/security_connector.cc index cbe77d5a69..3967112bb8 100644 --- a/src/core/lib/security/security_connector/security_connector.cc +++ b/src/core/lib/security/security_connector/security_connector.cc @@ -965,63 +965,6 @@ static grpc_security_connector_vtable ssl_channel_vtable = { static grpc_security_connector_vtable ssl_server_vtable = { ssl_server_destroy, ssl_server_check_peer, ssl_server_cmp}; -/* returns a NULL terminated slice. */ -static grpc_slice compute_default_pem_root_certs_once(void) { - grpc_slice result = grpc_empty_slice(); - - /* First try to load the roots from the environment. */ - char* default_root_certs_path = - gpr_getenv(GRPC_DEFAULT_SSL_ROOTS_FILE_PATH_ENV_VAR); - if (default_root_certs_path != nullptr) { - GRPC_LOG_IF_ERROR("load_file", - grpc_load_file(default_root_certs_path, 1, &result)); - gpr_free(default_root_certs_path); - } - - /* Try overridden roots if needed. */ - grpc_ssl_roots_override_result ovrd_res = GRPC_SSL_ROOTS_OVERRIDE_FAIL; - if (GRPC_SLICE_IS_EMPTY(result) && ssl_roots_override_cb != nullptr) { - char* pem_root_certs = nullptr; - ovrd_res = ssl_roots_override_cb(&pem_root_certs); - if (ovrd_res == GRPC_SSL_ROOTS_OVERRIDE_OK) { - GPR_ASSERT(pem_root_certs != nullptr); - result = grpc_slice_from_copied_buffer( - pem_root_certs, - strlen(pem_root_certs) + 1); // NULL terminator. - } - gpr_free(pem_root_certs); - } - - /* Fall back to installed certs if needed. */ - if (GRPC_SLICE_IS_EMPTY(result) && - ovrd_res != GRPC_SSL_ROOTS_OVERRIDE_FAIL_PERMANENTLY) { - GRPC_LOG_IF_ERROR("load_file", - grpc_load_file(installed_roots_path, 1, &result)); - } - return result; -} - -static grpc_slice default_pem_root_certs; - -static void init_default_pem_root_certs(void) { - default_pem_root_certs = compute_default_pem_root_certs_once(); -} - -grpc_slice grpc_get_default_ssl_roots_for_testing(void) { - return compute_default_pem_root_certs_once(); -} - -const char* grpc_get_default_ssl_roots(void) { - /* TODO(jboeuf@google.com): Maybe revisit the approach which consists in - loading all the roots once for the lifetime of the process. */ - static gpr_once once = GPR_ONCE_INIT; - gpr_once_init(&once, init_default_pem_root_certs); - return GRPC_SLICE_IS_EMPTY(default_pem_root_certs) - ? nullptr - : reinterpret_cast<const char*> - GRPC_SLICE_START_PTR(default_pem_root_certs); -} - grpc_security_status grpc_ssl_channel_security_connector_create( grpc_channel_credentials* channel_creds, grpc_call_credentials* request_metadata_creds, @@ -1043,7 +986,9 @@ grpc_security_status grpc_ssl_channel_security_connector_create( goto error; } if (config->pem_root_certs == nullptr) { - options.pem_root_certs = grpc_get_default_ssl_roots(); + // Use default root certificates. + options.pem_root_certs = grpc_core::DefaultSslRootStore::GetPemRootCerts(); + options.root_store = grpc_core::DefaultSslRootStore::GetRootStore(); if (options.pem_root_certs == nullptr) { gpr_log(GPR_ERROR, "Could not get default pem root certs."); goto error; @@ -1051,7 +996,6 @@ grpc_security_status grpc_ssl_channel_security_connector_create( } else { options.pem_root_certs = config->pem_root_certs; } - c = static_cast<grpc_ssl_channel_security_connector*>( gpr_zalloc(sizeof(grpc_ssl_channel_security_connector))); @@ -1157,3 +1101,79 @@ grpc_security_status grpc_ssl_server_security_connector_create( } return retval; } + +namespace grpc_core { + +tsi_ssl_root_certs_store* DefaultSslRootStore::default_root_store_; +grpc_slice DefaultSslRootStore::default_pem_root_certs_; + +const tsi_ssl_root_certs_store* DefaultSslRootStore::GetRootStore() { + InitRootStore(); + return default_root_store_; +} + +const char* DefaultSslRootStore::GetPemRootCerts() { + InitRootStore(); + return GRPC_SLICE_IS_EMPTY(default_pem_root_certs_) + ? nullptr + : reinterpret_cast<const char*> + GRPC_SLICE_START_PTR(default_pem_root_certs_); +} + +void DefaultSslRootStore::Initialize() { + default_root_store_ = nullptr; + default_pem_root_certs_ = grpc_empty_slice(); +} + +void DefaultSslRootStore::Destroy() { + tsi_ssl_root_certs_store_destroy(default_root_store_); + grpc_slice_unref_internal(default_pem_root_certs_); +} + +grpc_slice DefaultSslRootStore::ComputePemRootCerts() { + grpc_slice result = grpc_empty_slice(); + // First try to load the roots from the environment. + char* default_root_certs_path = + gpr_getenv(GRPC_DEFAULT_SSL_ROOTS_FILE_PATH_ENV_VAR); + if (default_root_certs_path != nullptr) { + GRPC_LOG_IF_ERROR("load_file", + grpc_load_file(default_root_certs_path, 1, &result)); + gpr_free(default_root_certs_path); + } + // Try overridden roots if needed. + grpc_ssl_roots_override_result ovrd_res = GRPC_SSL_ROOTS_OVERRIDE_FAIL; + if (GRPC_SLICE_IS_EMPTY(result) && ssl_roots_override_cb != nullptr) { + char* pem_root_certs = nullptr; + ovrd_res = ssl_roots_override_cb(&pem_root_certs); + if (ovrd_res == GRPC_SSL_ROOTS_OVERRIDE_OK) { + GPR_ASSERT(pem_root_certs != nullptr); + result = grpc_slice_from_copied_buffer( + pem_root_certs, + strlen(pem_root_certs) + 1); // nullptr terminator. + } + gpr_free(pem_root_certs); + } + // Fall back to installed certs if needed. + if (GRPC_SLICE_IS_EMPTY(result) && + ovrd_res != GRPC_SSL_ROOTS_OVERRIDE_FAIL_PERMANENTLY) { + GRPC_LOG_IF_ERROR("load_file", + grpc_load_file(installed_roots_path, 1, &result)); + } + return result; +} + +void DefaultSslRootStore::InitRootStore() { + static gpr_once once = GPR_ONCE_INIT; + gpr_once_init(&once, DefaultSslRootStore::InitRootStoreOnce); +} + +void DefaultSslRootStore::InitRootStoreOnce() { + default_pem_root_certs_ = ComputePemRootCerts(); + if (!GRPC_SLICE_IS_EMPTY(default_pem_root_certs_)) { + default_root_store_ = + tsi_ssl_root_certs_store_create(reinterpret_cast<const char*>( + GRPC_SLICE_START_PTR(default_pem_root_certs_))); + } +} + +} // namespace grpc_core diff --git a/src/core/lib/security/security_connector/security_connector.h b/src/core/lib/security/security_connector/security_connector.h index dc847d94f9..5d3d1e0f44 100644 --- a/src/core/lib/security/security_connector/security_connector.h +++ b/src/core/lib/security/security_connector/security_connector.h @@ -216,12 +216,6 @@ grpc_security_status grpc_ssl_channel_security_connector_create( tsi_ssl_session_cache* ssl_session_cache, grpc_channel_security_connector** sc); -/* Gets the default ssl roots. Returns NULL if not found. */ -const char* grpc_get_default_ssl_roots(void); - -/* Exposed for TESTING ONLY!. */ -grpc_slice grpc_get_default_ssl_roots_for_testing(void); - /* Config for ssl servers. */ typedef struct { tsi_ssl_pem_key_cert_pair* pem_key_cert_pairs; @@ -250,4 +244,49 @@ tsi_peer tsi_shallow_peer_from_ssl_auth_context( const grpc_auth_context* auth_context); void tsi_shallow_peer_destruct(tsi_peer* peer); +/* --- Default SSL Root Store. --- */ +namespace grpc_core { + +// The class implements default SSL root store. +class DefaultSslRootStore { + public: + // Gets the default SSL root store. Returns nullptr if not found. + static const tsi_ssl_root_certs_store* GetRootStore(); + + // Gets the default PEM root certificate. + static const char* GetPemRootCerts(); + + // Initializes the SSL root store's underlying data structure. It does not + // load default SSL root certificates. Should only be called by + // grpc_security_init(). + static void Initialize(); + + // Destroys the default SSL root store. Should only be called by + // grpc_security_shutdown(). + static void Destroy(); + + protected: + // Returns default PEM root certificates in nullptr terminated grpc_slice. + // This function is protected instead of private, so that it can be tested. + static grpc_slice ComputePemRootCerts(); + + private: + // Construct me not! + DefaultSslRootStore(); + + // Initialization of default SSL root store. + static void InitRootStore(); + + // One-time initialization of default SSL root store. + static void InitRootStoreOnce(); + + // SSL root store in tsi_ssl_root_certs_store object. + static tsi_ssl_root_certs_store* default_root_store_; + + // Default PEM root certificates. + static grpc_slice default_pem_root_certs_; +}; + +} // namespace grpc_core + #endif /* GRPC_CORE_LIB_SECURITY_SECURITY_CONNECTOR_SECURITY_CONNECTOR_H */ diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 03353d6beb..cecc15b2df 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -21,6 +21,7 @@ #include "src/core/lib/surface/channel.h" #include <inttypes.h> +#include <limits.h> #include <stdlib.h> #include <string.h> @@ -30,8 +31,12 @@ #include <grpc/support/string_util.h> #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/api_trace.h" @@ -62,6 +67,8 @@ struct grpc_channel { gpr_mu registered_call_mu; registered_call* registered_calls; + grpc_core::RefCountedPtr<grpc_core::ChannelTrace> tracer; + char* target; }; @@ -93,12 +100,14 @@ grpc_channel* grpc_channel_create_with_builder( grpc_error_string(error)); GRPC_ERROR_UNREF(error); gpr_free(target); - goto done; + grpc_channel_args_destroy(args); + return channel; } memset(channel, 0, sizeof(*channel)); channel->target = target; channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type); + size_t channel_tracer_max_nodes = 0; // default to off gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = nullptr; @@ -161,14 +170,33 @@ grpc_channel* grpc_channel_create_with_builder( channel->compression_options.enabled_algorithms_bitset = static_cast<uint32_t>(args->args[i].value.integer) | 0x1; /* always support no compression */ + } else if (0 == strcmp(args->args[i].key, + GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE)) { + GPR_ASSERT(channel_tracer_max_nodes == 0); + // max_nodes defaults to 0 (which is off), clamped between 0 and INT_MAX + const grpc_integer_options options = {0, 0, INT_MAX}; + channel_tracer_max_nodes = + (size_t)grpc_channel_arg_get_integer(&args->args[i], options); } } -done: grpc_channel_args_destroy(args); + channel->tracer = grpc_core::MakeRefCounted<grpc_core::ChannelTrace>( + channel_tracer_max_nodes); + channel->tracer->AddTraceEvent( + grpc_core::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Channel created")); return channel; } +char* grpc_channel_get_trace(grpc_channel* channel) { + return channel->tracer->RenderTrace(); +} + +intptr_t grpc_channel_get_uuid(grpc_channel* channel) { + return channel->tracer->GetUuid(); +} + grpc_channel* grpc_channel_create(const char* target, const grpc_channel_args* input_args, grpc_channel_stack_type channel_stack_type, @@ -377,6 +405,7 @@ static void destroy_channel(void* arg, grpc_error* error) { GRPC_MDELEM_UNREF(rc->authority); gpr_free(rc); } + channel->tracer.reset(); GRPC_MDELEM_UNREF(channel->default_authority); gpr_mu_destroy(&channel->registered_call_mu); gpr_free(channel->target); diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index ac9f9e6066..52e0ee1c44 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -27,6 +27,7 @@ #include <grpc/support/log.h> #include <grpc/support/time.h> #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_trace_registry.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/handshaker_registry.h" #include "src/core/lib/debug/stats.h" @@ -128,6 +129,7 @@ void grpc_init(void) { grpc_slice_intern_init(); grpc_mdctx_global_init(); grpc_channel_init_init(); + grpc_channel_trace_registry_init(); grpc_security_pre_init(); grpc_core::ExecCtx::GlobalInit(); grpc_iomgr_init(); @@ -170,12 +172,14 @@ void grpc_shutdown(void) { } } } + grpc_security_shutdown(); grpc_iomgr_shutdown(); gpr_timers_global_destroy(); grpc_tracer_shutdown(); grpc_mdctx_global_shutdown(); grpc_handshaker_factory_registry_shutdown(); grpc_slice_intern_shutdown(); + grpc_channel_trace_registry_shutdown(); grpc_stats_shutdown(); } grpc_core::ExecCtx::GlobalShutdown(); diff --git a/src/core/lib/surface/init.h b/src/core/lib/surface/init.h index 9353208332..d8282b475b 100644 --- a/src/core/lib/surface/init.h +++ b/src/core/lib/surface/init.h @@ -22,6 +22,7 @@ void grpc_register_security_filters(void); void grpc_security_pre_init(void); void grpc_security_init(void); +void grpc_security_shutdown(void); int grpc_is_initialized(void); #endif /* GRPC_CORE_LIB_SURFACE_INIT_H */ diff --git a/src/core/lib/surface/init_secure.cc b/src/core/lib/surface/init_secure.cc index 78e983e0cd..3e4f1a8829 100644 --- a/src/core/lib/surface/init_secure.cc +++ b/src/core/lib/surface/init_secure.cc @@ -75,4 +75,9 @@ void grpc_register_security_filters(void) { maybe_prepend_server_auth_filter, nullptr); } -void grpc_security_init() { grpc_security_register_handshaker_factories(); } +void grpc_security_init() { + grpc_security_register_handshaker_factories(); + grpc_core::DefaultSslRootStore::Initialize(); +} + +void grpc_security_shutdown() { grpc_core::DefaultSslRootStore::Destroy(); } diff --git a/src/core/lib/surface/init_unsecure.cc b/src/core/lib/surface/init_unsecure.cc index 2b3bc64382..1c8d07b38b 100644 --- a/src/core/lib/surface/init_unsecure.cc +++ b/src/core/lib/surface/init_unsecure.cc @@ -25,3 +25,5 @@ void grpc_security_pre_init(void) {} void grpc_register_security_filters(void) {} void grpc_security_init(void) {} + +void grpc_security_shutdown(void) {} diff --git a/src/core/plugin_registry/grpc_cronet_plugin_registry.cc b/src/core/plugin_registry/grpc_cronet_plugin_registry.cc index 49b9c7d4fe..84228cb38a 100644 --- a/src/core/plugin_registry/grpc_cronet_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_cronet_plugin_registry.cc @@ -20,30 +20,50 @@ #include <grpc/grpc.h> -void grpc_http_filters_init(void); -void grpc_http_filters_shutdown(void); -void grpc_chttp2_plugin_init(void); -void grpc_chttp2_plugin_shutdown(void); void grpc_deadline_filter_init(void); void grpc_deadline_filter_shutdown(void); void grpc_client_channel_init(void); void grpc_client_channel_shutdown(void); -void grpc_tsi_alts_init(void); -void grpc_tsi_alts_shutdown(void); +void grpc_lb_policy_pick_first_init(void); +void grpc_lb_policy_pick_first_shutdown(void); +void grpc_max_age_filter_init(void); +void grpc_max_age_filter_shutdown(void); +void grpc_message_size_filter_init(void); +void grpc_message_size_filter_shutdown(void); +void grpc_resolver_dns_native_init(void); +void grpc_resolver_dns_native_shutdown(void); +void grpc_resolver_sockaddr_init(void); +void grpc_resolver_sockaddr_shutdown(void); void grpc_server_load_reporting_plugin_init(void); void grpc_server_load_reporting_plugin_shutdown(void); +void grpc_http_filters_init(void); +void grpc_http_filters_shutdown(void); +void grpc_chttp2_plugin_init(void); +void grpc_chttp2_plugin_shutdown(void); +void grpc_tsi_alts_init(void); +void grpc_tsi_alts_shutdown(void); void grpc_register_built_in_plugins(void) { - grpc_register_plugin(grpc_http_filters_init, - grpc_http_filters_shutdown); - grpc_register_plugin(grpc_chttp2_plugin_init, - grpc_chttp2_plugin_shutdown); grpc_register_plugin(grpc_deadline_filter_init, grpc_deadline_filter_shutdown); grpc_register_plugin(grpc_client_channel_init, grpc_client_channel_shutdown); - grpc_register_plugin(grpc_tsi_alts_init, - grpc_tsi_alts_shutdown); + grpc_register_plugin(grpc_lb_policy_pick_first_init, + grpc_lb_policy_pick_first_shutdown); + grpc_register_plugin(grpc_max_age_filter_init, + grpc_max_age_filter_shutdown); + grpc_register_plugin(grpc_message_size_filter_init, + grpc_message_size_filter_shutdown); + grpc_register_plugin(grpc_resolver_dns_native_init, + grpc_resolver_dns_native_shutdown); + grpc_register_plugin(grpc_resolver_sockaddr_init, + grpc_resolver_sockaddr_shutdown); grpc_register_plugin(grpc_server_load_reporting_plugin_init, grpc_server_load_reporting_plugin_shutdown); + grpc_register_plugin(grpc_http_filters_init, + grpc_http_filters_shutdown); + grpc_register_plugin(grpc_chttp2_plugin_init, + grpc_chttp2_plugin_shutdown); + grpc_register_plugin(grpc_tsi_alts_init, + grpc_tsi_alts_shutdown); } diff --git a/src/core/tsi/ssl_transport_security.cc b/src/core/tsi/ssl_transport_security.cc index 0fc2926cf6..0ba6587678 100644 --- a/src/core/tsi/ssl_transport_security.cc +++ b/src/core/tsi/ssl_transport_security.cc @@ -71,6 +71,10 @@ extern "C" { /* --- Structure definitions. ---*/ +struct tsi_ssl_root_certs_store { + X509_STORE* store; +}; + struct tsi_ssl_handshaker_factory { const tsi_ssl_handshaker_factory_vtable* vtable; gpr_refcount refcount; @@ -553,21 +557,18 @@ static tsi_result ssl_ctx_use_private_key(SSL_CTX* context, const char* pem_key, /* Loads in-memory PEM verification certs into the SSL context and optionally returns the verification cert names (root_names can be NULL). */ -static tsi_result ssl_ctx_load_verification_certs(SSL_CTX* context, - const char* pem_roots, - size_t pem_roots_size, - STACK_OF(X509_NAME) * - *root_names) { +static tsi_result x509_store_load_certs(X509_STORE* cert_store, + const char* pem_roots, + size_t pem_roots_size, + STACK_OF(X509_NAME) * *root_names) { tsi_result result = TSI_OK; size_t num_roots = 0; X509* root = nullptr; X509_NAME* root_name = nullptr; BIO* pem; - X509_STORE* root_store; GPR_ASSERT(pem_roots_size <= INT_MAX); pem = BIO_new_mem_buf((void*)pem_roots, static_cast<int>(pem_roots_size)); - root_store = SSL_CTX_get_cert_store(context); - if (root_store == nullptr) return TSI_INVALID_ARGUMENT; + if (cert_store == nullptr) return TSI_INVALID_ARGUMENT; if (pem == nullptr) return TSI_OUT_OF_RESOURCES; if (root_names != nullptr) { *root_names = sk_X509_NAME_new_null(); @@ -595,7 +596,7 @@ static tsi_result ssl_ctx_load_verification_certs(SSL_CTX* context, sk_X509_NAME_push(*root_names, root_name); root_name = nullptr; } - if (!X509_STORE_add_cert(root_store, root)) { + if (!X509_STORE_add_cert(cert_store, root)) { gpr_log(GPR_ERROR, "Could not add root certificate to ssl context."); result = TSI_INTERNAL_ERROR; break; @@ -621,6 +622,16 @@ static tsi_result ssl_ctx_load_verification_certs(SSL_CTX* context, return result; } +static tsi_result ssl_ctx_load_verification_certs(SSL_CTX* context, + const char* pem_roots, + size_t pem_roots_size, + STACK_OF(X509_NAME) * + *root_name) { + X509_STORE* cert_store = SSL_CTX_get_cert_store(context); + return x509_store_load_certs(cert_store, pem_roots, pem_roots_size, + root_name); +} + /* Populates the SSL context with a private key and a cert chain, and sets the cipher list and the ephemeral ECDH key. */ static tsi_result populate_ssl_context( @@ -730,6 +741,43 @@ static int NullVerifyCallback(int preverify_ok, X509_STORE_CTX* ctx) { return 1; } +/* --- tsi_ssl_root_certs_store methods implementation. ---*/ + +tsi_ssl_root_certs_store* tsi_ssl_root_certs_store_create( + const char* pem_roots) { + if (pem_roots == nullptr) { + gpr_log(GPR_ERROR, "The root certificates are empty."); + return nullptr; + } + tsi_ssl_root_certs_store* root_store = static_cast<tsi_ssl_root_certs_store*>( + gpr_zalloc(sizeof(tsi_ssl_root_certs_store))); + if (root_store == nullptr) { + gpr_log(GPR_ERROR, "Could not allocate buffer for ssl_root_certs_store."); + return nullptr; + } + root_store->store = X509_STORE_new(); + if (root_store->store == nullptr) { + gpr_log(GPR_ERROR, "Could not allocate buffer for X509_STORE."); + gpr_free(root_store); + return nullptr; + } + tsi_result result = x509_store_load_certs(root_store->store, pem_roots, + strlen(pem_roots), nullptr); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Could not load root certificates."); + X509_STORE_free(root_store->store); + gpr_free(root_store); + return nullptr; + } + return root_store; +} + +void tsi_ssl_root_certs_store_destroy(tsi_ssl_root_certs_store* self) { + if (self == nullptr) return; + X509_STORE_free(self->store); + gpr_free(self); +} + /* --- tsi_ssl_session_cache methods implementation. ---*/ tsi_ssl_session_cache* tsi_ssl_session_cache_create_lru(size_t capacity) { @@ -1468,7 +1516,9 @@ tsi_result tsi_create_ssl_client_handshaker_factory_with_options( if (factory == nullptr) return TSI_INVALID_ARGUMENT; *factory = nullptr; - if (options->pem_root_certs == nullptr) return TSI_INVALID_ARGUMENT; + if (options->pem_root_certs == nullptr && options->root_store == nullptr) { + return TSI_INVALID_ARGUMENT; + } ssl_context = SSL_CTX_new(TLSv1_2_method()); if (ssl_context == nullptr) { @@ -1480,9 +1530,7 @@ tsi_result tsi_create_ssl_client_handshaker_factory_with_options( gpr_zalloc(sizeof(*impl))); tsi_ssl_handshaker_factory_init(&impl->base); impl->base.vtable = &client_handshaker_factory_vtable; - impl->ssl_context = ssl_context; - if (options->session_cache != nullptr) { // Unref is called manually on factory destruction. impl->session_cache = @@ -1498,12 +1546,22 @@ tsi_result tsi_create_ssl_client_handshaker_factory_with_options( result = populate_ssl_context(ssl_context, options->pem_key_cert_pair, options->cipher_suites); if (result != TSI_OK) break; - result = ssl_ctx_load_verification_certs( - ssl_context, options->pem_root_certs, strlen(options->pem_root_certs), - nullptr); - if (result != TSI_OK) { - gpr_log(GPR_ERROR, "Cannot load server root certificates."); - break; + +#if OPENSSL_VERSION_NUMBER >= 0x10100000 + // X509_STORE_up_ref is only available since OpenSSL 1.1. + if (options->root_store != nullptr) { + X509_STORE_up_ref(options->root_store->store); + SSL_CTX_set_cert_store(ssl_context, options->root_store->store); + } +#endif + if (OPENSSL_VERSION_NUMBER < 0x10100000 || options->root_store == nullptr) { + result = ssl_ctx_load_verification_certs( + ssl_context, options->pem_root_certs, strlen(options->pem_root_certs), + nullptr); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Cannot load server root certificates."); + break; + } } if (options->num_alpn_protocols != 0) { diff --git a/src/core/tsi/ssl_transport_security.h b/src/core/tsi/ssl_transport_security.h index 29d209b8f5..cabf583098 100644 --- a/src/core/tsi/ssl_transport_security.h +++ b/src/core/tsi/ssl_transport_security.h @@ -36,6 +36,20 @@ #define TSI_SSL_ALPN_SELECTED_PROTOCOL "ssl_alpn_selected_protocol" +/* --- tsi_ssl_root_certs_store object --- + + This object stores SSL root certificates. It can be shared by multiple SSL + context. */ +typedef struct tsi_ssl_root_certs_store tsi_ssl_root_certs_store; + +/* Given a NULL-terminated string containing the PEM encoding of the root + certificates, creates a tsi_ssl_root_certs_store object. */ +tsi_ssl_root_certs_store* tsi_ssl_root_certs_store_create( + const char* pem_roots); + +/* Destroys the tsi_ssl_root_certs_store object. */ +void tsi_ssl_root_certs_store_destroy(tsi_ssl_root_certs_store* self); + /* --- tsi_ssl_session_cache object --- Cache for SSL sessions for sessions resumption. */ @@ -70,13 +84,13 @@ typedef struct { const char* cert_chain; } tsi_ssl_pem_key_cert_pair; -/* Creates a client handshaker factory. +/* TO BE DEPRECATED. + Creates a client handshaker factory. - pem_key_cert_pair is a pointer to the object containing client's private key and certificate chain. This parameter can be NULL if the client does not have such a key/cert pair. - pem_roots_cert is the NULL-terminated string containing the PEM encoding of - the client root certificates. This parameter may be NULL if the server does - not want the client to be authenticated with SSL. + the server root certificates. - cipher_suites contains an optional list of the ciphers that the client supports. The format of this string is described in: https://www.openssl.org/docs/apps/ciphers.html. @@ -103,9 +117,13 @@ typedef struct { not have such a key/cert pair. */ const tsi_ssl_pem_key_cert_pair* pem_key_cert_pair; /* pem_roots_cert is the NULL-terminated string containing the PEM encoding of - the client root certificates. This parameter may be NULL if the server does - not want the client to be authenticated with SSL. */ + the client root certificates. */ const char* pem_root_certs; + /* root_store is a pointer to the ssl_root_certs_store object. If root_store + is not nullptr and SSL implementation permits, root_store will be used as + root certificates. Otherwise, pem_roots_cert will be used to load server + root certificates. */ + const tsi_ssl_root_certs_store* root_store; /* cipher_suites contains an optional list of the ciphers that the client supports. The format of this string is described in: https://www.openssl.org/docs/apps/ciphers.html. @@ -160,12 +178,14 @@ void tsi_ssl_client_handshaker_factory_unref( typedef struct tsi_ssl_server_handshaker_factory tsi_ssl_server_handshaker_factory; -/* Creates a server handshaker factory. +/* TO BE DEPRECATED. + Creates a server handshaker factory. - pem_key_cert_pairs is an array private key / certificate chains of the server. - num_key_cert_pairs is the number of items in the pem_key_cert_pairs array. - pem_root_certs is the NULL-terminated string containing the PEM encoding - of the server root certificates. + of the client root certificates. This parameter may be NULL if the server + does not want the client to be authenticated with SSL. - cipher_suites contains an optional list of the ciphers that the server supports. The format of this string is described in: https://www.openssl.org/docs/apps/ciphers.html. @@ -187,7 +207,8 @@ tsi_result tsi_create_ssl_server_handshaker_factory( const char** alpn_protocols, uint16_t num_alpn_protocols, tsi_ssl_server_handshaker_factory** factory); -/* Same as tsi_create_ssl_server_handshaker_factory method except uses +/* TO BE DEPRECATED. + Same as tsi_create_ssl_server_handshaker_factory method except uses tsi_client_certificate_request_type to support more ways to handle client certificate authentication. - client_certificate_request, if set to non-zero will force the client to @@ -208,7 +229,8 @@ typedef struct { array. */ size_t num_key_cert_pairs; /* pem_root_certs is the NULL-terminated string containing the PEM encoding - of the server root certificates. */ + of the server root certificates. This parameter may be NULL if the server + does not want the client to be authenticated with SSL. */ const char* pem_client_root_certs; /* client_certificate_request, if set to non-zero will force the client to authenticate with an SSL cert. Note that this option is ignored if diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 760aaa4b4d..391ca44962 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -45,6 +45,7 @@ #include "src/cpp/thread_manager/thread_manager.h" namespace grpc { +namespace { class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { public: @@ -53,16 +54,29 @@ class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { void PostSynchronousRequest(ServerContext* context) override {} }; -static std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr; -static gpr_once g_once_init_callbacks = GPR_ONCE_INIT; +std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr; +gpr_once g_once_init_callbacks = GPR_ONCE_INIT; -static void InitGlobalCallbacks() { +void InitGlobalCallbacks() { if (!g_callbacks) { g_callbacks.reset(new DefaultGlobalCallbacks()); } } -class Server::UnimplementedAsyncRequestContext { +class ShutdownTag : public internal::CompletionQueueTag { + public: + bool FinalizeResult(void** tag, bool* status) { return false; } +}; + +class DummyTag : public internal::CompletionQueueTag { + public: + bool FinalizeResult(void** tag, bool* status) { + *status = true; + return true; + } +}; + +class UnimplementedAsyncRequestContext { protected: UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {} @@ -70,8 +84,14 @@ class Server::UnimplementedAsyncRequestContext { GenericServerAsyncReaderWriter generic_stream_; }; +} // namespace + +/// Use private inheritance rather than composition only to establish order +/// of construction, since the public base class should be constructed after the +/// elements belonging to the private base class are constructed. This is not +/// possible using true composition. class Server::UnimplementedAsyncRequest final - : public UnimplementedAsyncRequestContext, + : private UnimplementedAsyncRequestContext, public GenericAsyncRequest { public: UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq) @@ -90,38 +110,27 @@ class Server::UnimplementedAsyncRequest final ServerCompletionQueue* const cq_; }; -typedef internal::SneakyCallOpSet<internal::CallOpSendInitialMetadata, - internal::CallOpServerSendStatus> - UnimplementedAsyncResponseOp; +/// UnimplementedAsyncResponse should not post user-visible completions to the +/// C++ completion queue, but is generated as a CQ event by the core class Server::UnimplementedAsyncResponse final - : public UnimplementedAsyncResponseOp { + : public internal::CallOpSet<internal::CallOpSendInitialMetadata, + internal::CallOpServerSendStatus> { public: UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); ~UnimplementedAsyncResponse() { delete request_; } bool FinalizeResult(void** tag, bool* status) override { - bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status); + internal::CallOpSet< + internal::CallOpSendInitialMetadata, + internal::CallOpServerSendStatus>::FinalizeResult(tag, status); delete this; - return r; + return false; } private: UnimplementedAsyncRequest* const request_; }; -class ShutdownTag : public internal::CompletionQueueTag { - public: - bool FinalizeResult(void** tag, bool* status) { return false; } -}; - -class DummyTag : public internal::CompletionQueueTag { - public: - bool FinalizeResult(void** tag, bool* status) { - *status = true; - return true; - } -}; - class Server::SyncRequest final : public internal::CompletionQueueTag { public: SyncRequest(internal::RpcServiceMethod* method, void* tag) diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index e7b30cd1e9..abe19a6fc6 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -160,8 +160,17 @@ namespace Grpc.Core var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture; lock (myLock) { - // pass "tcs" as "state" for WatchConnectivityStateHandler. - handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, WatchConnectivityStateHandler, tcs); + if (handle.IsClosed) + { + // If channel has been already shutdown and handle was disposed, we would end up with + // an abandoned completion added to the completion registry. Instead, we make sure we fail early. + throw new ObjectDisposedException(nameof(handle), "Channel handle has already been disposed."); + } + else + { + // pass "tcs" as "state" for WatchConnectivityStateHandler. + handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, WatchConnectivityStateHandler, tcs); + } } return tcs.Task; } diff --git a/src/objective-c/tests/Podfile b/src/objective-c/tests/Podfile index 9e9db1fe6c..6e17d9a7cb 100644 --- a/src/objective-c/tests/Podfile +++ b/src/objective-c/tests/Podfile @@ -8,7 +8,6 @@ GRPC_LOCAL_SRC = '../../..' # Install the dependencies in the main target plus all test targets. %w( - Tests AllTests RxLibraryUnitTests InteropTestsRemote @@ -44,10 +43,8 @@ end target target_name do pod 'BoringSSL', :podspec => "#{GRPC_LOCAL_SRC}/src/objective-c", :inhibit_warnings => true pod 'CronetFramework', :podspec => "#{GRPC_LOCAL_SRC}/src/objective-c" - pod 'gRPC-Core', :path => GRPC_LOCAL_SRC - pod 'gRPC-Core/Cronet-Interface', :path => GRPC_LOCAL_SRC pod 'gRPC-Core/Cronet-Implementation', :path => GRPC_LOCAL_SRC - pod 'gRPC-Core/Tests', :path => GRPC_LOCAL_SRC + pod 'gRPC-Core/Cronet-Tests', :path => GRPC_LOCAL_SRC end end diff --git a/src/proto/grpc/channelz/BUILD b/src/proto/grpc/channelz/BUILD new file mode 100644 index 0000000000..bdb03d5e2d --- /dev/null +++ b/src/proto/grpc/channelz/BUILD @@ -0,0 +1,26 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +licenses(["notice"]) # Apache v2 + +load("//bazel:grpc_build_system.bzl", "grpc_proto_library", "grpc_package") + +grpc_package(name = "channelz", visibility = "public") + +grpc_proto_library( + name = "channelz_proto", + srcs = ["channelz.proto"], + has_services = True, + well_known_protos = True, +) diff --git a/src/proto/grpc/channelz/channelz.proto b/src/proto/grpc/channelz/channelz.proto new file mode 100644 index 0000000000..14db66a654 --- /dev/null +++ b/src/proto/grpc/channelz/channelz.proto @@ -0,0 +1,456 @@ +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package grpc.channelz; + +import "google/protobuf/any.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; +import "google/protobuf/wrappers.proto"; + +// See go/grpc-channelz. + +// Channel is a logical grouping of channels, subchannels, and sockets. +message Channel { + // The identifier for this channel. + ChannelRef ref = 1; + // Data specific to this channel. + ChannelData data = 2; + // At most one of 'channel_ref+subchannel_ref' and 'socket' is set. + + // There are no ordering guarantees on the order of channel refs. + // There may not be cycles in the ref graph. + // A channel ref may be present in more than one channel or subchannel. + repeated ChannelRef channel_ref = 3; + + // At most one of 'channel_ref+subchannel_ref' and 'socket' is set. + // There are no ordering guarantees on the order of subchannel refs. + // There may not be cycles in the ref graph. + // A sub channel ref may be present in more than one channel or subchannel. + repeated SubchannelRef subchannel_ref = 4; + + // There are no ordering guarantees on the order of sockets. + repeated SocketRef socket = 5; +} + +// Subchannel is a logical grouping of channels, subchannels, and sockets. +// A subchannel is load balanced over by it's ancestor +message Subchannel { + // The identifier for this channel. + SubchannelRef ref = 1; + // Data specific to this channel. + ChannelData data = 2; + // At most one of 'channel_ref+subchannel_ref' and 'socket' is set. + + // There are no ordering guarantees on the order of channel refs. + // There may not be cycles in the ref graph. + // A channel ref may be present in more than one channel or subchannel. + repeated ChannelRef channel_ref = 3; + + // At most one of 'channel_ref+subchannel_ref' and 'socket' is set. + // There are no ordering guarantees on the order of subchannel refs. + // There may not be cycles in the ref graph. + // A sub channel ref may be present in more than one channel or subchannel. + repeated SubchannelRef subchannel_ref = 4; + + // There are no ordering guarantees on the order of sockets. + repeated SocketRef socket = 5; +} + +// These come from the specified states in this document: +// https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md +message ChannelConnectivityState { + enum State { + UNKNOWN = 0; + IDLE = 1; + CONNECTING = 2; + READY = 3; + TRANSIENT_FAILURE = 4; + SHUTDOWN = 5; + } + State state = 1; +} + +message ChannelData { + + ChannelConnectivityState state = 1; + + // The target this channel originally tried to connect to. May be absent + string target = 2; + + ChannelTrace trace = 3; + + // The number of calls started on the channel + int64 calls_started = 4; + // The number of calls that have completed with an OK status + int64 calls_succeeded = 5; + // The number of calls that have a completed with a non-OK status + int64 calls_failed = 6; + + // The last time a call was started on the channel. + google.protobuf.Timestamp last_call_started_timestamp = 7; +} + +// A trace event is an interesting thing that happened to a channel or +// subchannel, such as creation, address resolution, subchannel creation, etc. +message ChannelTraceEvent { + // High level description of the event. + string description = 1; + // The supported severity levels of trace events. + enum Severity { + CT_UNKNOWN = 0; + CT_INFO = 1; + CT_WARNING = 2; + CT_ERROR = 3; + } + // the severity of the trace event + Severity severity = 2; + // When this event occurred. + google.protobuf.Timestamp timestamp = 3; + // ref of referenced channel or subchannel. + // Optional, only present if this event refers to a child object. For example, + // this field would be filled if this trace event was for a subchannel being + // created. + oneof child_ref { + ChannelRef channel_ref = 4; + SubchannelRef subchannel_ref = 5; + } +} + +message ChannelTrace { + // Number of events ever logged in this tracing object. This can differ from + // events.size() because events can be overwritten or garbage collected by + // implementations. + int64 num_events_logged = 1; + // Time that this channel was created. + google.protobuf.Timestamp creation_time = 2; + // List of events that have occurred on this channel. + repeated ChannelTraceEvent events = 3; +} + +message ChannelRef { + // The globally unique id for this channel. Must be a positive number. + int64 channel_id = 1; + // An optional name associated with the channel. + string name = 2; + // Intentionally don't use field numbers from other refs. + reserved 3, 4, 5, 6; +} + +message SubchannelRef { + // The globally unique id for this subchannel. Must be a positive number. + int64 subchannel_id = 7; + // An optional name associated with the subchannel. + string name = 8; + // Intentionally don't use field numbers from other refs. + reserved 1, 2, 3, 4, 5, 6; +} + +message SocketRef { + int64 socket_id = 3; + // An optional name associated with the socket. + string name = 4; + // Intentionally don't use field numbers from other refs. + reserved 1, 2, 5, 6, 7, 8; +} + +message ServerRef { + // A globally unique identifier for this server. Must be a positive number. + int64 server_id = 5; + // An optional name associated with the server. + string name = 6; + // Intentionally don't use field numbers from other refs. + reserved 1, 2, 3, 4, 7, 8; +} + +message Server { + ServerRef ref = 1; + ServerData data = 2; + + // The sockets that the server is listening on. There are no ordering + // guarantees. + repeated SocketRef listen_socket = 3; +} + +message ServerData { + ChannelTrace trace = 1; + + // The number of incoming calls started on the server + int64 calls_started = 2; + // The number of incoming calls that have completed with an OK status + int64 calls_succeeded = 3; + // The number of incoming calls that have a completed with a non-OK status + int64 calls_failed = 4; + + // The last time a call was started on the server. + google.protobuf.Timestamp last_call_started_timestamp = 5; +} + +// Information about an actual connection. Pronounced "sock-ay". +message Socket { + SocketRef ref = 1; + + SocketData data = 2; + // The locally bound address. + Address local = 3; + // The remote bound address. May be absent. + Address remote = 4; + Security security = 5; + + // Optional, represents the name of the remote endpoint, if different than + // the original target name. + string remote_name = 6; +} + +message SocketData { + // The number of streams that have been started. + int64 streams_started = 1; + // The number of streams that have ended successfully with the EoS bit set for + // both end points + int64 streams_succeeded = 2; + // The number of incoming streams that have a completed with a non-OK status + int64 streams_failed = 3; + + // The number of messages successfully sent on this socket. + int64 messages_sent = 4; + int64 messages_received = 5; + + // The number of keep alives sent. This is typically implemented with HTTP/2 + // ping messages. + int64 keep_alives_sent = 6; + + // The last time a stream was created by this endpoint. Usually unset for + // servers. + google.protobuf.Timestamp last_local_stream_created_timestamp = 7; + // The last time a stream was created by the remote endpoint. Usually unset + // for clients. + google.protobuf.Timestamp last_remote_stream_created_timestamp = 8; + + // The last time a message was sent by this endpoint. + google.protobuf.Timestamp last_message_sent_timestamp = 9; + // The last time a message was received by this endpoint. + google.protobuf.Timestamp last_message_received_timestamp = 10; + + // The amount of window, granted to the local endpoint by the remote endpoint. + // This may be slightly out of date due to network latency. This does NOT + // include stream level or TCP level flow control info. + google.protobuf.Int64Value local_flow_control_window = 11; + + // The amount of window, granted to the remote endpoint by the local endpoint. + // This may be slightly out of date due to network latency. This does NOT + // include stream level or TCP level flow control info. + google.protobuf.Int64Value remote_flow_control_window = 12; + + repeated SocketOption option = 13; +} + +message Address { + message TcpIpAddress { + // Either the IPv4 or IPv6 address in bytes. Will either be 4 bytes or 16 + // bytes in length. + bytes ip_address = 1; + // 0-64k, or -1 if not appropriate. + int32 port = 2; + } + // A Unix Domain Socket address. + message UdsAddress { + string filename = 1; + } + // An address type not included above. + message OtherAddress { + // The human readable version of the value. + string name = 1; + // The actual address message. + google.protobuf.Any value = 2; + } + + oneof address { + TcpIpAddress tcpip_address = 1; + UdsAddress uds_address = 2; + OtherAddress other_address = 3; + } +} + +message Security { + message Tls { + // The key exchange used. e.g. X25519 + string key_exchange = 1; + // The cipher used. e.g. AES_128_GCM. + string cipher = 2; + // the certificate used by this endpoint. + bytes local_certificate = 3; + // the certificate used by the remote endpoint. + bytes remote_certificate = 4; + } + message OtherSecurity { + // The human readable version of the value. + string name = 1; + // The actual security details message. + google.protobuf.Any value = 2; + } + oneof model { + Tls tls = 1; + OtherSecurity other = 2; + } +} + +message SocketOption { + string name = 1; + // The human readable value of this socket option. At least one of value or + // additional will be set. + string value = 2; + // Additional data associated with the socket option. At least one of value + // or additional will be set. + google.protobuf.Any additional = 3; +} + +// For use with SocketOption's additional field. This is primarily used for +// SO_RCVTIMEO and SO_SNDTIMEO +message SocketOptionTimeout { + google.protobuf.Duration duration = 1; +} + +message SocketOptionLinger { + bool active = 1; + google.protobuf.Duration duration = 2; +} + +// Tcp info for SOL_TCP, TCP_INFO +message SocketOptionTcpInfo { + uint32 tcpi_state = 1; + + uint32 tcpi_ca_state = 2; + uint32 tcpi_retransmits = 3; + uint32 tcpi_probes = 4; + uint32 tcpi_backoff = 5; + uint32 tcpi_options = 6; + uint32 tcpi_snd_wscale = 7; + uint32 tcpi_rcv_wscale = 8; + + uint32 tcpi_rto = 9; + uint32 tcpi_ato = 10; + uint32 tcpi_snd_mss = 11; + uint32 tcpi_rcv_mss = 12; + + uint32 tcpi_unacked = 13; + uint32 tcpi_sacked = 14; + uint32 tcpi_lost = 15; + uint32 tcpi_retrans = 16; + uint32 tcpi_fackets = 17; + + uint32 tcpi_last_data_sent = 18; + uint32 tcpi_last_ack_sent = 19; + uint32 tcpi_last_data_recv = 20; + uint32 tcpi_last_ack_recv = 21; + + uint32 tcpi_pmtu = 22; + uint32 tcpi_rcv_ssthresh = 23; + uint32 tcpi_rtt = 24; + uint32 tcpi_rttvar = 25; + uint32 tcpi_snd_ssthresh = 26; + uint32 tcpi_snd_cwnd = 27; + uint32 tcpi_advmss = 28; + uint32 tcpi_reordering = 29; +} + +service Channelz { + // Gets all root channels (e.g. channels the application has directly + // created). This does not include subchannels nor non-top level channels. + rpc GetTopChannels(GetTopChannelsRequest) returns (GetTopChannelsResponse); + // Gets all servers that exist in the process. + rpc GetServers(GetServersRequest) returns (GetServersResponse); + // Gets all server sockets that exist in the process. + rpc GetServerSockets(GetServerSocketsRequest) returns (GetServerSocketsResponse); + // Returns a single Channel, or else a NOT_FOUND code. + rpc GetChannel(GetChannelRequest) returns (GetChannelResponse); + // Returns a single Subchannel, or else a NOT_FOUND code. + rpc GetSubchannel(GetSubchannelRequest) returns (GetSubchannelResponse); + // Returns a single Socket or else a NOT_FOUND code. + rpc GetSocket(GetSocketRequest) returns (GetSocketResponse); +} + +message GetServersRequest { + // start_server_id indicates that only servers at or above this id should be + // included in the results. + int64 start_server_id = 1; +} + +message GetServersResponse { + // list of servers that the connection detail service knows about. Sorted in + // ascending server_id order. + repeated Server server = 1; + // If set, indicates that the list of servers is the final list. Requesting + // more servers will only return more if they are created after this RPC + // completes. + bool end = 2; +} + +message GetServerSocketsRequest { + int64 server_id = 1; + // start_socket_id indicates that only sockets at or above this id should be + // included in the results. + int64 start_socket_id = 2; +} + +message GetServerSocketsResponse { + // list of socket refs that the connection detail service knows about. Sorted in + // ascending socket_id order. + repeated SocketRef socket_ref = 1; + // If set, indicates that the list of sockets is the final list. Requesting + // more sockets will only return more if they are created after this RPC + // completes. + bool end = 2; +} + +message GetTopChannelsRequest { + // start_channel_id indicates that only channels at or above this id should be + // included in the results. + int64 start_channel_id = 1; +} + +message GetTopChannelsResponse { + // list of channels that the connection detail service knows about. Sorted in + // ascending channel_id order. + repeated Channel channel = 1; + // If set, indicates that the list of channels is the final list. Requesting + // more channels can only return more if they are created after this RPC + // completes. + bool end = 2; +} + +message GetChannelRequest { + int64 channel_id = 1; +} + +message GetChannelResponse { + Channel channel = 1; +} + +message GetSubchannelRequest { + int64 subchannel_id = 1; +} + +message GetSubchannelResponse { + Subchannel subchannel = 1; +} + +message GetSocketRequest { + int64 socket_id = 1; +} + +message GetSocketResponse { + Socket socket = 1; +} diff --git a/src/proto/grpc/testing/BUILD b/src/proto/grpc/testing/BUILD index b8e9a22ab3..58412ed630 100644 --- a/src/proto/grpc/testing/BUILD +++ b/src/proto/grpc/testing/BUILD @@ -26,7 +26,7 @@ exports_files([ grpc_proto_library( name = "compiler_test_proto", srcs = ["compiler_test.proto"], - generate_mock = True, + generate_mocks = True, ) grpc_proto_library( @@ -49,7 +49,7 @@ grpc_proto_library( name = "echo_proto", srcs = ["echo.proto"], deps = ["echo_messages_proto"], - generate_mock = True, + generate_mocks = True, ) grpc_proto_library( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index 0892215b6d..2e02111ddd 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -30,9 +30,12 @@ cdef class Call: tag, operations, self if retain_self else None) batch_operation_tag.prepare() cpython.Py_INCREF(batch_operation_tag) - return grpc_call_start_batch( + cdef grpc_call_error error + with nogil: + error = grpc_call_start_batch( self.c_call, batch_operation_tag.c_ops, batch_operation_tag.c_nops, <cpython.PyObject *>batch_operation_tag, NULL) + return error def start_client_batch(self, operations, tag): # We don't reference this call in the operations tag because diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 62e54a388a..9f4aaffc28 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -15,6 +15,9 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_core_dependencies.py.template`!!! CORE_SOURCE_FILES = [ + 'third_party/address_sorting/address_sorting.c', + 'third_party/address_sorting/address_sorting_posix.c', + 'third_party/address_sorting/address_sorting_windows.c', 'src/core/lib/gpr/alloc.cc', 'src/core/lib/gpr/arena.cc', 'src/core/lib/gpr/atm.cc', @@ -60,10 +63,13 @@ CORE_SOURCE_FILES = [ 'src/core/lib/channel/channel_args.cc', 'src/core/lib/channel/channel_stack.cc', 'src/core/lib/channel/channel_stack_builder.cc', + 'src/core/lib/channel/channel_trace.cc', + 'src/core/lib/channel/channel_trace_registry.cc', 'src/core/lib/channel/connected_channel.cc', 'src/core/lib/channel/handshaker.cc', 'src/core/lib/channel/handshaker_factory.cc', 'src/core/lib/channel/handshaker_registry.cc', + 'src/core/lib/channel/status_util.cc', 'src/core/lib/compression/compression.cc', 'src/core/lib/compression/compression_internal.cc', 'src/core/lib/compression/message_compress.cc', @@ -312,7 +318,6 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/resolver.cc', 'src/core/ext/filters/client_channel/resolver_registry.cc', 'src/core/ext/filters/client_channel/retry_throttle.cc', - 'src/core/ext/filters/client_channel/status_util.cc', 'src/core/ext/filters/client_channel/subchannel.cc', 'src/core/ext/filters/client_channel/subchannel_index.cc', 'src/core/ext/filters/client_channel/uri_parser.cc', diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index bbbc3ea360..d38ee517f0 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -38,168 +38,6 @@ "unit._cython.cygrpc_test.InsecureServerInsecureClient", "unit._cython.cygrpc_test.SecureServerSecureClient", "unit._cython.cygrpc_test.TypeSmokeTest", - "unit._early_ok_test.ManyEmptyRequestsTwoReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsTwoReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsTwoReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsTwoReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsTwoReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsTwoReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsTwoReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsTwoReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsTwoReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsZeroReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsZeroReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsZeroReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsZeroReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsZeroReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsZeroReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsZeroReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsZeroReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManyEmptyRequestsZeroReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsTwoReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsTwoReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsTwoReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsTwoReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsTwoReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsTwoReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsTwoReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsTwoReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsTwoReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsZeroReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsZeroReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsZeroReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsZeroReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsZeroReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsZeroReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsZeroReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsZeroReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManyLargeRequestsZeroReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsTwoReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsTwoReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsTwoReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsTwoReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsTwoReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsTwoReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsTwoReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsTwoReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsTwoReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsZeroReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsZeroReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsZeroReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsZeroReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsZeroReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsZeroReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsZeroReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsZeroReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.ManySmallRequestsZeroReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsTwoReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsTwoReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsTwoReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsTwoReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsTwoReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsTwoReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsTwoReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsTwoReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsTwoReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsZeroReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsZeroReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsZeroReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsZeroReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsZeroReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsZeroReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsZeroReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsZeroReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoEmptyRequestsZeroReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsTwoReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsTwoReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsTwoReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsTwoReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsTwoReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsTwoReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsTwoReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsTwoReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsTwoReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsZeroReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsZeroReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsZeroReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsZeroReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsZeroReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsZeroReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsZeroReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsZeroReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoLargeRequestsZeroReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsTwoReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsTwoReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsTwoReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsTwoReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsTwoReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsTwoReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsTwoReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsTwoReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsTwoReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsZeroReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsZeroReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsZeroReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsZeroReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsZeroReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsZeroReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsZeroReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsZeroReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.TwoSmallRequestsZeroReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsTwoReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsTwoReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsTwoReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsTwoReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsTwoReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsTwoReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsTwoReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsTwoReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsTwoReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsZeroReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsZeroReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsZeroReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsZeroReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsZeroReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsZeroReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsZeroReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsZeroReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroEmptyRequestsZeroReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsTwoReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsTwoReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsTwoReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsTwoReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsTwoReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsTwoReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsTwoReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsTwoReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsTwoReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsZeroReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsZeroReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsZeroReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsZeroReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsZeroReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsZeroReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsZeroReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsZeroReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroLargeRequestsZeroReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsTwoReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsTwoReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsTwoReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsTwoReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsTwoReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsTwoReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsTwoReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsTwoReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsTwoReadZeroSmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsZeroReadManyEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsZeroReadManyLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsZeroReadManySmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsZeroReadTwoEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsZeroReadTwoLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsZeroReadTwoSmallResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsZeroReadZeroEmptyResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsZeroReadZeroLargeResponsesEarlyOKTest", - "unit._early_ok_test.ZeroSmallRequestsZeroReadZeroSmallResponsesEarlyOKTest", "unit._empty_message_test.EmptyMessageTest", "unit._exit_test.ExitTest", "unit._interceptor_test.InterceptorTest", diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py index da1996b1d1..7550cd39ba 100644 --- a/src/python/grpcio_tests/tests/unit/_compression_test.py +++ b/src/python/grpcio_tests/tests/unit/_compression_test.py @@ -32,10 +32,12 @@ def handle_unary(request, servicer_context): def handle_stream(request_iterator, servicer_context): + # TODO(issue:#6891) We should be able to remove this loop, + # and replace with return; yield servicer_context.send_initial_metadata([('grpc-internal-encoding-request', 'gzip')]) - return - yield + for request in request_iterator: + yield request class _MethodHandler(grpc.RpcMethodHandler): diff --git a/src/python/grpcio_tests/tests/unit/_early_ok_test.py b/src/python/grpcio_tests/tests/unit/_early_ok_test.py deleted file mode 100644 index 041532c661..0000000000 --- a/src/python/grpcio_tests/tests/unit/_early_ok_test.py +++ /dev/null @@ -1,206 +0,0 @@ -# Copyright 2018 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests servicers sending OK status without having read all requests. - -This is a regression test of https://github.com/grpc/grpc/issues/6891. -""" - -import enum -import unittest - -import six - -import grpc - -from tests.unit import test_common -from tests.unit.framework.common import test_constants - -_RPC_METHOD = '/serffice/Meffod' - - -@enum.unique -class _MessageCount(enum.Enum): - - ZERO = ( - 0, - 'Zero', - ) - TWO = ( - 1, - 'Two', - ) - MANY = ( - test_constants.STREAM_LENGTH, - 'Many', - ) - - -@enum.unique -class _MessageSize(enum.Enum): - EMPTY = ( - 0, - 'Empty', - ) - SMALL = ( - 32, - 'Small', - ) # Smaller than any flow control window. - LARGE = ( - 3 * 1024 * 1024, - 'Large', - ) # Larger than any flow control window. - - -_ZERO_MESSAGE = b'' -_SMALL_MESSAGE = b'\x07' * _MessageSize.SMALL.value[0] -_LARGE_MESSAGE = b'abc' * (_MessageSize.LARGE.value[0] // 3) - - -@enum.unique -class _ReadRequests(enum.Enum): - - ZERO = ( - 0, - 'Zero', - ) - TWO = ( - 2, - 'Two', - ) - - -class _Case(object): - - def __init__(self, request_count, request_size, request_reading, - response_count, response_size): - self.request_count = request_count - self.request_size = request_size - self.request_reading = request_reading - self.response_count = response_count - self.response_size = response_size - - def create_test_case_name(self): - return '{}{}Requests{}Read{}{}ResponsesEarlyOKTest'.format( - self.request_count.value[1], self.request_size.value[1], - self.request_reading.value[1], self.response_count.value[1], - self.response_size.value[1]) - - -def _message(message_size): - if message_size is _MessageSize.EMPTY: - return _ZERO_MESSAGE - elif message_size is _MessageSize.SMALL: - return _SMALL_MESSAGE - elif message_size is _MessageSize.LARGE: - return _LARGE_MESSAGE - - -def _messages_to_send(count, size): - for _ in range(count.value[0]): - yield _message(size) - - -def _draw_requests(case, request_iterator): - for _ in range( - min(case.request_count.value[0], case.request_reading.value[0])): - next(request_iterator) - - -def _draw_responses(case, response_iterator): - for _ in range(case.response_count.value[0]): - next(response_iterator) - - -class _MethodHandler(grpc.RpcMethodHandler): - - def __init__(self, case): - self.request_streaming = True - self.response_streaming = True - self.request_deserializer = None - self.response_serializer = None - self.unary_unary = None - self.unary_stream = None - self.stream_unary = None - self._case = case - - def stream_stream(self, request_iterator, servicer_context): - _draw_requests(self._case, request_iterator) - - for response in _messages_to_send(self._case.response_count, - self._case.response_size): - yield response - - -class _GenericHandler(grpc.GenericRpcHandler): - - def __init__(self, case): - self._case = case - - def service(self, handler_call_details): - return _MethodHandler(self._case) - - -class _EarlyOkTest(unittest.TestCase): - - def setUp(self): - self._server = test_common.test_server() - port = self._server.add_insecure_port('[::]:0') - self._server.add_generic_rpc_handlers((_GenericHandler(self.case),)) - self._server.start() - - self._channel = grpc.insecure_channel('localhost:%d' % port) - self._multi_callable = self._channel.stream_stream(_RPC_METHOD) - - def tearDown(self): - self._server.stop(None) - - def test_early_ok(self): - requests = _messages_to_send(self.case.request_count, - self.case.request_size) - - response_iterator_call = self._multi_callable(requests) - - _draw_responses(self.case, response_iterator_call) - - self.assertIs(grpc.StatusCode.OK, response_iterator_call.code()) - - -def _cases(): - for request_count in _MessageCount: - for request_size in _MessageSize: - for request_reading in _ReadRequests: - for response_count in _MessageCount: - for response_size in _MessageSize: - yield _Case(request_count, request_size, - request_reading, response_count, - response_size) - - -def _test_case_classes(): - for case in _cases(): - yield type(case.create_test_case_name(), (_EarlyOkTest,), { - 'case': case, - '__module__': _EarlyOkTest.__module__, - }) - - -def load_tests(loader, tests, pattern): - return unittest.TestSuite( - tests=tuple( - loader.loadTestsFromTestCase(test_case_class) - for test_case_class in _test_case_classes())) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py index a6cdf32e5b..ca10bd4dab 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py @@ -107,6 +107,9 @@ class _Servicer(object): self._received_client_metadata = context.invocation_metadata() context.send_initial_metadata(_SERVER_INITIAL_METADATA) context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + # TODO(https://github.com/grpc/grpc/issues/6891): just ignore the + # request iterator. + list(request_iterator) if self._abort_call: context.abort(self._code, self._details) else: @@ -124,6 +127,9 @@ class _Servicer(object): self._received_client_metadata = context.invocation_metadata() context.send_initial_metadata(_SERVER_INITIAL_METADATA) context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + # TODO(https://github.com/grpc/grpc/issues/6891): just ignore the + # request iterator. + list(request_iterator) if self._abort_call: context.abort(self._code, self._details) else: diff --git a/src/python/grpcio_tests/tests/unit/_metadata_test.py b/src/python/grpcio_tests/tests/unit/_metadata_test.py index 2309eeb733..5908421011 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_test.py @@ -117,6 +117,9 @@ def handle_stream_unary(test, request_iterator, servicer_context): validate_client_metadata(test, servicer_context) servicer_context.send_initial_metadata(_INITIAL_METADATA) servicer_context.set_trailing_metadata(_TRAILING_METADATA) + # TODO(issue:#6891) We should be able to remove this loop + for request in request_iterator: + pass return _RESPONSE @@ -124,8 +127,10 @@ def handle_stream_stream(test, request_iterator, servicer_context): validate_client_metadata(test, servicer_context) servicer_context.send_initial_metadata(_INITIAL_METADATA) servicer_context.set_trailing_metadata(_TRAILING_METADATA) - return - yield + # TODO(issue:#6891) We should be able to remove this loop, + # and replace with return; yield + for request in request_iterator: + yield _RESPONSE class _MethodHandler(grpc.RpcMethodHandler): diff --git a/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py b/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py index e35f8f10d4..df4b129018 100644 --- a/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py +++ b/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py @@ -77,13 +77,18 @@ def handle_unary_stream(trigger, request, servicer_context): def handle_stream_unary(trigger, request_iterator, servicer_context): trigger.await_trigger() + # TODO(issue:#6891) We should be able to remove this loop + for request in request_iterator: + pass return _RESPONSE def handle_stream_stream(trigger, request_iterator, servicer_context): trigger.await_trigger() - return - yield + # TODO(issue:#6891) We should be able to remove this loop, + # and replace with return; yield + for request in request_iterator: + yield _RESPONSE class _MethodHandler(grpc.RpcMethodHandler): diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb index c1a0c56841..e8e87e4f15 100644 --- a/src/ruby/ext/grpc/extconf.rb +++ b/src/ruby/ext/grpc/extconf.rb @@ -94,7 +94,6 @@ $CFLAGS << ' -std=c99 ' $CFLAGS << ' -Wall ' $CFLAGS << ' -Wextra ' $CFLAGS << ' -pedantic ' -$CFLAGS << ' -Wno-format ' output = File.join('grpc', 'grpc_c') puts 'Generating Makefile for ' + output diff --git a/src/ruby/ext/grpc/rb_compression_options.c b/src/ruby/ext/grpc/rb_compression_options.c index 7fdec2ee8b..4ba6991ef6 100644 --- a/src/ruby/ext/grpc/rb_compression_options.c +++ b/src/ruby/ext/grpc/rb_compression_options.c @@ -186,7 +186,7 @@ void grpc_rb_compression_options_algorithm_name_to_value_internal( error_message_ruby_str = rb_str_new(error_message_str, strlen(error_message_str)); gpr_free(error_message_str); - rb_raise(rb_eNameError, StringValueCStr(error_message_ruby_str)); + rb_raise(rb_eNameError, "%s", StringValueCStr(error_message_ruby_str)); } grpc_slice_unref(name_slice); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 7009bd80f6..a12819e87c 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -68,6 +68,8 @@ grpc_channel_get_info_type grpc_channel_get_info_import; grpc_insecure_channel_create_type grpc_insecure_channel_create_import; grpc_lame_client_channel_create_type grpc_lame_client_channel_create_import; grpc_channel_destroy_type grpc_channel_destroy_import; +grpc_channel_get_trace_type grpc_channel_get_trace_import; +grpc_channel_get_uuid_type grpc_channel_get_uuid_import; grpc_call_cancel_type grpc_call_cancel_import; grpc_call_cancel_with_status_type grpc_call_cancel_with_status_import; grpc_call_ref_type grpc_call_ref_import; @@ -307,6 +309,8 @@ void grpc_rb_load_imports(HMODULE library) { grpc_insecure_channel_create_import = (grpc_insecure_channel_create_type) GetProcAddress(library, "grpc_insecure_channel_create"); grpc_lame_client_channel_create_import = (grpc_lame_client_channel_create_type) GetProcAddress(library, "grpc_lame_client_channel_create"); grpc_channel_destroy_import = (grpc_channel_destroy_type) GetProcAddress(library, "grpc_channel_destroy"); + grpc_channel_get_trace_import = (grpc_channel_get_trace_type) GetProcAddress(library, "grpc_channel_get_trace"); + grpc_channel_get_uuid_import = (grpc_channel_get_uuid_type) GetProcAddress(library, "grpc_channel_get_uuid"); grpc_call_cancel_import = (grpc_call_cancel_type) GetProcAddress(library, "grpc_call_cancel"); grpc_call_cancel_with_status_import = (grpc_call_cancel_with_status_type) GetProcAddress(library, "grpc_call_cancel_with_status"); grpc_call_ref_import = (grpc_call_ref_type) GetProcAddress(library, "grpc_call_ref"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 244976c9c5..089cb8a61a 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -179,6 +179,12 @@ extern grpc_lame_client_channel_create_type grpc_lame_client_channel_create_impo typedef void(*grpc_channel_destroy_type)(grpc_channel* channel); extern grpc_channel_destroy_type grpc_channel_destroy_import; #define grpc_channel_destroy grpc_channel_destroy_import +typedef char*(*grpc_channel_get_trace_type)(grpc_channel* channel); +extern grpc_channel_get_trace_type grpc_channel_get_trace_import; +#define grpc_channel_get_trace grpc_channel_get_trace_import +typedef intptr_t(*grpc_channel_get_uuid_type)(grpc_channel* channel); +extern grpc_channel_get_uuid_type grpc_channel_get_uuid_import; +#define grpc_channel_get_uuid grpc_channel_get_uuid_import typedef grpc_call_error(*grpc_call_cancel_type)(grpc_call* call, void* reserved); extern grpc_call_cancel_type grpc_call_cancel_import; #define grpc_call_cancel grpc_call_cancel_import |