diff options
Diffstat (limited to 'src')
30 files changed, 337 insertions, 200 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index a10bfea8b1..33cf56519e 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -842,10 +842,11 @@ typedef struct { bool completed_recv_trailing_metadata : 1; // State for callback processing. bool retry_dispatched : 1; - bool recv_initial_metadata_ready_deferred : 1; - bool recv_message_ready_deferred : 1; + subchannel_batch_data* recv_initial_metadata_ready_deferred_batch; grpc_error* recv_initial_metadata_error; + subchannel_batch_data* recv_message_ready_deferred_batch; grpc_error* recv_message_error; + subchannel_batch_data* recv_trailing_metadata_internal_batch; } subchannel_call_retry_state; // Pending batches stored in call data. @@ -994,6 +995,39 @@ static void maybe_cache_send_ops_for_batch(call_data* calld, } } +// Frees cached send_initial_metadata. +static void free_cached_send_initial_metadata(channel_data* chand, + call_data* calld) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: destroying calld->send_initial_metadata", chand, + calld); + } + grpc_metadata_batch_destroy(&calld->send_initial_metadata); +} + +// Frees cached send_message at index idx. +static void free_cached_send_message(channel_data* chand, call_data* calld, + size_t idx) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]", + chand, calld, idx); + } + (*calld->send_messages)[idx]->Destroy(); +} + +// Frees cached send_trailing_metadata. +static void free_cached_send_trailing_metadata(channel_data* chand, + call_data* calld) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: destroying calld->send_trailing_metadata", + chand, calld); + } + grpc_metadata_batch_destroy(&calld->send_trailing_metadata); +} + // Frees cached send ops that have already been completed after // committing the call. static void free_cached_send_op_data_after_commit( @@ -1001,19 +1035,13 @@ static void free_cached_send_op_data_after_commit( channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (retry_state->completed_send_initial_metadata) { - grpc_metadata_batch_destroy(&calld->send_initial_metadata); + free_cached_send_initial_metadata(chand, calld); } for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, - "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR - "]", - chand, calld, i); - } - (*calld->send_messages)[i]->Destroy(); + free_cached_send_message(chand, calld, i); } if (retry_state->completed_send_trailing_metadata) { - grpc_metadata_batch_destroy(&calld->send_trailing_metadata); + free_cached_send_trailing_metadata(chand, calld); } } @@ -1025,20 +1053,14 @@ static void free_cached_send_op_data_for_completed_batch( channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (batch_data->batch.send_initial_metadata) { - grpc_metadata_batch_destroy(&calld->send_initial_metadata); + free_cached_send_initial_metadata(chand, calld); } if (batch_data->batch.send_message) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, - "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR - "]", - chand, calld, retry_state->completed_send_message_count - 1); - } - (*calld->send_messages)[retry_state->completed_send_message_count - 1] - ->Destroy(); + free_cached_send_message(chand, calld, + retry_state->completed_send_message_count - 1); } if (batch_data->batch.send_trailing_metadata) { - grpc_metadata_batch_destroy(&calld->send_trailing_metadata); + free_cached_send_trailing_metadata(chand, calld); } } @@ -1642,7 +1664,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { "(Trailers-Only)", chand, calld); } - retry_state->recv_initial_metadata_ready_deferred = true; + retry_state->recv_initial_metadata_ready_deferred_batch = batch_data; retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error); if (!retry_state->started_recv_trailing_metadata) { // recv_trailing_metadata not yet started by application; start it @@ -1731,7 +1753,7 @@ static void recv_message_ready(void* arg, grpc_error* error) { "message and recv_trailing_metadata pending)", chand, calld); } - retry_state->recv_message_ready_deferred = true; + retry_state->recv_message_ready_deferred_batch = batch_data; retry_state->recv_message_error = GRPC_ERROR_REF(error); if (!retry_state->started_recv_trailing_metadata) { // recv_trailing_metadata not yet started by application; start it @@ -1750,6 +1772,59 @@ static void recv_message_ready(void* arg, grpc_error* error) { } // +// list of closures to execute in call combiner +// + +// Represents a closure that needs to run in the call combiner as part of +// starting or completing a batch. +typedef struct { + grpc_closure* closure; + grpc_error* error; + const char* reason; + bool free_reason = false; +} closure_to_execute; + +static void execute_closures_in_call_combiner(grpc_call_element* elem, + const char* caller, + closure_to_execute* closures, + size_t num_closures) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + // Note that the call combiner will be yielded for each closure that + // we schedule. We're already running in the call combiner, so one of + // the closures can be scheduled directly, but the others will + // have to re-enter the call combiner. + if (num_closures > 0) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: %s starting closure: %s", chand, + calld, caller, closures[0].reason); + } + GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error); + if (closures[0].free_reason) { + gpr_free(const_cast<char*>(closures[0].reason)); + } + for (size_t i = 1; i < num_closures; ++i) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: %s starting closure in call combiner: %s", + chand, calld, caller, closures[i].reason); + } + GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure, + closures[i].error, closures[i].reason); + if (closures[i].free_reason) { + gpr_free(const_cast<char*>(closures[i].reason)); + } + } + } else { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: no closures to run for %s", chand, + calld, caller); + } + GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run"); + } +} + +// // on_complete callback handling // @@ -1777,36 +1852,35 @@ static void update_retry_state_for_completed_batch( } } -// Represents a closure that needs to run as a result of a completed batch. -typedef struct { - grpc_closure* closure; - grpc_error* error; - const char* reason; -} closure_to_execute; - // Adds any necessary closures for deferred recv_initial_metadata and // recv_message callbacks to closures, updating *num_closures as needed. static void add_closures_for_deferred_recv_callbacks( subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, closure_to_execute* closures, size_t* num_closures) { - if (batch_data->batch.recv_trailing_metadata && - retry_state->recv_initial_metadata_ready_deferred) { - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = - GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready, - invoke_recv_initial_metadata_callback, batch_data, - grpc_schedule_on_exec_ctx); - closure->error = retry_state->recv_initial_metadata_error; - closure->reason = "resuming recv_initial_metadata_ready"; - } - if (batch_data->batch.recv_trailing_metadata && - retry_state->recv_message_ready_deferred) { - closure_to_execute* closure = &closures[(*num_closures)++]; - closure->closure = GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, - invoke_recv_message_callback, - batch_data, grpc_schedule_on_exec_ctx); - closure->error = retry_state->recv_message_error; - closure->reason = "resuming recv_message_ready"; + if (batch_data->batch.recv_trailing_metadata) { + // Add closure for deferred recv_initial_metadata_ready. + if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) { + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = GRPC_CLOSURE_INIT( + &batch_data->recv_initial_metadata_ready, + invoke_recv_initial_metadata_callback, + retry_state->recv_initial_metadata_ready_deferred_batch, + grpc_schedule_on_exec_ctx); + closure->error = retry_state->recv_initial_metadata_error; + closure->reason = "resuming recv_initial_metadata_ready"; + retry_state->recv_initial_metadata_ready_deferred_batch = nullptr; + } + // Add closure for deferred recv_message_ready. + if (retry_state->recv_message_ready_deferred_batch != nullptr) { + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = GRPC_CLOSURE_INIT( + &batch_data->recv_message_ready, invoke_recv_message_callback, + retry_state->recv_message_ready_deferred_batch, + grpc_schedule_on_exec_ctx); + closure->error = retry_state->recv_message_error; + closure->reason = "resuming recv_message_ready"; + retry_state->recv_message_ready_deferred_batch = nullptr; + } } } @@ -1951,6 +2025,8 @@ static void on_complete(void* arg, grpc_error* error) { // If we have previously completed recv_trailing_metadata, then the // call is finished. bool call_finished = retry_state->completed_recv_trailing_metadata; + // Record whether we were already committed before receiving this callback. + const bool previously_committed = calld->retry_committed; // Update bookkeeping in retry_state. update_retry_state_for_completed_batch(batch_data, retry_state); if (call_finished) { @@ -1979,35 +2055,39 @@ static void on_complete(void* arg, grpc_error* error) { if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; } - } else if (retry_state->completed_recv_trailing_metadata) { - call_finished = true; - } - if (call_finished && grpc_client_channel_trace.enabled()) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: call finished, status=%s", chand, - calld, grpc_status_code_to_string(status)); } - // If the call is finished, check if we should retry. - if (call_finished && - maybe_retry(elem, batch_data, status, server_pushback_md)) { - // Unref batch_data for deferred recv_initial_metadata_ready or - // recv_message_ready callbacks, if any. - if (batch_data->batch.recv_trailing_metadata && - retry_state->recv_initial_metadata_ready_deferred) { - batch_data_unref(batch_data); - GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); + // If the call just finished, check if we should retry. + if (call_finished) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: call finished, status=%s", chand, + calld, grpc_status_code_to_string(status)); } - if (batch_data->batch.recv_trailing_metadata && - retry_state->recv_message_ready_deferred) { + if (maybe_retry(elem, batch_data, status, server_pushback_md)) { + // Unref batch_data for deferred recv_initial_metadata_ready or + // recv_message_ready callbacks, if any. + if (batch_data->batch.recv_trailing_metadata && + retry_state->recv_initial_metadata_ready_deferred_batch != + nullptr) { + batch_data_unref(batch_data); + GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); + } + if (batch_data->batch.recv_trailing_metadata && + retry_state->recv_message_ready_deferred_batch != nullptr) { + batch_data_unref(batch_data); + GRPC_ERROR_UNREF(retry_state->recv_message_error); + } batch_data_unref(batch_data); - GRPC_ERROR_UNREF(retry_state->recv_message_error); + return; } - batch_data_unref(batch_data); - return; + // Not retrying, so commit the call. + retry_commit(elem, retry_state); } } - // If the call is finished or retries are committed, free cached data for - // send ops that we've just completed. - if (call_finished || calld->retry_committed) { + // If we were already committed before receiving this callback, free + // cached data for send ops that we've just completed. (If the call has + // just now finished, the call to retry_commit() above will have freed all + // cached send ops, so we don't need to do it here.) + if (previously_committed) { free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state); } // Call not being retried. @@ -2042,20 +2122,8 @@ static void on_complete(void* arg, grpc_error* error) { // Don't need batch_data anymore. batch_data_unref(batch_data); // Schedule all of the closures identified above. - // Note that the call combiner will be yielded for each closure that - // we schedule. We're already running in the call combiner, so one of - // the closures can be scheduled directly, but the others will - // have to re-enter the call combiner. - if (num_closures > 0) { - GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error); - for (size_t i = 1; i < num_closures; ++i) { - GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure, - closures[i].error, closures[i].reason); - } - } else { - GRPC_CALL_COMBINER_STOP(calld->call_combiner, - "no closures to run for on_complete"); - } + execute_closures_in_call_combiner(elem, "on_complete", closures, + num_closures); } // @@ -2072,6 +2140,31 @@ static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) { grpc_subchannel_call_process_op(subchannel_call, batch); } +// Adds a closure to closures that will execute batch in the call combiner. +static void add_closure_for_subchannel_batch( + call_data* calld, grpc_transport_stream_op_batch* batch, + closure_to_execute* closures, size_t* num_closures) { + batch->handler_private.extra_arg = calld->subchannel_call; + GRPC_CLOSURE_INIT(&batch->handler_private.closure, + start_batch_in_call_combiner, batch, + grpc_schedule_on_exec_ctx); + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = &batch->handler_private.closure; + closure->error = GRPC_ERROR_NONE; + // If the tracer is enabled, we log a more detailed message, which + // requires dynamic allocation. This will be freed in + // start_retriable_subchannel_batches(). + if (grpc_client_channel_trace.enabled()) { + char* batch_str = grpc_transport_stream_op_batch_string(batch); + gpr_asprintf(const_cast<char**>(&closure->reason), + "starting batch in call combiner: %s", batch_str); + gpr_free(batch_str); + closure->free_reason = true; + } else { + closure->reason = "start_subchannel_batch"; + } +} + // Adds retriable send_initial_metadata op to batch_data. static void add_retriable_send_initial_metadata_op( call_data* calld, subchannel_call_retry_state* retry_state, @@ -2227,8 +2320,12 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { static_cast<subchannel_call_retry_state*>( grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); - subchannel_batch_data* batch_data = batch_data_create(elem, 1); + // Create batch_data with 2 refs, since this batch will be unreffed twice: + // once when the subchannel batch returns, and again when we actually get + // a recv_trailing_metadata op from the surface. + subchannel_batch_data* batch_data = batch_data_create(elem, 2); add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); + retry_state->recv_trailing_metadata_internal_batch = batch_data; // Note: This will release the call combiner. grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch); } @@ -2299,7 +2396,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( // *num_batches as needed. static void add_subchannel_batches_for_pending_batches( grpc_call_element* elem, subchannel_call_retry_state* retry_state, - grpc_transport_stream_op_batch** batches, size_t* num_batches) { + closure_to_execute* closures, size_t* num_closures) { call_data* calld = static_cast<call_data*>(elem->call_data); for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; @@ -2342,13 +2439,37 @@ static void add_subchannel_batches_for_pending_batches( } if (batch->recv_trailing_metadata && retry_state->started_recv_trailing_metadata) { + // If we previously completed a recv_trailing_metadata op + // initiated by start_internal_recv_trailing_metadata(), use the + // result of that instead of trying to re-start this op. + if (retry_state->recv_trailing_metadata_internal_batch != nullptr) { + // If the batch completed, then trigger the completion callback + // directly, so that we return the previously returned results to + // the application. Otherwise, just unref the internally + // started subchannel batch, since we'll propagate the + // completion when it completes. + if (retry_state->completed_recv_trailing_metadata) { + subchannel_batch_data* batch_data = + retry_state->recv_trailing_metadata_internal_batch; + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = &batch_data->on_complete; + // Batches containing recv_trailing_metadata always succeed. + closure->error = GRPC_ERROR_NONE; + closure->reason = + "re-executing on_complete for recv_trailing_metadata " + "to propagate internally triggered result"; + } else { + batch_data_unref(retry_state->recv_trailing_metadata_internal_batch); + } + retry_state->recv_trailing_metadata_internal_batch = nullptr; + } continue; } // If we're not retrying, just send the batch as-is. if (calld->method_params == nullptr || calld->method_params->retry_policy() == nullptr || calld->retry_committed) { - batches[(*num_batches)++] = batch; + add_closure_for_subchannel_batch(calld, batch, closures, num_closures); pending_batch_clear(calld, pending); continue; } @@ -2385,7 +2506,8 @@ static void add_subchannel_batches_for_pending_batches( GPR_ASSERT(batch->collect_stats); add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); } - batches[(*num_batches)++] = &batch_data->batch; + add_closure_for_subchannel_batch(calld, &batch_data->batch, closures, + num_closures); } } @@ -2403,62 +2525,29 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { static_cast<subchannel_call_retry_state*>( grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)); + // Construct list of closures to execute, one for each pending batch. // We can start up to 6 batches. - grpc_transport_stream_op_batch* - batches[GPR_ARRAY_SIZE(calld->pending_batches)]; - size_t num_batches = 0; + closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)]; + size_t num_closures = 0; // Replay previously-returned send_* ops if needed. subchannel_batch_data* replay_batch_data = maybe_create_subchannel_batch_for_replay(elem, retry_state); if (replay_batch_data != nullptr) { - batches[num_batches++] = &replay_batch_data->batch; + add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures, + &num_closures); } // Now add pending batches. - add_subchannel_batches_for_pending_batches(elem, retry_state, batches, - &num_batches); + add_subchannel_batches_for_pending_batches(elem, retry_state, closures, + &num_closures); // Start batches on subchannel call. - // Note that the call combiner will be yielded for each batch that we - // send down. We're already running in the call combiner, so one of - // the batches can be started directly, but the others will have to - // re-enter the call combiner. if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting %" PRIuPTR " retriable batches on subchannel_call=%p", - chand, calld, num_batches, calld->subchannel_call); - } - if (num_batches == 0) { - // This should be fairly rare, but it can happen when (e.g.) an - // attempt completes before it has finished replaying all - // previously sent messages. - GRPC_CALL_COMBINER_STOP(calld->call_combiner, - "no retriable subchannel batches to start"); - } else { - for (size_t i = 1; i < num_batches; ++i) { - if (grpc_client_channel_trace.enabled()) { - char* batch_str = grpc_transport_stream_op_batch_string(batches[i]); - gpr_log(GPR_DEBUG, - "chand=%p calld=%p: starting batch in call combiner: %s", chand, - calld, batch_str); - gpr_free(batch_str); - } - batches[i]->handler_private.extra_arg = calld->subchannel_call; - GRPC_CLOSURE_INIT(&batches[i]->handler_private.closure, - start_batch_in_call_combiner, batches[i], - grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(calld->call_combiner, - &batches[i]->handler_private.closure, - GRPC_ERROR_NONE, "start_subchannel_batch"); - } - if (grpc_client_channel_trace.enabled()) { - char* batch_str = grpc_transport_stream_op_batch_string(batches[0]); - gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting batch: %s", chand, calld, - batch_str); - gpr_free(batch_str); - } - // Note: This will release the call combiner. - grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]); + chand, calld, num_closures, calld->subchannel_call); } + execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches", + closures, num_closures); } // diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc index fb29fa788d..4e8b8b71db 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -326,7 +326,7 @@ static void http_connect_handshaker_do_handshake( static const grpc_handshaker_vtable http_connect_handshaker_vtable = { http_connect_handshaker_destroy, http_connect_handshaker_shutdown, - http_connect_handshaker_do_handshake}; + http_connect_handshaker_do_handshake, "http_connect"}; static grpc_handshaker* grpc_http_connect_handshaker_create() { http_connect_handshaker* handshaker = diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index fb2435749d..e86ab5a37e 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -153,7 +153,10 @@ static void grpc_ares_request_unref(grpc_ares_request* r) { /* If there are no pending queries, invoke on_done callback and destroy the request */ if (gpr_unref(&r->pending_queries)) { - grpc_cares_wrapper_address_sorting_sort(*(r->lb_addrs_out)); + grpc_lb_addresses* lb_addrs = *(r->lb_addrs_out); + if (lb_addrs != nullptr) { + grpc_cares_wrapper_address_sorting_sort(lb_addrs); + } GRPC_CLOSURE_SCHED(r->on_done, r->error); gpr_mu_destroy(&r->mu); grpc_ares_ev_driver_destroy(r->ev_driver); diff --git a/src/core/lib/channel/channel_args.cc b/src/core/lib/channel/channel_args.cc index 66a86c2286..e49d532e11 100644 --- a/src/core/lib/channel/channel_args.cc +++ b/src/core/lib/channel/channel_args.cc @@ -411,3 +411,31 @@ grpc_arg grpc_channel_arg_pointer_create( arg.value.pointer.vtable = vtable; return arg; } + +char* grpc_channel_args_string(const grpc_channel_args* args) { + if (args == nullptr) return nullptr; + gpr_strvec v; + gpr_strvec_init(&v); + for (size_t i = 0; i < args->num_args; ++i) { + const grpc_arg& arg = args->args[i]; + char* s; + switch (arg.type) { + case GRPC_ARG_INTEGER: + gpr_asprintf(&s, "%s=%d", arg.key, arg.value.integer); + break; + case GRPC_ARG_STRING: + gpr_asprintf(&s, "%s=%s", arg.key, arg.value.string); + break; + case GRPC_ARG_POINTER: + gpr_asprintf(&s, "%s=%p", arg.key, arg.value.pointer.p); + break; + default: + gpr_asprintf(&s, "arg with unknown type"); + } + gpr_strvec_add(&v, s); + } + char* result = + gpr_strjoin_sep(const_cast<const char**>(v.strs), v.count, ", ", nullptr); + gpr_strvec_destroy(&v); + return result; +} diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index c0d6a17356..5ff303a9dc 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -124,4 +124,8 @@ grpc_arg grpc_channel_arg_integer_create(char* name, int value); grpc_arg grpc_channel_arg_pointer_create(char* name, void* value, const grpc_arg_pointer_vtable* vtable); +// Returns a string representing channel args in human-readable form. +// Callers takes ownership of result. +char* grpc_channel_args_string(const grpc_channel_args* args); + #endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_ARGS_H */ diff --git a/src/core/lib/channel/handshaker.cc b/src/core/lib/channel/handshaker.cc index 9b1af8d6cb..9cd97823d4 100644 --- a/src/core/lib/channel/handshaker.cc +++ b/src/core/lib/channel/handshaker.cc @@ -22,11 +22,15 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/timer.h" +grpc_core::TraceFlag grpc_handshaker_trace(false, "handshaker"); + // // grpc_handshaker // @@ -52,6 +56,10 @@ void grpc_handshaker_do_handshake(grpc_handshaker* handshaker, args); } +const char* grpc_handshaker_name(grpc_handshaker* handshaker) { + return handshaker->vtable->name; +} + // // grpc_handshake_manager // @@ -127,6 +135,12 @@ static bool is_power_of_2(size_t n) { return (n & (n - 1)) == 0; } void grpc_handshake_manager_add(grpc_handshake_manager* mgr, grpc_handshaker* handshaker) { + if (grpc_handshaker_trace.enabled()) { + gpr_log( + GPR_DEBUG, + "handshake_manager %p: adding handshaker %s [%p] at index %" PRIuPTR, + mgr, grpc_handshaker_name(handshaker), handshaker, mgr->count); + } gpr_mu_lock(&mgr->mu); // To avoid allocating memory for each handshaker we add, we double // the number of elements every time we need more. @@ -172,23 +186,56 @@ void grpc_handshake_manager_shutdown(grpc_handshake_manager* mgr, GRPC_ERROR_UNREF(why); } +static char* handshaker_args_string(grpc_handshaker_args* args) { + char* args_str = grpc_channel_args_string(args->args); + size_t num_args = args->args != nullptr ? args->args->num_args : 0; + size_t read_buffer_length = + args->read_buffer != nullptr ? args->read_buffer->length : 0; + char* str; + gpr_asprintf(&str, + "{endpoint=%p, args=%p {size=%" PRIuPTR + ": %s}, read_buffer=%p (length=%" PRIuPTR "), exit_early=%d}", + args->endpoint, args->args, num_args, args_str, + args->read_buffer, read_buffer_length, args->exit_early); + gpr_free(args_str); + return str; +} + // Helper function to call either the next handshaker or the // on_handshake_done callback. // Returns true if we've scheduled the on_handshake_done callback. static bool call_next_handshaker_locked(grpc_handshake_manager* mgr, grpc_error* error) { + if (grpc_handshaker_trace.enabled()) { + char* args_str = handshaker_args_string(&mgr->args); + gpr_log(GPR_DEBUG, + "handshake_manager %p: error=%s shutdown=%d index=%" PRIuPTR + ", args=%s", + mgr, grpc_error_string(error), mgr->shutdown, mgr->index, args_str); + gpr_free(args_str); + } GPR_ASSERT(mgr->index <= mgr->count); // If we got an error or we've been shut down or we're exiting early or // we've finished the last handshaker, invoke the on_handshake_done // callback. Otherwise, call the next handshaker. if (error != GRPC_ERROR_NONE || mgr->shutdown || mgr->args.exit_early || mgr->index == mgr->count) { + if (grpc_handshaker_trace.enabled()) { + gpr_log(GPR_DEBUG, "handshake_manager %p: handshaking complete", mgr); + } // Cancel deadline timer, since we're invoking the on_handshake_done // callback now. grpc_timer_cancel(&mgr->deadline_timer); GRPC_CLOSURE_SCHED(&mgr->on_handshake_done, error); mgr->shutdown = true; } else { + if (grpc_handshaker_trace.enabled()) { + gpr_log( + GPR_DEBUG, + "handshake_manager %p: calling handshaker %s [%p] at index %" PRIuPTR, + mgr, grpc_handshaker_name(mgr->handshakers[mgr->index]), + mgr->handshakers[mgr->index], mgr->index); + } grpc_handshaker_do_handshake(mgr->handshakers[mgr->index], mgr->acceptor, &mgr->call_next_handshaker, &mgr->args); } diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index dfecd81004..be7fd127e4 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -84,6 +84,9 @@ typedef struct { grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, grpc_handshaker_args* args); + + /// The name of the handshaker, for debugging purposes. + const char* name; } grpc_handshaker_vtable; /// Base struct. To subclass, make this the first member of the @@ -102,6 +105,7 @@ void grpc_handshaker_do_handshake(grpc_handshaker* handshaker, grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, grpc_handshaker_args* args); +const char* grpc_handshaker_name(grpc_handshaker* handshaker); /// /// grpc_handshake_manager diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc index 0c97dfa6b3..57dd3406bc 100644 --- a/src/core/lib/security/transport/security_handshaker.cc +++ b/src/core/lib/security/transport/security_handshaker.cc @@ -406,7 +406,7 @@ static void security_handshaker_do_handshake(grpc_handshaker* handshaker, static const grpc_handshaker_vtable security_handshaker_vtable = { security_handshaker_destroy, security_handshaker_shutdown, - security_handshaker_do_handshake}; + security_handshaker_do_handshake, "security"}; static grpc_handshaker* security_handshaker_create( tsi_handshaker* handshaker, grpc_security_connector* connector) { @@ -456,7 +456,7 @@ static void fail_handshaker_do_handshake(grpc_handshaker* handshaker, static const grpc_handshaker_vtable fail_handshaker_vtable = { fail_handshaker_destroy, fail_handshaker_shutdown, - fail_handshaker_do_handshake}; + fail_handshaker_do_handshake, "security_fail"}; static grpc_handshaker* fail_handshaker_create() { grpc_handshaker* h = static_cast<grpc_handshaker*>(gpr_malloc(sizeof(*h))); diff --git a/src/csharp/Grpc.Core/NativeDeps.Mac.csproj.include b/src/csharp/Grpc.Core/NativeDeps.Mac.csproj.include index f1b85c3730..309e33d47e 100644 --- a/src/csharp/Grpc.Core/NativeDeps.Mac.csproj.include +++ b/src/csharp/Grpc.Core/NativeDeps.Mac.csproj.include @@ -1,13 +1,5 @@ <Project> <ItemGroup> - <!-- We are relying on run_tests.py to build grpc_csharp_ext with the right bitness - and we copy it as both x86 (needed by net45) and x64 (needed by netcoreapp1.0) as we don't - know which one will be needed to run the tests. --> - <Content Include="..\..\..\libs\$(NativeDependenciesConfigurationUnix)\libgrpc_csharp_ext.dylib"> - <Link>libgrpc_csharp_ext.x86.dylib</Link> - <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> - <Pack>false</Pack> - </Content> <Content Include="..\..\..\libs\$(NativeDependenciesConfigurationUnix)\libgrpc_csharp_ext.dylib"> <Link>libgrpc_csharp_ext.x64.dylib</Link> <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> diff --git a/src/csharp/Grpc.Core/Version.csproj.include b/src/csharp/Grpc.Core/Version.csproj.include index f7a7a5cbe9..6e28c11df2 100755 --- a/src/csharp/Grpc.Core/Version.csproj.include +++ b/src/csharp/Grpc.Core/Version.csproj.include @@ -2,6 +2,6 @@ <Project> <PropertyGroup> <GrpcCsharpVersion>1.12.0-dev</GrpcCsharpVersion> - <GoogleProtobufVersion>3.3.0</GoogleProtobufVersion> + <GoogleProtobufVersion>3.5.1</GoogleProtobufVersion> </PropertyGroup> </Project> diff --git a/src/csharp/global.json b/src/csharp/global.json deleted file mode 100644 index 815be4bfb9..0000000000 --- a/src/csharp/global.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "sdk": { - "version": "2.1.4" - } -} diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m index 283306262a..bfb1fd352c 100644 --- a/src/objective-c/GRPCClient/private/GRPCHost.m +++ b/src/objective-c/GRPCClient/private/GRPCHost.m @@ -108,7 +108,10 @@ static NSMutableDictionary *kHostCache; serverName:(NSString *)serverName timeout:(NSTimeInterval)timeout completionQueue:(GRPCCompletionQueue *)queue { - GRPCChannel *channel; + // The __block attribute is to allow channel take refcount inside @synchronized block. Without + // this attribute, retain of channel object happens after objc_sync_exit in release builds, which + // may result in channel released before used. See grpc/#15033. + __block GRPCChannel *channel; // This is racing -[GRPCHost disconnect]. @synchronized(self) { if (!_channel) { diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py index 60d309ec65..35c09827ba 100644 --- a/src/python/grpcio_health_checking/setup.py +++ b/src/python/grpcio_health_checking/setup.py @@ -57,7 +57,7 @@ PACKAGE_DIRECTORIES = { } INSTALL_REQUIRES = ( - 'protobuf>=3.5.0.post1', + 'protobuf>=3.5.2.post1', 'grpcio>={version}'.format(version=grpc_version.VERSION), ) diff --git a/src/python/grpcio_reflection/setup.py b/src/python/grpcio_reflection/setup.py index 10c4c38f19..589d0ff556 100644 --- a/src/python/grpcio_reflection/setup.py +++ b/src/python/grpcio_reflection/setup.py @@ -58,7 +58,7 @@ PACKAGE_DIRECTORIES = { } INSTALL_REQUIRES = ( - 'protobuf>=3.5.0.post1', + 'protobuf>=3.5.2.post1', 'grpcio>={version}'.format(version=grpc_version.VERSION), ) diff --git a/src/python/grpcio_testing/setup.py b/src/python/grpcio_testing/setup.py index 5a9d593ec1..eb480a5464 100644 --- a/src/python/grpcio_testing/setup.py +++ b/src/python/grpcio_testing/setup.py @@ -29,7 +29,7 @@ PACKAGE_DIRECTORIES = { } INSTALL_REQUIRES = ( - 'protobuf>=3.5.0.post1', + 'protobuf>=3.5.2.post1', 'grpcio>={version}'.format(version=grpc_version.VERSION), ) diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index 4e0f6726fa..98ac19d188 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -41,7 +41,7 @@ INSTALL_REQUIRES = ( 'grpcio>={version}'.format(version=grpc_version.VERSION), 'grpcio-tools>={version}'.format(version=grpc_version.VERSION), 'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION), - 'oauth2client>=1.4.7', 'protobuf>=3.5.0.post1', 'six>=1.10', + 'oauth2client>=1.4.7', 'protobuf>=3.5.2.post1', 'six>=1.10', 'google-auth>=1.0.0', 'requests>=2.14.2') COMMAND_CLASS = { diff --git a/src/python/grpcio_tests/tests/_loader.py b/src/python/grpcio_tests/tests/_loader.py index 31680916b4..be0af64646 100644 --- a/src/python/grpcio_tests/tests/_loader.py +++ b/src/python/grpcio_tests/tests/_loader.py @@ -54,7 +54,7 @@ class Loader(object): for module in modules: try: package_paths = module.__path__ - except: + except AttributeError: continue self.walk_packages(package_paths) coverage_context.stop() diff --git a/src/python/grpcio_tests/tests/_result.py b/src/python/grpcio_tests/tests/_result.py index 9907c4e1f9..b105f18e78 100644 --- a/src/python/grpcio_tests/tests/_result.py +++ b/src/python/grpcio_tests/tests/_result.py @@ -46,7 +46,7 @@ class CaseResult( None. """ - class Kind: + class Kind(object): UNTESTED = 'untested' RUNNING = 'running' ERROR = 'error' @@ -257,7 +257,7 @@ class CoverageResult(AugmentedResult): #coverage.Coverage().combine() -class _Colors: +class _Colors(object): """Namespaced constants for terminal color magic numbers.""" HEADER = '\033[95m' INFO = '\033[94m' diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py index 3780ed9020..698c37017f 100644 --- a/src/python/grpcio_tests/tests/interop/client.py +++ b/src/python/grpcio_tests/tests/interop/client.py @@ -66,10 +66,6 @@ def _args(): return parser.parse_args() -def _application_default_credentials(): - return oauth2client_client.GoogleCredentials.get_application_default() - - def _stub(args): target = '{}:{}'.format(args.server_host, args.server_port) if args.test_case == 'oauth2_auth_token': diff --git a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py index ad0ecf0079..b46e53315e 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py @@ -329,9 +329,7 @@ class PythonPluginTest(unittest.TestCase): _packagify(self._python_out) - with _system_path([ - self._python_out, - ]): + with _system_path([self._python_out]): self._payload_pb2 = importlib.import_module(_PAYLOAD_PB2) self._requests_pb2 = importlib.import_module(_REQUESTS_PB2) self._responses_pb2 = importlib.import_module(_RESPONSES_PB2) diff --git a/src/python/grpcio_tests/tests/stress/test_runner.py b/src/python/grpcio_tests/tests/stress/test_runner.py index d5038e3ba2..764cda17fb 100644 --- a/src/python/grpcio_tests/tests/stress/test_runner.py +++ b/src/python/grpcio_tests/tests/stress/test_runner.py @@ -50,7 +50,7 @@ class TestRunner(threading.Thread): test_case.test_interoperability(self._stub, None) end_time = time.time() self._histogram.add((end_time - start_time) * 1e9) - except Exception as e: + except Exception as e: # pylint: disable=broad-except traceback.print_exc() self._exception_queue.put( Exception("An exception occured during test {}" diff --git a/src/python/grpcio_tests/tests/testing/_client_application.py b/src/python/grpcio_tests/tests/testing/_client_application.py index 7d0d74c8c4..3ddeba2373 100644 --- a/src/python/grpcio_tests/tests/testing/_client_application.py +++ b/src/python/grpcio_tests/tests/testing/_client_application.py @@ -215,30 +215,6 @@ def _run_infinite_request_stream(stub): return _UNSATISFACTORY_OUTCOME -def run(scenario, channel): - stub = services_pb2_grpc.FirstServiceStub(channel) - try: - if scenario is Scenario.UNARY_UNARY: - return _run_unary_unary(stub) - elif scenario is Scenario.UNARY_STREAM: - return _run_unary_stream(stub) - elif scenario is Scenario.STREAM_UNARY: - return _run_stream_unary(stub) - elif scenario is Scenario.STREAM_STREAM: - return _run_stream_stream(stub) - elif scenario is Scenario.CONCURRENT_STREAM_UNARY: - return _run_concurrent_stream_unary(stub) - elif scenario is Scenario.CONCURRENT_STREAM_STREAM: - return _run_concurrent_stream_stream(stub) - elif scenario is Scenario.CANCEL_UNARY_UNARY: - return _run_cancel_unary_unary(stub) - elif scenario is Scenario.INFINITE_REQUEST_STREAM: - return _run_infinite_request_stream(stub) - except grpc.RpcError as rpc_error: - return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(), - rpc_error.details()) - - _IMPLEMENTATIONS = { Scenario.UNARY_UNARY: _run_unary_unary, Scenario.UNARY_STREAM: _run_unary_stream, diff --git a/src/python/grpcio_tests/tests/testing/_server_application.py b/src/python/grpcio_tests/tests/testing/_server_application.py index 02769ca68d..243c385daf 100644 --- a/src/python/grpcio_tests/tests/testing/_server_application.py +++ b/src/python/grpcio_tests/tests/testing/_server_application.py @@ -38,7 +38,7 @@ class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer): context.set_code(grpc.StatusCode.INVALID_ARGUMENT) context.set_details('Something is wrong with your request!') return - yield services_pb2.Strange() + yield services_pb2.Strange() # pylint: disable=unreachable def StreUn(self, request_iterator, context): context.send_initial_metadata((( diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py index 7550cd39ba..0b11f03adf 100644 --- a/src/python/grpcio_tests/tests/unit/_compression_test.py +++ b/src/python/grpcio_tests/tests/unit/_compression_test.py @@ -52,9 +52,9 @@ class _MethodHandler(grpc.RpcMethodHandler): self.stream_unary = None self.stream_stream = None if self.request_streaming and self.response_streaming: - self.stream_stream = lambda x, y: handle_stream(x, y) + self.stream_stream = handle_stream elif not self.request_streaming and not self.response_streaming: - self.unary_unary = lambda x, y: handle_unary(x, y) + self.unary_unary = handle_unary class _GenericHandler(grpc.GenericRpcHandler): diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 9045ff58a0..23f5ef605d 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -285,7 +285,7 @@ class ServerClientMixin(object): self.assertEqual(5, len(server_event.batch_operations)) found_server_op_types = set() for server_result in server_event.batch_operations: - self.assertNotIn(client_result.type(), found_server_op_types) + self.assertNotIn(server_result.type(), found_server_op_types) found_server_op_types.add(server_result.type()) if server_result.type() == cygrpc.OperationType.receive_message: self.assertEqual(REQUEST, server_result.message()) diff --git a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py index 4a00b9ef2f..7d5eaaaa84 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py +++ b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py @@ -25,7 +25,7 @@ class SimpleFuture(object): def wrapped_function(): try: self._result = function(*args, **kwargs) - except Exception as error: + except Exception as error: # pylint: disable=broad-except self._error = error self._result = None @@ -41,7 +41,7 @@ class SimpleFuture(object): self._thread.join() if self._error: # TODO(atash): re-raise exceptions in a way that preserves tracebacks - raise self._error + raise self._error # pylint: disable=raising-bad-type return self._result diff --git a/src/python/grpcio_tests/tests/unit/_exit_test.py b/src/python/grpcio_tests/tests/unit/_exit_test.py index 6e6d9de0fb..f40f3ae07c 100644 --- a/src/python/grpcio_tests/tests/unit/_exit_test.py +++ b/src/python/grpcio_tests/tests/unit/_exit_test.py @@ -49,7 +49,7 @@ def cleanup_processes(): for process in processes: try: process.kill() - except Exception: + except Exception: # pylint: disable=broad-except pass diff --git a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py index e683131722..ad847ae03e 100644 --- a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py +++ b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py @@ -14,7 +14,7 @@ _BEFORE_IMPORT = tuple(globals()) -from grpc import * +from grpc import * # pylint: disable=wildcard-import _AFTER_IMPORT = tuple(globals()) diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py index e40cca8b24..93a5fdf9ff 100644 --- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -165,11 +165,13 @@ class FailAfterFewIterationsCounter(object): def __next__(self): if self._current >= self._high: - raise Exception("This is a deliberate failure in a unit test.") + raise test_control.Defect() else: self._current += 1 return self._bytestring + next = __next__ + def _unary_unary_multi_callable(channel): return channel.unary_unary(_UNARY_UNARY) diff --git a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py index 61c03f64ba..b43c647fc9 100644 --- a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py +++ b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py @@ -65,7 +65,7 @@ class _Servicer(object): self._serviced = True self._condition.notify_all() return - yield + yield # pylint: disable=unreachable def stream_unary(self, request_iterator, context): for request in request_iterator: |