From 87daf00f437b2bc9fb3c2ab662e7f7105e3dfccb Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Fri, 8 Jun 2018 13:50:32 -0700 Subject: Simplify call error/status aggregation --- .../filters/message_size/message_size_filter.cc | 38 ++- src/core/lib/surface/call.cc | 341 ++++++--------------- 2 files changed, 135 insertions(+), 244 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index c7fc3f2e62..3cd569b5da 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -99,10 +99,15 @@ struct call_data { // recv_message_ready up-call on transport_stream_op, and remember to // call our next_recv_message_ready member after handling it. grpc_closure recv_message_ready; + grpc_closure recv_trailing_metadata; + // The error caused by a message that is too large, or GRPC_ERROR_NONE + grpc_error* error; // Used by recv_message_ready. grpc_core::OrphanablePtr* recv_message; // Original recv_message_ready callback, invoked after our own. grpc_closure* next_recv_message_ready; + // Original recv_trailing_metadata callback, invoked after our own. + grpc_closure* next_recv_trailing_metadata; }; struct channel_data { @@ -130,6 +135,8 @@ static void recv_message_ready(void* user_data, grpc_error* error) { grpc_error* new_error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_ERROR_UNREF(calld->error); + calld->error = GRPC_ERROR_REF(new_error); if (error == GRPC_ERROR_NONE) { error = new_error; } else { @@ -144,6 +151,23 @@ static void recv_message_ready(void* user_data, grpc_error* error) { GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error); } +// Callback invoked on completion of recv_trailing_metadata +// Notifies the recv_trailing_metadata batch of any message size failures +static void recv_trailing_metadata(void* user_data, grpc_error* error) { + grpc_call_element* elem = (grpc_call_element*)user_data; + call_data* calld = (call_data*)elem->call_data; + if (error == GRPC_ERROR_NONE) { + error = calld->error; + } else if (calld->error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_REF(error); + } else if (calld->error != error) { + error = grpc_error_add_child(GRPC_ERROR_REF(error), calld->error); + } + calld->error = GRPC_ERROR_NONE; + // Invoke the next callback. + GRPC_CLOSURE_RUN(calld->next_recv_trailing_metadata, error); +} + // Start transport stream op. static void start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* op) { @@ -172,6 +196,11 @@ static void start_transport_stream_op_batch( calld->recv_message = op->payload->recv_message.recv_message; op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } + // Inject callback for receiving trailing metadata. + if (op->recv_trailing_metadata) { + calld->next_recv_trailing_metadata = op->on_complete; + op->on_complete = &calld->recv_trailing_metadata; + } // Chain to the next filter. grpc_call_next_op(elem, op); } @@ -183,8 +212,12 @@ static grpc_error* init_call_elem(grpc_call_element* elem, call_data* calld = static_cast(elem->call_data); calld->call_combiner = args->call_combiner; calld->next_recv_message_ready = nullptr; + calld->next_recv_trailing_metadata = nullptr; + calld->error = GRPC_ERROR_NONE; GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata, recv_trailing_metadata, + elem, grpc_schedule_on_exec_ctx); // Get max sizes from channel data, then merge in per-method config values. // Note: Per-method config is only available on the client, so we // apply the max request size to the send limit and the max response @@ -213,7 +246,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem, // Destructor for call_data. static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = (call_data*)elem->call_data; + GRPC_ERROR_UNREF(calld->error); +} static int default_size(const grpc_channel_args* args, int without_minimal_stack) { diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 1cf8ea94e7..e012244b0f 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -70,46 +70,6 @@ // Used to create arena for the first call. #define ESTIMATED_MDELEM_COUNT 16 -/* Status data for a request can come from several sources; this - enumerates them all, and acts as a priority sorting for which - status to return to the application - earlier entries override - later ones */ -typedef enum { - /* Status came from the application layer overriding whatever - the wire says */ - STATUS_FROM_API_OVERRIDE = 0, - /* Status came from 'the wire' - or somewhere below the surface - layer */ - STATUS_FROM_WIRE, - /* Status was created by some internal channel stack operation: must come via - add_batch_error */ - STATUS_FROM_CORE, - /* Status was created by some surface error */ - STATUS_FROM_SURFACE, - /* Status came from the server sending status */ - STATUS_FROM_SERVER_STATUS, - STATUS_SOURCE_COUNT -} status_source; - -typedef struct { - bool is_set; - grpc_error* error; -} received_status; - -static gpr_atm pack_received_status(received_status r) { - return r.is_set ? (1 | (gpr_atm)r.error) : 0; -} - -static received_status unpack_received_status(gpr_atm atm) { - if ((atm & 1) == 0) { - return {false, GRPC_ERROR_NONE}; - } else { - return {true, (grpc_error*)(atm & ~static_cast(1))}; - } -} - -#define MAX_ERRORS_PER_BATCH 4 - typedef struct batch_control { grpc_call* call; /* Share memory for cq_completion and notify_tag as they are never needed @@ -134,10 +94,7 @@ typedef struct batch_control { grpc_closure start_batch; grpc_closure finish_batch; gpr_refcount steps_to_complete; - - grpc_error* errors[MAX_ERRORS_PER_BATCH]; - gpr_atm num_errors; - + grpc_error* batch_error; grpc_transport_stream_op_batch op; } batch_control; @@ -200,9 +157,6 @@ struct grpc_call { // A char* indicating the peer name. gpr_atm peer_string; - /* Packed received call statuses from various sources */ - gpr_atm status[STATUS_SOURCE_COUNT]; - /* Call data useful used for reporting. Only valid after the call has * completed */ grpc_call_final_info final_info; @@ -234,6 +188,7 @@ struct grpc_call { grpc_closure receiving_stream_ready; grpc_closure receiving_initial_metadata_ready; uint32_t test_only_last_message_flags; + bool cancelled; grpc_closure release_call; @@ -247,6 +202,7 @@ struct grpc_call { int* cancelled; } server; } final_op; + grpc_error* status_error; /* recv_state can contain one of the following values: RECV_NONE : : no initial metadata and messages received @@ -279,23 +235,15 @@ grpc_core::TraceFlag grpc_compression_trace(false, "compression"); static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op, grpc_closure* start_batch_closure); -static void cancel_with_status(grpc_call* c, status_source source, - grpc_status_code status, + +static void cancel_with_status(grpc_call* c, grpc_status_code status, const char* description); -static void cancel_with_error(grpc_call* c, status_source source, - grpc_error* error); +static void cancel_with_error(grpc_call* c, grpc_error* error); static void destroy_call(void* call_stack, grpc_error* error); static void receiving_slice_ready(void* bctlp, grpc_error* error); -static void get_final_status( - grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data), - void* set_value_user_data, grpc_slice* details, const char** error_string); -static void set_status_value_directly(grpc_status_code status, void* dest); -static void set_status_from_error(grpc_call* call, status_source source, - grpc_error* error); +static void set_final_status(grpc_call* call, grpc_error* error); static void process_data_after_md(batch_control* bctl); static void post_batch_completion(batch_control* bctl); -static void add_batch_error(batch_control* bctl, grpc_error* error, - bool has_cancelled); static void add_init_error(grpc_error** composite, grpc_error* new_err) { if (new_err == GRPC_ERROR_NONE) return; @@ -456,10 +404,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, gpr_mu_unlock(&pc->child_list_mu); } if (error != GRPC_ERROR_NONE) { - cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); + cancel_with_error(call, GRPC_ERROR_REF(error)); } if (immediately_cancel) { - cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); + cancel_with_error(call, GRPC_ERROR_CANCELLED); } if (args->cq != nullptr) { GPR_ASSERT(args->pollset_set_alternative == nullptr && @@ -520,7 +468,6 @@ static void release_call(void* call, grpc_error* error) { GRPC_CHANNEL_INTERNAL_UNREF(channel, "call"); } -static void set_status_value_directly(grpc_status_code status, void* dest); static void destroy_call(void* call, grpc_error* error) { GPR_TIMER_SCOPE("destroy_call", 0); size_t i; @@ -547,16 +494,14 @@ static void destroy_call(void* call, grpc_error* error) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } - get_final_status(c, set_status_value_directly, &c->final_info.final_status, - nullptr, c->final_info.error_string); + grpc_slice slice = grpc_empty_slice(); + grpc_error_get_status(c->status_error, c->send_deadline, + &c->final_info.final_status, &slice, nullptr, + c->final_info.error_string); + GRPC_ERROR_UNREF(c->status_error); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - GRPC_ERROR_UNREF( - unpack_received_status(gpr_atm_acq_load(&c->status[i])).error); - } - grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info, GRPC_CLOSURE_INIT(&c->release_call, release_call, c, grpc_schedule_on_exec_ctx)); @@ -594,7 +539,7 @@ void grpc_call_unref(grpc_call* c) { bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 && gpr_atm_acq_load(&c->received_final_op_atm) == 0; if (cancel) { - cancel_with_error(c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); + cancel_with_error(c, GRPC_ERROR_CANCELLED); } else { // Unset the call combiner cancellation closure. This has the // effect of scheduling the previously set cancellation closure, if @@ -609,8 +554,7 @@ grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) { GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved)); GPR_ASSERT(!reserved); grpc_core::ExecCtx exec_ctx; - cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); - + cancel_with_error(call, GRPC_ERROR_CANCELLED); return GRPC_CALL_OK; } @@ -664,8 +608,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c, "c=%p, status=%d, description=%s, reserved=%p)", 4, (c, (int)status, description, reserved)); GPR_ASSERT(reserved == nullptr); - cancel_with_status(c, STATUS_FROM_API_OVERRIDE, status, description); - + cancel_with_status(c, status, description); return GRPC_CALL_OK; } @@ -685,15 +628,18 @@ static void done_termination(void* arg, grpc_error* error) { gpr_free(state); } -static void cancel_with_error(grpc_call* c, status_source source, - grpc_error* error) { +static void cancel_with_error(grpc_call* c, grpc_error* error) { + if (c->cancelled) { + GRPC_ERROR_UNREF(error); + return; + } + c->cancelled = true; GRPC_CALL_INTERNAL_REF(c, "termination"); // Inform the call combiner of the cancellation, so that it can cancel // any in-flight asynchronous actions that may be holding the call // combiner. This ensures that the cancel_stream batch can be sent // down the filter stack in a timely manner. grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error)); - set_status_from_error(c, source, GRPC_ERROR_REF(error)); cancel_state* state = static_cast(gpr_malloc(sizeof(*state))); state->call = c; GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state, @@ -716,90 +662,28 @@ static grpc_error* error_from_status(grpc_status_code status, GRPC_ERROR_INT_GRPC_STATUS, status); } -static void cancel_with_status(grpc_call* c, status_source source, - grpc_status_code status, +static void cancel_with_status(grpc_call* c, grpc_status_code status, const char* description) { - cancel_with_error(c, source, error_from_status(status, description)); + cancel_with_error(c, error_from_status(status, description)); } -/******************************************************************************* - * FINAL STATUS CODE MANIPULATION - */ - -static bool get_final_status_from( - grpc_call* call, grpc_error* error, bool allow_ok_status, - void (*set_value)(grpc_status_code code, void* user_data), - void* set_value_user_data, grpc_slice* details, const char** error_string) { - grpc_status_code code; - grpc_slice slice = grpc_empty_slice(); - grpc_error_get_status(error, call->send_deadline, &code, &slice, nullptr, - error_string); - if (code == GRPC_STATUS_OK && !allow_ok_status) { - return false; - } - - set_value(code, set_value_user_data); - if (details != nullptr) { - *details = grpc_slice_ref_internal(slice); - } - return true; -} - -static void get_final_status( - grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data), - void* set_value_user_data, grpc_slice* details, const char** error_string) { - int i; - received_status status[STATUS_SOURCE_COUNT]; - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i])); - } +static void set_final_status(grpc_call* call, grpc_error* error) { if (grpc_call_error_trace.enabled()) { - gpr_log(GPR_INFO, "get_final_status %s", call->is_client ? "CLI" : "SVR"); - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (status[i].is_set) { - gpr_log(GPR_INFO, " %d: %s", i, grpc_error_string(status[i].error)); - } - } + gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR"); + gpr_log(GPR_DEBUG, "%s", grpc_error_string(error)); } - /* first search through ignoring "OK" statuses: if something went wrong, - * ensure we report it */ - for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) { - /* search for the best status we can present: ideally the error we use has a - clearly defined grpc-status, and we'll prefer that. */ - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (status[i].is_set && - grpc_error_has_clear_grpc_status(status[i].error)) { - if (get_final_status_from(call, status[i].error, allow_ok_status != 0, - set_value, set_value_user_data, details, - error_string)) { - return; - } - } - } - /* If no clearly defined status exists, search for 'anything' */ - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (status[i].is_set) { - if (get_final_status_from(call, status[i].error, allow_ok_status != 0, - set_value, set_value_user_data, details, - error_string)) { - return; - } - } - } - } - /* If nothing exists, set some default */ if (call->is_client) { - set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); + const char** error_string = call->final_op.client.error_string; + grpc_status_code code; + grpc_slice slice = grpc_empty_slice(); + grpc_error_get_status(error, call->send_deadline, &code, &slice, nullptr, + error_string); + *call->final_op.client.status = code; + *call->final_op.client.status_details = grpc_slice_ref_internal(slice); + call->status_error = error; } else { - set_value(GRPC_STATUS_OK, set_value_user_data); - } -} - -static void set_status_from_error(grpc_call* call, status_source source, - grpc_error* error) { - if (!gpr_atm_rel_cas(&call->status[source], - pack_received_status({false, GRPC_ERROR_NONE}), - pack_received_status({true, error}))) { + *call->final_op.server.cancelled = + error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE; GRPC_ERROR_UNREF(error); } } @@ -1018,6 +902,7 @@ static grpc_stream_compression_algorithm decode_stream_compression( static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b, int is_trailing) { if (b->list.count == 0) return; + if (!call->is_client && is_trailing) return; if (is_trailing && call->buffered_metadata[1] == nullptr) return; GPR_TIMER_SCOPE("publish_app_metadata", 0); grpc_metadata_array* dest; @@ -1071,9 +956,12 @@ static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) { publish_app_metadata(call, b, false); } -static void recv_trailing_filter(void* args, grpc_metadata_batch* b) { +static void recv_trailing_filter(void* args, grpc_metadata_batch* b, + grpc_error* batch_error) { grpc_call* call = static_cast(args); - if (b->idx.named.grpc_status != nullptr) { + if (batch_error != GRPC_ERROR_NONE) { + set_final_status(call, GRPC_ERROR_REF(batch_error)); + } else if (b->idx.named.grpc_status != nullptr) { grpc_status_code status_code = grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md); grpc_error* error = @@ -1092,10 +980,16 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b) { error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, grpc_empty_slice()); } - set_status_from_error(call, STATUS_FROM_WIRE, error); + set_final_status(call, GRPC_ERROR_REF(error)); grpc_metadata_batch_remove(b, b->idx.named.grpc_status); + GRPC_ERROR_UNREF(error); + } else { + // TODO(kpayson) batch completed successfully w/no error + no status, should + // we assert instead? + set_final_status(call, GRPC_ERROR_NONE); } publish_app_metadata(call, b, true); + GRPC_ERROR_UNREF(batch_error); } grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) { @@ -1106,14 +1000,6 @@ grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) { * BATCH API IMPLEMENTATION */ -static void set_status_value_directly(grpc_status_code status, void* dest) { - *static_cast(dest) = status; -} - -static void set_cancelled_value(grpc_status_code status, void* dest) { - *static_cast(dest) = (status != GRPC_STATUS_OK); -} - static bool are_write_flags_valid(uint32_t flags) { /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ const uint32_t allowed_write_positions = @@ -1181,31 +1067,15 @@ static void finish_batch_completion(void* user_data, GRPC_CALL_INTERNAL_UNREF(call, "completion"); } -static grpc_error* consolidate_batch_errors(batch_control* bctl) { - size_t n = static_cast(gpr_atm_acq_load(&bctl->num_errors)); - if (n == 0) { - return GRPC_ERROR_NONE; - } else if (n == 1) { - /* Skip creating a composite error in the case that only one error was - logged */ - grpc_error* e = bctl->errors[0]; - bctl->errors[0] = nullptr; - return e; - } else { - grpc_error* error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Call batch failed", bctl->errors, n); - for (size_t i = 0; i < n; i++) { - GRPC_ERROR_UNREF(bctl->errors[i]); - bctl->errors[i] = nullptr; - } - return error; - } +static void reset_batch_errors(batch_control* bctl) { + GRPC_ERROR_UNREF(bctl->batch_error); + bctl->batch_error = GRPC_ERROR_NONE; } static void post_batch_completion(batch_control* bctl) { grpc_call* next_child_call; grpc_call* call = bctl->call; - grpc_error* error = consolidate_batch_errors(bctl); + grpc_error* error = GRPC_ERROR_REF(bctl->batch_error); if (bctl->op.send_initial_metadata) { grpc_metadata_batch_destroy( @@ -1223,7 +1093,7 @@ static void post_batch_completion(batch_control* bctl) { if (bctl->op.recv_trailing_metadata) { grpc_metadata_batch* md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - recv_trailing_filter(call, md); + recv_trailing_filter(call, md, GRPC_ERROR_REF(bctl->batch_error)); /* propagate cancellation to any interested children */ gpr_atm_rel_store(&call->received_final_op_atm, 1); @@ -1237,8 +1107,7 @@ static void post_batch_completion(batch_control* bctl) { next_child_call = child->child->sibling_next; if (child->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child, "propagate_cancel"); - cancel_with_error(child, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_CANCELLED); + cancel_with_error(child, GRPC_ERROR_CANCELLED); GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel"); } child = next_child_call; @@ -1247,16 +1116,6 @@ static void post_batch_completion(batch_control* bctl) { gpr_mu_unlock(&pc->child_list_mu); } - if (call->is_client) { - get_final_status(call, set_status_value_directly, - call->final_op.client.status, - call->final_op.client.status_details, - call->final_op.client.error_string); - } else { - get_final_status(call, set_cancelled_value, - call->final_op.server.cancelled, nullptr, nullptr); - } - GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } @@ -1265,9 +1124,10 @@ static void post_batch_completion(batch_control* bctl) { grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = nullptr; } + reset_batch_errors(bctl); if (bctl->completion_data.notify_tag.is_closure) { - /* unrefs bctl->error */ + /* unrefs error */ bctl->call = nullptr; /* This closure may be meant to be run within some combiner. Since we aren't * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead @@ -1277,7 +1137,7 @@ static void post_batch_completion(batch_control* bctl) { error); GRPC_CALL_INTERNAL_UNREF(call, "completion"); } else { - /* unrefs bctl->error */ + /* unrefs error */ grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error, finish_batch_completion, bctl, &bctl->completion_data.cq_completion); @@ -1386,8 +1246,10 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) { grpc_call* call = bctl->call; if (error != GRPC_ERROR_NONE) { call->receiving_stream.reset(); - add_batch_error(bctl, GRPC_ERROR_REF(error), true); - cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); + if (bctl->batch_error == GRPC_ERROR_NONE) { + bctl->batch_error = GRPC_ERROR_REF(error); + } + cancel_with_error(call, GRPC_ERROR_REF(error)); } /* If recv_state is RECV_NONE, we will save the batch_control * object with rel_cas, and will not use it after the cas. Its corresponding @@ -1423,8 +1285,7 @@ static void validate_filtered_metadata(batch_control* bctl) { call->incoming_stream_compression_algorithm, call->incoming_message_compression_algorithm); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL, - error_msg); + cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg); gpr_free(error_msg); } else if ( grpc_compression_algorithm_from_message_stream_compression_algorithm( @@ -1436,8 +1297,7 @@ static void validate_filtered_metadata(batch_control* bctl) { "compression (%d).", call->incoming_stream_compression_algorithm, call->incoming_message_compression_algorithm); - cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL, - error_msg); + cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg); gpr_free(error_msg); } else { char* error_msg = nullptr; @@ -1447,8 +1307,7 @@ static void validate_filtered_metadata(batch_control* bctl) { gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.", compression_algorithm); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED, - error_msg); + cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg); } else if (grpc_compression_options_is_algorithm_enabled( &compression_options, compression_algorithm) == 0) { /* check if algorithm is supported by current channel config */ @@ -1457,8 +1316,7 @@ static void validate_filtered_metadata(batch_control* bctl) { gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", algo_name); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED, - error_msg); + cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg); } gpr_free(error_msg); @@ -1476,23 +1334,12 @@ static void validate_filtered_metadata(batch_control* bctl) { } } -static void add_batch_error(batch_control* bctl, grpc_error* error, - bool has_cancelled) { - if (error == GRPC_ERROR_NONE) return; - int idx = static_cast(gpr_atm_full_fetch_add(&bctl->num_errors, 1)); - if (idx == 0 && !has_cancelled) { - cancel_with_error(bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error)); - } - bctl->errors[idx] = error; -} - static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast(bctlp); grpc_call* call = bctl->call; GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready"); - add_batch_error(bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { grpc_metadata_batch* md = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; @@ -1505,6 +1352,11 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) { call->send_deadline = md->deadline; } + } else { + if (bctl->batch_error == GRPC_ERROR_NONE) { + bctl->batch_error = GRPC_ERROR_REF(error); + } + cancel_with_error(call, GRPC_ERROR_REF(error)); } grpc_closure* saved_rsr_closure = nullptr; @@ -1542,7 +1394,12 @@ static void finish_batch(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast(bctlp); grpc_call* call = bctl->call; GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete"); - add_batch_error(bctl, GRPC_ERROR_REF(error), false); + if (bctl->batch_error == GRPC_ERROR_NONE) { + bctl->batch_error = GRPC_ERROR_REF(error); + } + if (error != GRPC_ERROR_NONE) { + cancel_with_error(call, GRPC_ERROR_REF(error)); + } finish_batch_step(bctl); } @@ -1740,28 +1597,26 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->send_extra_metadata_count = 1; call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( call->channel, op->data.send_status_from_server.status); - { - grpc_error* override_error = GRPC_ERROR_NONE; - if (op->data.send_status_from_server.status != GRPC_STATUS_OK) { - override_error = - error_from_status(op->data.send_status_from_server.status, - "Returned non-ok status"); - } - if (op->data.send_status_from_server.status_details != nullptr) { - call->send_extra_metadata[1].md = grpc_mdelem_from_slices( - GRPC_MDSTR_GRPC_MESSAGE, - grpc_slice_ref_internal( - *op->data.send_status_from_server.status_details)); - call->send_extra_metadata_count++; - char* msg = grpc_slice_to_c_string( - GRPC_MDVALUE(call->send_extra_metadata[1].md)); - override_error = - grpc_error_set_str(override_error, GRPC_ERROR_STR_GRPC_MESSAGE, - grpc_slice_from_copied_string(msg)); - gpr_free(msg); - } - set_status_from_error(call, STATUS_FROM_API_OVERRIDE, override_error); + if (op->data.send_status_from_server.status_details != nullptr) { + call->send_extra_metadata[1].md = grpc_mdelem_from_slices( + GRPC_MDSTR_GRPC_MESSAGE, + grpc_slice_ref_internal( + *op->data.send_status_from_server.status_details)); + call->send_extra_metadata_count++; + char* msg = grpc_slice_to_c_string( + GRPC_MDVALUE(call->send_extra_metadata[1].md)); + gpr_free(msg); } + grpc_error* status_error = + op->data.send_status_from_server.status == GRPC_STATUS_OK + ? GRPC_ERROR_NONE + : grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Server returned error"), + GRPC_ERROR_INT_GRPC_STATUS, + static_cast( + op->data.send_status_from_server.status)); + call->status_error = status_error; if (!prepare_application_metadata( call, static_cast( -- cgit v1.2.3 From e670c3754c56f3d2cd1612df95953508e299a5a1 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 19 Jun 2018 23:16:42 -0700 Subject: Minor bug in timer code during shutdown --- src/core/lib/iomgr/timer_generic.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc index 4294162af7..70417d69b6 100644 --- a/src/core/lib/iomgr/timer_generic.cc +++ b/src/core/lib/iomgr/timer_generic.cc @@ -291,7 +291,7 @@ static void timer_list_init() { static void timer_list_shutdown() { size_t i; run_some_expired_timers( - GPR_ATM_MAX, nullptr, + GRPC_MILLIS_INF_FUTURE, nullptr, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown")); for (i = 0; i < g_num_shards; i++) { timer_shard* shard = &g_shards[i]; @@ -714,7 +714,7 @@ static grpc_timer_check_result timer_check(grpc_millis* next) { #if GPR_ARCH_64 gpr_log(GPR_INFO, "TIMER CHECK BEGIN: now=%" PRId64 " next=%s tls_min=%" PRId64 - " glob_min=%" PRIdPTR, + " glob_min=%" PRId64, now, next_str, min_timer, gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer))); #else -- cgit v1.2.3 From dd0995d0159169d23f92eaf1df228033c337866c Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Thu, 21 Jun 2018 17:22:09 -0700 Subject: Changes --- .../ext/filters/http/client/http_client_filter.cc | 14 +++++++- .../ext/filters/http/server/http_server_filter.cc | 30 +++++++++++++++++ .../filters/message_size/message_size_filter.cc | 38 ++++++++++++---------- src/core/lib/surface/call.cc | 16 +++++++++ 4 files changed, 79 insertions(+), 19 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 1678051beb..eadb9db1be 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -51,6 +51,7 @@ struct call_data { grpc_linked_mdelem user_agent; // State for handling recv_initial_metadata ops. grpc_metadata_batch* recv_initial_metadata; + grpc_error* recv_initial_metadata_error; grpc_closure* original_recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready; // State for handling recv_trailing_metadata ops. @@ -147,6 +148,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) { call_data* calld = static_cast(elem->call_data); if (error == GRPC_ERROR_NONE) { error = client_filter_incoming_metadata(elem, calld->recv_initial_metadata); + calld->recv_initial_metadata_error = GRPC_ERROR_REF(error); } else { GRPC_ERROR_REF(error); } @@ -162,6 +164,13 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { } else { GRPC_ERROR_REF(error); } + if (calld->recv_initial_metadata_error != GRPC_ERROR_NONE) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_REF(calld->recv_initial_metadata_error); + } else { + error = grpc_error_add_child(error, calld->recv_initial_metadata_error); + } + } GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); } @@ -434,7 +443,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem, /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = static_cast(elem->call_data); + GRPC_ERROR_UNREF(calld->recv_initial_metadata_error); +} static grpc_mdelem scheme_from_args(const grpc_channel_args* args) { unsigned i; diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 3919447f26..48eeb9212c 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -50,6 +50,7 @@ struct call_data { // State for intercepting recv_initial_metadata. grpc_closure recv_initial_metadata_ready; + grpc_error* recv_initial_metadata_ready_error; grpc_closure* original_recv_initial_metadata_ready; grpc_metadata_batch* recv_initial_metadata; uint32_t* recv_initial_metadata_flags; @@ -60,6 +61,9 @@ struct call_data { grpc_closure recv_message_ready; grpc_core::OrphanablePtr* recv_message; bool seen_recv_message_ready; + + grpc_closure recv_trailing_metadata_ready; + grpc_closure* original_recv_trailing_metadata_ready; }; } // namespace @@ -267,6 +271,7 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) { calld->seen_recv_initial_metadata_ready = true; if (err == GRPC_ERROR_NONE) { err = hs_filter_incoming_metadata(elem, calld->recv_initial_metadata); + calld->recv_initial_metadata_ready_error = GRPC_ERROR_REF(err); if (calld->seen_recv_message_ready) { // We've already seen the recv_message callback, but we previously // deferred it, so we need to return it here. @@ -313,6 +318,22 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) { } } +static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) { + grpc_call_element* elem = static_cast(user_data); + call_data* calld = static_cast(elem->call_data); + if (calld->recv_initial_metadata_ready_error != GRPC_ERROR_NONE) { + if (err == GRPC_ERROR_NONE) { + err = GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error); + } else { + err = grpc_error_add_child(err, calld->recv_initial_metadata_ready_error); + } + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); + } else { + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, + GRPC_ERROR_REF(err)); + } +} + static grpc_error* hs_mutate_op(grpc_call_element* elem, grpc_transport_stream_op_batch* op) { /* grab pointers to our data from the call element */ @@ -357,6 +378,11 @@ static grpc_error* hs_mutate_op(grpc_call_element* elem, op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } + if (op->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready = + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + } + if (op->send_trailing_metadata) { grpc_error* error = hs_filter_outgoing_metadata( elem, op->payload->send_trailing_metadata.send_trailing_metadata); @@ -389,6 +415,9 @@ static grpc_error* hs_init_call_elem(grpc_call_element* elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->recv_message_ready, hs_recv_message_ready, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + hs_recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } @@ -397,6 +426,7 @@ static void hs_destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = static_cast(elem->call_data); + GRPC_ERROR_UNREF(calld->recv_initial_metadata_ready_error); if (calld->have_read_stream) { calld->read_stream->Orphan(); } diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index a859476dab..f3081df38d 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -99,7 +99,7 @@ struct call_data { // recv_message_ready up-call on transport_stream_op, and remember to // call our next_recv_message_ready member after handling it. grpc_closure recv_message_ready; - grpc_closure recv_trailing_metadata; + grpc_closure recv_trailing_metadata_ready; // The error caused by a message that is too large, or GRPC_ERROR_NONE grpc_error* error; // Used by recv_message_ready. @@ -107,7 +107,7 @@ struct call_data { // Original recv_message_ready callback, invoked after our own. grpc_closure* next_recv_message_ready; // Original recv_trailing_metadata callback, invoked after our own. - grpc_closure* next_recv_trailing_metadata; + grpc_closure* next_recv_trailing_metadata_ready; }; struct channel_data { @@ -136,13 +136,13 @@ static void recv_message_ready(void* user_data, grpc_error* error) { GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED); GRPC_ERROR_UNREF(calld->error); - calld->error = GRPC_ERROR_REF(new_error); if (error == GRPC_ERROR_NONE) { error = new_error; } else { error = grpc_error_add_child(error, new_error); GRPC_ERROR_UNREF(new_error); } + calld->error = GRPC_ERROR_REF(error); gpr_free(message_string); } else { GRPC_ERROR_REF(error); @@ -153,19 +153,20 @@ static void recv_message_ready(void* user_data, grpc_error* error) { // Callback invoked on completion of recv_trailing_metadata // Notifies the recv_trailing_metadata batch of any message size failures -static void recv_trailing_metadata(void* user_data, grpc_error* error) { - grpc_call_element* elem = (grpc_call_element*)user_data; - call_data* calld = (call_data*)elem->call_data; - if (error == GRPC_ERROR_NONE) { - error = calld->error; - } else if (calld->error == GRPC_ERROR_NONE) { +static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { + grpc_call_element* elem = static_cast(user_data); + call_data* calld = static_cast(elem->call_data); + if (calld->error != GRPC_ERROR_NONE) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_REF(calld->error); + } else { + error = grpc_error_add_child(error, calld->error); + } + } else { error = GRPC_ERROR_REF(error); - } else if (calld->error != error) { - error = grpc_error_add_child(GRPC_ERROR_REF(error), calld->error); } - calld->error = GRPC_ERROR_NONE; // Invoke the next callback. - GRPC_CLOSURE_RUN(calld->next_recv_trailing_metadata, error); + GRPC_CLOSURE_RUN(calld->next_recv_trailing_metadata_ready, error); } // Start transport stream op. @@ -198,10 +199,10 @@ static void start_transport_stream_op_batch( } // Inject callback for receiving trailing metadata. if (op->recv_trailing_metadata) { - calld->next_recv_trailing_metadata = + calld->next_recv_trailing_metadata_ready = op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &calld->recv_trailing_metadata; + &calld->recv_trailing_metadata_ready; } // Chain to the next filter. grpc_call_next_op(elem, op); @@ -214,12 +215,13 @@ static grpc_error* init_call_elem(grpc_call_element* elem, call_data* calld = static_cast(elem->call_data); calld->call_combiner = args->call_combiner; calld->next_recv_message_ready = nullptr; - calld->next_recv_trailing_metadata = nullptr; + calld->next_recv_trailing_metadata_ready = nullptr; calld->error = GRPC_ERROR_NONE; GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata, recv_trailing_metadata, - elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); // Get max sizes from channel data, then merge in per-method config values. // Note: Per-method config is only available on the client, so we // apply the max request size to the send limit and the max response diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index eae0110635..38a00b8b67 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -689,6 +689,8 @@ static void set_final_status(grpc_call* call, grpc_error* error) { gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR"); gpr_log(GPR_DEBUG, "%s", grpc_error_string(error)); } + grpc_core::channelz::ChannelNode* channelz_channel = + grpc_channel_get_channelz_node(call->channel); if (call->is_client) { const char** error_string = call->final_op.client.error_string; grpc_status_code code; @@ -698,9 +700,23 @@ static void set_final_status(grpc_call* call, grpc_error* error) { *call->final_op.client.status = code; *call->final_op.client.status_details = grpc_slice_ref_internal(slice); call->status_error = error; + if (channelz_channel != nullptr) { + if (*call->final_op.client.status != GRPC_STATUS_OK) { + channelz_channel->RecordCallFailed(); + } else { + channelz_channel->RecordCallSucceeded(); + } + } } else { *call->final_op.server.cancelled = error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE; + if (channelz_channel != nullptr) { + if (*call->final_op.server.cancelled) { + channelz_channel->RecordCallFailed(); + } else { + channelz_channel->RecordCallSucceeded(); + } + } GRPC_ERROR_UNREF(error); } } -- cgit v1.2.3 From 96c0a266a0a5f83a2bdd9678dbd05620f6380a91 Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Fri, 22 Jun 2018 13:00:07 -0700 Subject: Changes --- src/core/ext/filters/http/client/http_client_filter.cc | 2 +- src/core/ext/filters/http/server/http_server_filter.cc | 9 +++++---- src/core/ext/filters/message_size/message_size_filter.cc | 5 +++-- 3 files changed, 9 insertions(+), 7 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index eadb9db1be..04ac4ac947 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -167,7 +167,7 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { if (calld->recv_initial_metadata_error != GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) { error = GRPC_ERROR_REF(calld->recv_initial_metadata_error); - } else { + } else if (error != calld->recv_initial_metadata_error) { error = grpc_error_add_child(error, calld->recv_initial_metadata_error); } } diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 48eeb9212c..01e5aa445e 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -324,14 +324,15 @@ static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) { if (calld->recv_initial_metadata_ready_error != GRPC_ERROR_NONE) { if (err == GRPC_ERROR_NONE) { err = GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error); - } else { + } else if (err != calld->recv_initial_metadata_ready_error) { err = grpc_error_add_child(err, calld->recv_initial_metadata_ready_error); + } else { + err = GRPC_ERROR_REF(err); } - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); } else { - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, - GRPC_ERROR_REF(err)); + err = GRPC_ERROR_REF(err); } + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); } static grpc_error* hs_mutate_op(grpc_call_element* elem, diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index f3081df38d..deb5ae70ec 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -140,7 +140,6 @@ static void recv_message_ready(void* user_data, grpc_error* error) { error = new_error; } else { error = grpc_error_add_child(error, new_error); - GRPC_ERROR_UNREF(new_error); } calld->error = GRPC_ERROR_REF(error); gpr_free(message_string); @@ -159,8 +158,10 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { if (calld->error != GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) { error = GRPC_ERROR_REF(calld->error); + } else if (error != calld->error) { + error = grpc_error_add_child(error, GRPC_ERROR_REF(calld->error)); } else { - error = grpc_error_add_child(error, calld->error); + error = GRPC_ERROR_REF(error); } } else { error = GRPC_ERROR_REF(error); -- cgit v1.2.3 From e2a9e6cf721866c17110244f9da6c37f30f1faef Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Fri, 22 Jun 2018 13:25:43 -0700 Subject: Changes --- src/core/lib/surface/call.cc | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 38a00b8b67..b97c2ec760 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1015,10 +1015,15 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b, set_final_status(call, GRPC_ERROR_REF(error)); grpc_metadata_batch_remove(b, b->idx.named.grpc_status); GRPC_ERROR_UNREF(error); - } else { - // TODO(kpayson) batch completed successfully w/no error + no status, should - // we assert instead? + } else if (!call->is_client) { set_final_status(call, GRPC_ERROR_NONE); + } else { + gpr_log(GPR_DEBUG, + "Received trailing metadata with no error and no status"); + set_final_status( + call, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN)); } publish_app_metadata(call, b, true); GRPC_ERROR_UNREF(batch_error); -- cgit v1.2.3 From 2069bf520f5428a5523d3fbd34eb44d63802332e Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Mon, 25 Jun 2018 10:37:51 -0700 Subject: Fix tsan failure --- src/core/lib/surface/call.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index b97c2ec760..2b451e46a1 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -189,7 +189,7 @@ struct grpc_call { grpc_closure receiving_initial_metadata_ready; grpc_closure receiving_trailing_metadata_ready; uint32_t test_only_last_message_flags; - bool cancelled; + gpr_atm cancelled; grpc_closure release_call; @@ -304,6 +304,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, gpr_arena_alloc(arena, ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) + channel_stack->call_stack_size)); gpr_ref_init(&call->ext_ref, 1); + gpr_atm_no_barrier_store(&call->cancelled, 0); call->arena = arena; grpc_call_combiner_init(&call->call_combiner); *out_call = call; @@ -646,11 +647,10 @@ static void done_termination(void* arg, grpc_error* error) { } static void cancel_with_error(grpc_call* c, grpc_error* error) { - if (c->cancelled) { + if (!gpr_atm_rel_cas(&c->cancelled, 0, 1)) { GRPC_ERROR_UNREF(error); return; } - c->cancelled = true; GRPC_CALL_INTERNAL_REF(c, "termination"); // Inform the call combiner of the cancellation, so that it can cancel // any in-flight asynchronous actions that may be holding the call -- cgit v1.2.3 From a9e27ae2c4138d2bd8f9c49d999ac3adb5baabea Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Tue, 26 Jun 2018 14:51:04 -0700 Subject: Roll back filter changes --- .../ext/filters/http/client/http_client_filter.cc | 14 +------ .../ext/filters/http/server/http_server_filter.cc | 31 --------------- .../filters/message_size/message_size_filter.cc | 45 +--------------------- 3 files changed, 3 insertions(+), 87 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 04ac4ac947..1678051beb 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -51,7 +51,6 @@ struct call_data { grpc_linked_mdelem user_agent; // State for handling recv_initial_metadata ops. grpc_metadata_batch* recv_initial_metadata; - grpc_error* recv_initial_metadata_error; grpc_closure* original_recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready; // State for handling recv_trailing_metadata ops. @@ -148,7 +147,6 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) { call_data* calld = static_cast(elem->call_data); if (error == GRPC_ERROR_NONE) { error = client_filter_incoming_metadata(elem, calld->recv_initial_metadata); - calld->recv_initial_metadata_error = GRPC_ERROR_REF(error); } else { GRPC_ERROR_REF(error); } @@ -164,13 +162,6 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { } else { GRPC_ERROR_REF(error); } - if (calld->recv_initial_metadata_error != GRPC_ERROR_NONE) { - if (error == GRPC_ERROR_NONE) { - error = GRPC_ERROR_REF(calld->recv_initial_metadata_error); - } else if (error != calld->recv_initial_metadata_error) { - error = grpc_error_add_child(error, calld->recv_initial_metadata_error); - } - } GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); } @@ -443,10 +434,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem, /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) { - call_data* calld = static_cast(elem->call_data); - GRPC_ERROR_UNREF(calld->recv_initial_metadata_error); -} + grpc_closure* ignored) {} static grpc_mdelem scheme_from_args(const grpc_channel_args* args) { unsigned i; diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 01e5aa445e..3919447f26 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -50,7 +50,6 @@ struct call_data { // State for intercepting recv_initial_metadata. grpc_closure recv_initial_metadata_ready; - grpc_error* recv_initial_metadata_ready_error; grpc_closure* original_recv_initial_metadata_ready; grpc_metadata_batch* recv_initial_metadata; uint32_t* recv_initial_metadata_flags; @@ -61,9 +60,6 @@ struct call_data { grpc_closure recv_message_ready; grpc_core::OrphanablePtr* recv_message; bool seen_recv_message_ready; - - grpc_closure recv_trailing_metadata_ready; - grpc_closure* original_recv_trailing_metadata_ready; }; } // namespace @@ -271,7 +267,6 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) { calld->seen_recv_initial_metadata_ready = true; if (err == GRPC_ERROR_NONE) { err = hs_filter_incoming_metadata(elem, calld->recv_initial_metadata); - calld->recv_initial_metadata_ready_error = GRPC_ERROR_REF(err); if (calld->seen_recv_message_ready) { // We've already seen the recv_message callback, but we previously // deferred it, so we need to return it here. @@ -318,23 +313,6 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) { } } -static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) { - grpc_call_element* elem = static_cast(user_data); - call_data* calld = static_cast(elem->call_data); - if (calld->recv_initial_metadata_ready_error != GRPC_ERROR_NONE) { - if (err == GRPC_ERROR_NONE) { - err = GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error); - } else if (err != calld->recv_initial_metadata_ready_error) { - err = grpc_error_add_child(err, calld->recv_initial_metadata_ready_error); - } else { - err = GRPC_ERROR_REF(err); - } - } else { - err = GRPC_ERROR_REF(err); - } - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); -} - static grpc_error* hs_mutate_op(grpc_call_element* elem, grpc_transport_stream_op_batch* op) { /* grab pointers to our data from the call element */ @@ -379,11 +357,6 @@ static grpc_error* hs_mutate_op(grpc_call_element* elem, op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } - if (op->recv_trailing_metadata) { - calld->original_recv_trailing_metadata_ready = - op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - } - if (op->send_trailing_metadata) { grpc_error* error = hs_filter_outgoing_metadata( elem, op->payload->send_trailing_metadata.send_trailing_metadata); @@ -416,9 +389,6 @@ static grpc_error* hs_init_call_elem(grpc_call_element* elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->recv_message_ready, hs_recv_message_ready, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, - hs_recv_trailing_metadata_ready, elem, - grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } @@ -427,7 +397,6 @@ static void hs_destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = static_cast(elem->call_data); - GRPC_ERROR_UNREF(calld->recv_initial_metadata_ready_error); if (calld->have_read_stream) { calld->read_stream->Orphan(); } diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index deb5ae70ec..c7fc3f2e62 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -99,15 +99,10 @@ struct call_data { // recv_message_ready up-call on transport_stream_op, and remember to // call our next_recv_message_ready member after handling it. grpc_closure recv_message_ready; - grpc_closure recv_trailing_metadata_ready; - // The error caused by a message that is too large, or GRPC_ERROR_NONE - grpc_error* error; // Used by recv_message_ready. grpc_core::OrphanablePtr* recv_message; // Original recv_message_ready callback, invoked after our own. grpc_closure* next_recv_message_ready; - // Original recv_trailing_metadata callback, invoked after our own. - grpc_closure* next_recv_trailing_metadata_ready; }; struct channel_data { @@ -135,13 +130,12 @@ static void recv_message_ready(void* user_data, grpc_error* error) { grpc_error* new_error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED); - GRPC_ERROR_UNREF(calld->error); if (error == GRPC_ERROR_NONE) { error = new_error; } else { error = grpc_error_add_child(error, new_error); + GRPC_ERROR_UNREF(new_error); } - calld->error = GRPC_ERROR_REF(error); gpr_free(message_string); } else { GRPC_ERROR_REF(error); @@ -150,26 +144,6 @@ static void recv_message_ready(void* user_data, grpc_error* error) { GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error); } -// Callback invoked on completion of recv_trailing_metadata -// Notifies the recv_trailing_metadata batch of any message size failures -static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { - grpc_call_element* elem = static_cast(user_data); - call_data* calld = static_cast(elem->call_data); - if (calld->error != GRPC_ERROR_NONE) { - if (error == GRPC_ERROR_NONE) { - error = GRPC_ERROR_REF(calld->error); - } else if (error != calld->error) { - error = grpc_error_add_child(error, GRPC_ERROR_REF(calld->error)); - } else { - error = GRPC_ERROR_REF(error); - } - } else { - error = GRPC_ERROR_REF(error); - } - // Invoke the next callback. - GRPC_CLOSURE_RUN(calld->next_recv_trailing_metadata_ready, error); -} - // Start transport stream op. static void start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* op) { @@ -198,13 +172,6 @@ static void start_transport_stream_op_batch( calld->recv_message = op->payload->recv_message.recv_message; op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } - // Inject callback for receiving trailing metadata. - if (op->recv_trailing_metadata) { - calld->next_recv_trailing_metadata_ready = - op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &calld->recv_trailing_metadata_ready; - } // Chain to the next filter. grpc_call_next_op(elem, op); } @@ -216,13 +183,8 @@ static grpc_error* init_call_elem(grpc_call_element* elem, call_data* calld = static_cast(elem->call_data); calld->call_combiner = args->call_combiner; calld->next_recv_message_ready = nullptr; - calld->next_recv_trailing_metadata_ready = nullptr; - calld->error = GRPC_ERROR_NONE; GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, - recv_trailing_metadata_ready, elem, - grpc_schedule_on_exec_ctx); // Get max sizes from channel data, then merge in per-method config values. // Note: Per-method config is only available on the client, so we // apply the max request size to the send limit and the max response @@ -251,10 +213,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem, // Destructor for call_data. static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) { - call_data* calld = (call_data*)elem->call_data; - GRPC_ERROR_UNREF(calld->error); -} + grpc_closure* ignored) {} static int default_size(const grpc_channel_args* args, int without_minimal_stack) { -- cgit v1.2.3 From 299c2c5e115b6a1204d67f3a4ae8de66c5386fe6 Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Mon, 9 Jul 2018 12:10:02 -0700 Subject: Include cancellation error --- src/core/lib/surface/call.cc | 62 ++++++++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 20 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 6f136cd2a9..6f0905a9da 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -190,7 +190,7 @@ struct grpc_call { grpc_closure receiving_initial_metadata_ready; grpc_closure receiving_trailing_metadata_ready; uint32_t test_only_last_message_flags; - gpr_atm cancelled; + gpr_atm cancel_error; grpc_closure release_call; @@ -301,7 +301,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, gpr_arena_alloc(arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) + channel_stack->call_stack_size)); gpr_ref_init(&call->ext_ref, 1); - gpr_atm_no_barrier_store(&call->cancelled, 0); + gpr_atm_no_barrier_store(&call->cancel_error, GRPC_ERROR_NONE); call->arena = arena; grpc_call_combiner_init(&call->call_combiner); *out_call = call; @@ -509,10 +509,33 @@ static void destroy_call(void* call, grpc_error* error) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } + grpc_core::channelz::ChannelNode* channelz_channel = + grpc_channel_get_channelz_node(c->channel); + if (c->is_client) { + if (channelz_channel != nullptr) { + if (*c->final_op.client.status != GRPC_STATUS_OK) { + channelz_channel->RecordCallFailed(); + } else { + channelz_channel->RecordCallSucceeded(); + } + } + } else { + if (channelz_channel != nullptr) { + if (*c->final_op.server.cancelled || c->status_error != GRPC_ERROR_NONE) { + channelz_channel->RecordCallFailed(); + } else { + channelz_channel->RecordCallSucceeded(); + } + } + } + grpc_slice slice = grpc_empty_slice(); grpc_error_get_status(c->status_error, c->send_deadline, &c->final_info.final_status, &slice, nullptr, c->final_info.error_string); + grpc_error* cancel_error = + (grpc_error*)gpr_atm_no_barrier_load(&c->cancel_error); + GRPC_ERROR_UNREF(cancel_error); GRPC_ERROR_UNREF(c->status_error); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); @@ -644,10 +667,12 @@ static void done_termination(void* arg, grpc_error* error) { } static void cancel_with_error(grpc_call* c, grpc_error* error) { - if (!gpr_atm_rel_cas(&c->cancelled, 0, 1)) { + if (!gpr_atm_rel_cas(&c->cancel_error, (gpr_atm)GRPC_ERROR_NONE, + (gpr_atm)error)) { GRPC_ERROR_UNREF(error); return; } + GRPC_ERROR_REF(error); GRPC_CALL_INTERNAL_REF(c, "termination"); // Inform the call combiner of the cancellation, so that it can cancel // any in-flight asynchronous actions that may be holding the call @@ -681,14 +706,26 @@ static void cancel_with_status(grpc_call* c, grpc_status_code status, cancel_with_error(c, error_from_status(status, description)); } +// The final status of a call is determined when the +// recv_trailing_metadata_ready callback is invoked. The status is determined +// based on the following (in order). +// 1. The first cancellation error that has occured prior to the callback. +// 2. The error passed to the recv_trailing_metadata_ready callback. +// 3. The status from the metadata on the call (grpc-status). static void set_final_status(grpc_call* call, grpc_error* error) { if (grpc_call_error_trace.enabled()) { gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR"); gpr_log(GPR_DEBUG, "%s", grpc_error_string(error)); } - grpc_core::channelz::ChannelNode* channelz_channel = - grpc_channel_get_channelz_node(call->channel); + grpc_error* cancel_error = + (grpc_error*)gpr_atm_no_barrier_load(&call->cancel_error); + if (cancel_error != GRPC_ERROR_NONE && error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_REF(cancel_error); + } else if (cancel_error != GRPC_ERROR_NONE && cancel_error != error) { + error = grpc_error_add_child(GRPC_ERROR_REF(cancel_error), error); + } if (call->is_client) { + call->status_error = error; const char** error_string = call->final_op.client.error_string; grpc_status_code code; grpc_slice slice = grpc_empty_slice(); @@ -696,24 +733,9 @@ static void set_final_status(grpc_call* call, grpc_error* error) { error_string); *call->final_op.client.status = code; *call->final_op.client.status_details = grpc_slice_ref_internal(slice); - call->status_error = error; - if (channelz_channel != nullptr) { - if (*call->final_op.client.status != GRPC_STATUS_OK) { - channelz_channel->RecordCallFailed(); - } else { - channelz_channel->RecordCallSucceeded(); - } - } } else { *call->final_op.server.cancelled = error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE; - if (channelz_channel != nullptr) { - if (*call->final_op.server.cancelled) { - channelz_channel->RecordCallFailed(); - } else { - channelz_channel->RecordCallSucceeded(); - } - } GRPC_ERROR_UNREF(error); } } -- cgit v1.2.3 From a3d93f324b93329de79124d570dbddb47a6862e0 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Wed, 11 Jul 2018 19:34:24 -0700 Subject: fix format --- src/core/lib/iomgr/timer_generic.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc index 70417d69b6..008d37119a 100644 --- a/src/core/lib/iomgr/timer_generic.cc +++ b/src/core/lib/iomgr/timer_generic.cc @@ -716,7 +716,8 @@ static grpc_timer_check_result timer_check(grpc_millis* next) { "TIMER CHECK BEGIN: now=%" PRId64 " next=%s tls_min=%" PRId64 " glob_min=%" PRId64, now, next_str, min_timer, - gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer))); + static_cast(gpr_atm_no_barrier_load( + (gpr_atm*)(&g_shared_mutables.min_timer)))); #else gpr_log(GPR_INFO, "TIMER CHECK BEGIN: now=%" PRId64 " next=%s min=%" PRId64, now, next_str, min_timer); -- cgit v1.2.3 From a135059ce7d2db3180045810f66dd69a8fc88284 Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Fri, 13 Jul 2018 13:24:31 -0700 Subject: Make error strings consistent --- src/core/lib/iomgr/error.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 90ed34da11..93a5c20abb 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -399,14 +399,14 @@ static grpc_error* copy_error_and_unref(grpc_error* in) { out = GRPC_ERROR_CREATE_FROM_STATIC_STRING("unknown"); if (in == GRPC_ERROR_NONE) { internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION, - grpc_slice_from_static_string("no error")); + grpc_slice_from_static_string("No Error")); internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK); } else if (in == GRPC_ERROR_OOM) { internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION, - grpc_slice_from_static_string("oom")); + grpc_slice_from_static_string("Out of memory")); } else if (in == GRPC_ERROR_CANCELLED) { internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION, - grpc_slice_from_static_string("cancelled")); + grpc_slice_from_static_string("Cancelled")); internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED); } } else if (gpr_ref_is_unique(&in->atomics.refs)) { -- cgit v1.2.3 From 5defb7354b620a67ef42cb2ad73bab7bdd512fa6 Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Fri, 13 Jul 2018 13:49:23 -0700 Subject: compile error fix --- src/core/lib/surface/call.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 6f0905a9da..88156ed3bc 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -301,7 +301,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, gpr_arena_alloc(arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) + channel_stack->call_stack_size)); gpr_ref_init(&call->ext_ref, 1); - gpr_atm_no_barrier_store(&call->cancel_error, GRPC_ERROR_NONE); + gpr_atm_no_barrier_store(&call->cancel_error, (gpr_atm)GRPC_ERROR_NONE); call->arena = arena; grpc_call_combiner_init(&call->call_combiner); *out_call = call; -- cgit v1.2.3 From a4e3bed5fb8f9203fa580f00a5cb1136415c1949 Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Tue, 24 Jul 2018 12:11:33 -0700 Subject: changes --- src/core/lib/surface/call.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index d9feb10850..c92172f278 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -532,7 +532,7 @@ static void destroy_call(void* call, grpc_error* error) { grpc_slice slice = grpc_empty_slice(); grpc_error_get_status(c->status_error, c->send_deadline, &c->final_info.final_status, &slice, nullptr, - c->final_info.error_string); + &c->final_info.error_string); grpc_error* cancel_error = (grpc_error*)gpr_atm_no_barrier_load(&c->cancel_error); GRPC_ERROR_UNREF(cancel_error); @@ -1445,10 +1445,10 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); grpc_metadata_batch* md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; recv_trailing_filter(call, md, GRPC_ERROR_REF(error)); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); finish_batch_step(bctl); } -- cgit v1.2.3 From 65007f624e3a7be460dda7c979ae939c89519cad Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Tue, 24 Jul 2018 14:50:16 -0700 Subject: Channelz fixes --- src/core/lib/surface/call.cc | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index c92172f278..f5abf1a1a6 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -509,26 +509,6 @@ static void destroy_call(void* call, grpc_error* error) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } - grpc_core::channelz::ChannelNode* channelz_channel = - grpc_channel_get_channelz_node(c->channel); - if (c->is_client) { - if (channelz_channel != nullptr) { - if (*c->final_op.client.status != GRPC_STATUS_OK) { - channelz_channel->RecordCallFailed(); - } else { - channelz_channel->RecordCallSucceeded(); - } - } - } else { - if (channelz_channel != nullptr) { - if (*c->final_op.server.cancelled || c->status_error != GRPC_ERROR_NONE) { - channelz_channel->RecordCallFailed(); - } else { - channelz_channel->RecordCallSucceeded(); - } - } - } - grpc_slice slice = grpc_empty_slice(); grpc_error_get_status(c->status_error, c->send_deadline, &c->final_info.final_status, &slice, nullptr, @@ -734,6 +714,15 @@ static void set_final_status(grpc_call* call, grpc_error* error) { error_string); *call->final_op.client.status = code; *call->final_op.client.status_details = grpc_slice_ref_internal(slice); + grpc_core::channelz::ChannelNode* channelz_channel = + grpc_channel_get_channelz_node(call->channel); + if (channelz_channel != nullptr) { + if (code != GRPC_STATUS_OK) { + channelz_channel->RecordCallFailed(); + } else { + channelz_channel->RecordCallSucceeded(); + } + } } else { *call->final_op.server.cancelled = error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE; @@ -1683,6 +1672,15 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, static_cast( op->data.send_status_from_server.status)); call->status_error = status_error; + grpc_core::channelz::ChannelNode* channelz_channel = + grpc_channel_get_channelz_node(call->channel); + if (channelz_channel != nullptr) { + if (call->status_error != GRPC_ERROR_NONE) { + channelz_channel->RecordCallFailed(); + } else { + channelz_channel->RecordCallSucceeded(); + } + } if (!prepare_application_metadata( call, static_cast( -- cgit v1.2.3 From 961fe4b523d8d097f32d6bb1df12ec8e0dcdaa4a Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Wed, 25 Jul 2018 10:49:48 -0700 Subject: Revert "Channelz fixes" This reverts commit 65007f624e3a7be460dda7c979ae939c89519cad. --- src/core/lib/surface/call.cc | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index f5abf1a1a6..c92172f278 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -509,6 +509,26 @@ static void destroy_call(void* call, grpc_error* error) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } + grpc_core::channelz::ChannelNode* channelz_channel = + grpc_channel_get_channelz_node(c->channel); + if (c->is_client) { + if (channelz_channel != nullptr) { + if (*c->final_op.client.status != GRPC_STATUS_OK) { + channelz_channel->RecordCallFailed(); + } else { + channelz_channel->RecordCallSucceeded(); + } + } + } else { + if (channelz_channel != nullptr) { + if (*c->final_op.server.cancelled || c->status_error != GRPC_ERROR_NONE) { + channelz_channel->RecordCallFailed(); + } else { + channelz_channel->RecordCallSucceeded(); + } + } + } + grpc_slice slice = grpc_empty_slice(); grpc_error_get_status(c->status_error, c->send_deadline, &c->final_info.final_status, &slice, nullptr, @@ -714,15 +734,6 @@ static void set_final_status(grpc_call* call, grpc_error* error) { error_string); *call->final_op.client.status = code; *call->final_op.client.status_details = grpc_slice_ref_internal(slice); - grpc_core::channelz::ChannelNode* channelz_channel = - grpc_channel_get_channelz_node(call->channel); - if (channelz_channel != nullptr) { - if (code != GRPC_STATUS_OK) { - channelz_channel->RecordCallFailed(); - } else { - channelz_channel->RecordCallSucceeded(); - } - } } else { *call->final_op.server.cancelled = error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE; @@ -1672,15 +1683,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, static_cast( op->data.send_status_from_server.status)); call->status_error = status_error; - grpc_core::channelz::ChannelNode* channelz_channel = - grpc_channel_get_channelz_node(call->channel); - if (channelz_channel != nullptr) { - if (call->status_error != GRPC_ERROR_NONE) { - channelz_channel->RecordCallFailed(); - } else { - channelz_channel->RecordCallSucceeded(); - } - } if (!prepare_application_metadata( call, static_cast( -- cgit v1.2.3 From ac4b2527850f966c8204dab091247b05489ad9d2 Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Wed, 25 Jul 2018 10:50:01 -0700 Subject: Revert "changes" This reverts commit a4e3bed5fb8f9203fa580f00a5cb1136415c1949. --- src/core/lib/surface/call.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index c92172f278..d9feb10850 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -532,7 +532,7 @@ static void destroy_call(void* call, grpc_error* error) { grpc_slice slice = grpc_empty_slice(); grpc_error_get_status(c->status_error, c->send_deadline, &c->final_info.final_status, &slice, nullptr, - &c->final_info.error_string); + c->final_info.error_string); grpc_error* cancel_error = (grpc_error*)gpr_atm_no_barrier_load(&c->cancel_error); GRPC_ERROR_UNREF(cancel_error); @@ -1445,10 +1445,10 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast(bctlp); grpc_call* call = bctl->call; + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); grpc_metadata_batch* md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; recv_trailing_filter(call, md, GRPC_ERROR_REF(error)); - GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); finish_batch_step(bctl); } -- cgit v1.2.3 From fa52bee77b34542566b40c8f368a90d2f12d8231 Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Wed, 25 Jul 2018 10:50:42 -0700 Subject: Revert "compile error fix" This reverts commit 5defb7354b620a67ef42cb2ad73bab7bdd512fa6. --- src/core/lib/surface/call.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index d9feb10850..ffba385605 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -301,7 +301,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, gpr_arena_alloc(arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) + channel_stack->call_stack_size)); gpr_ref_init(&call->ext_ref, 1); - gpr_atm_no_barrier_store(&call->cancel_error, (gpr_atm)GRPC_ERROR_NONE); + gpr_atm_no_barrier_store(&call->cancel_error, GRPC_ERROR_NONE); call->arena = arena; grpc_call_combiner_init(&call->call_combiner); *out_call = call; -- cgit v1.2.3 From 1987e391bcdd572003fe9b6e1e27671203adebab Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Wed, 25 Jul 2018 10:50:59 -0700 Subject: Revert "Make error strings consistent" This reverts commit a135059ce7d2db3180045810f66dd69a8fc88284. --- src/core/lib/iomgr/error.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 93a5c20abb..90ed34da11 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -399,14 +399,14 @@ static grpc_error* copy_error_and_unref(grpc_error* in) { out = GRPC_ERROR_CREATE_FROM_STATIC_STRING("unknown"); if (in == GRPC_ERROR_NONE) { internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION, - grpc_slice_from_static_string("No Error")); + grpc_slice_from_static_string("no error")); internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK); } else if (in == GRPC_ERROR_OOM) { internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION, - grpc_slice_from_static_string("Out of memory")); + grpc_slice_from_static_string("oom")); } else if (in == GRPC_ERROR_CANCELLED) { internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION, - grpc_slice_from_static_string("Cancelled")); + grpc_slice_from_static_string("cancelled")); internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED); } } else if (gpr_ref_is_unique(&in->atomics.refs)) { -- cgit v1.2.3 From a7bd899970b01198a4ba66b7f9fa47e658e506aa Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Wed, 25 Jul 2018 10:51:18 -0700 Subject: Revert "Include cancellation error" This reverts commit 299c2c5e115b6a1204d67f3a4ae8de66c5386fe6. --- src/core/lib/surface/call.cc | 62 ++++++++++++++------------------------------ 1 file changed, 20 insertions(+), 42 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index ffba385605..43dd795ce7 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -190,7 +190,7 @@ struct grpc_call { grpc_closure receiving_initial_metadata_ready; grpc_closure receiving_trailing_metadata_ready; uint32_t test_only_last_message_flags; - gpr_atm cancel_error; + gpr_atm cancelled; grpc_closure release_call; @@ -301,7 +301,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, gpr_arena_alloc(arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) + channel_stack->call_stack_size)); gpr_ref_init(&call->ext_ref, 1); - gpr_atm_no_barrier_store(&call->cancel_error, GRPC_ERROR_NONE); + gpr_atm_no_barrier_store(&call->cancelled, 0); call->arena = arena; grpc_call_combiner_init(&call->call_combiner); *out_call = call; @@ -509,33 +509,10 @@ static void destroy_call(void* call, grpc_error* error) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } - grpc_core::channelz::ChannelNode* channelz_channel = - grpc_channel_get_channelz_node(c->channel); - if (c->is_client) { - if (channelz_channel != nullptr) { - if (*c->final_op.client.status != GRPC_STATUS_OK) { - channelz_channel->RecordCallFailed(); - } else { - channelz_channel->RecordCallSucceeded(); - } - } - } else { - if (channelz_channel != nullptr) { - if (*c->final_op.server.cancelled || c->status_error != GRPC_ERROR_NONE) { - channelz_channel->RecordCallFailed(); - } else { - channelz_channel->RecordCallSucceeded(); - } - } - } - grpc_slice slice = grpc_empty_slice(); grpc_error_get_status(c->status_error, c->send_deadline, &c->final_info.final_status, &slice, nullptr, c->final_info.error_string); - grpc_error* cancel_error = - (grpc_error*)gpr_atm_no_barrier_load(&c->cancel_error); - GRPC_ERROR_UNREF(cancel_error); GRPC_ERROR_UNREF(c->status_error); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); @@ -668,12 +645,10 @@ static void done_termination(void* arg, grpc_error* error) { } static void cancel_with_error(grpc_call* c, grpc_error* error) { - if (!gpr_atm_rel_cas(&c->cancel_error, (gpr_atm)GRPC_ERROR_NONE, - (gpr_atm)error)) { + if (!gpr_atm_rel_cas(&c->cancelled, 0, 1)) { GRPC_ERROR_UNREF(error); return; } - GRPC_ERROR_REF(error); GRPC_CALL_INTERNAL_REF(c, "termination"); // Inform the call combiner of the cancellation, so that it can cancel // any in-flight asynchronous actions that may be holding the call @@ -707,26 +682,14 @@ static void cancel_with_status(grpc_call* c, grpc_status_code status, cancel_with_error(c, error_from_status(status, description)); } -// The final status of a call is determined when the -// recv_trailing_metadata_ready callback is invoked. The status is determined -// based on the following (in order). -// 1. The first cancellation error that has occured prior to the callback. -// 2. The error passed to the recv_trailing_metadata_ready callback. -// 3. The status from the metadata on the call (grpc-status). static void set_final_status(grpc_call* call, grpc_error* error) { if (grpc_call_error_trace.enabled()) { gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR"); gpr_log(GPR_DEBUG, "%s", grpc_error_string(error)); } - grpc_error* cancel_error = - (grpc_error*)gpr_atm_no_barrier_load(&call->cancel_error); - if (cancel_error != GRPC_ERROR_NONE && error == GRPC_ERROR_NONE) { - error = GRPC_ERROR_REF(cancel_error); - } else if (cancel_error != GRPC_ERROR_NONE && cancel_error != error) { - error = grpc_error_add_child(GRPC_ERROR_REF(cancel_error), error); - } + grpc_core::channelz::ChannelNode* channelz_channel = + grpc_channel_get_channelz_node(call->channel); if (call->is_client) { - call->status_error = error; const char** error_string = call->final_op.client.error_string; grpc_status_code code; grpc_slice slice = grpc_empty_slice(); @@ -734,9 +697,24 @@ static void set_final_status(grpc_call* call, grpc_error* error) { error_string); *call->final_op.client.status = code; *call->final_op.client.status_details = grpc_slice_ref_internal(slice); + call->status_error = error; + if (channelz_channel != nullptr) { + if (*call->final_op.client.status != GRPC_STATUS_OK) { + channelz_channel->RecordCallFailed(); + } else { + channelz_channel->RecordCallSucceeded(); + } + } } else { *call->final_op.server.cancelled = error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE; + if (channelz_channel != nullptr) { + if (*call->final_op.server.cancelled) { + channelz_channel->RecordCallFailed(); + } else { + channelz_channel->RecordCallSucceeded(); + } + } GRPC_ERROR_UNREF(error); } } -- cgit v1.2.3 From ab9b61dfccfb1e14e102332ee2619a6ee12e8081 Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Wed, 25 Jul 2018 10:51:37 -0700 Subject: Revert "Roll back filter changes" This reverts commit a9e27ae2c4138d2bd8f9c49d999ac3adb5baabea. --- .../ext/filters/http/client/http_client_filter.cc | 14 ++++++- .../ext/filters/http/server/http_server_filter.cc | 31 +++++++++++++++ .../filters/message_size/message_size_filter.cc | 45 +++++++++++++++++++++- 3 files changed, 87 insertions(+), 3 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 1678051beb..04ac4ac947 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -51,6 +51,7 @@ struct call_data { grpc_linked_mdelem user_agent; // State for handling recv_initial_metadata ops. grpc_metadata_batch* recv_initial_metadata; + grpc_error* recv_initial_metadata_error; grpc_closure* original_recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready; // State for handling recv_trailing_metadata ops. @@ -147,6 +148,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) { call_data* calld = static_cast(elem->call_data); if (error == GRPC_ERROR_NONE) { error = client_filter_incoming_metadata(elem, calld->recv_initial_metadata); + calld->recv_initial_metadata_error = GRPC_ERROR_REF(error); } else { GRPC_ERROR_REF(error); } @@ -162,6 +164,13 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { } else { GRPC_ERROR_REF(error); } + if (calld->recv_initial_metadata_error != GRPC_ERROR_NONE) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_REF(calld->recv_initial_metadata_error); + } else if (error != calld->recv_initial_metadata_error) { + error = grpc_error_add_child(error, calld->recv_initial_metadata_error); + } + } GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); } @@ -434,7 +443,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem, /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = static_cast(elem->call_data); + GRPC_ERROR_UNREF(calld->recv_initial_metadata_error); +} static grpc_mdelem scheme_from_args(const grpc_channel_args* args) { unsigned i; diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 3919447f26..01e5aa445e 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -50,6 +50,7 @@ struct call_data { // State for intercepting recv_initial_metadata. grpc_closure recv_initial_metadata_ready; + grpc_error* recv_initial_metadata_ready_error; grpc_closure* original_recv_initial_metadata_ready; grpc_metadata_batch* recv_initial_metadata; uint32_t* recv_initial_metadata_flags; @@ -60,6 +61,9 @@ struct call_data { grpc_closure recv_message_ready; grpc_core::OrphanablePtr* recv_message; bool seen_recv_message_ready; + + grpc_closure recv_trailing_metadata_ready; + grpc_closure* original_recv_trailing_metadata_ready; }; } // namespace @@ -267,6 +271,7 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) { calld->seen_recv_initial_metadata_ready = true; if (err == GRPC_ERROR_NONE) { err = hs_filter_incoming_metadata(elem, calld->recv_initial_metadata); + calld->recv_initial_metadata_ready_error = GRPC_ERROR_REF(err); if (calld->seen_recv_message_ready) { // We've already seen the recv_message callback, but we previously // deferred it, so we need to return it here. @@ -313,6 +318,23 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) { } } +static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) { + grpc_call_element* elem = static_cast(user_data); + call_data* calld = static_cast(elem->call_data); + if (calld->recv_initial_metadata_ready_error != GRPC_ERROR_NONE) { + if (err == GRPC_ERROR_NONE) { + err = GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error); + } else if (err != calld->recv_initial_metadata_ready_error) { + err = grpc_error_add_child(err, calld->recv_initial_metadata_ready_error); + } else { + err = GRPC_ERROR_REF(err); + } + } else { + err = GRPC_ERROR_REF(err); + } + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); +} + static grpc_error* hs_mutate_op(grpc_call_element* elem, grpc_transport_stream_op_batch* op) { /* grab pointers to our data from the call element */ @@ -357,6 +379,11 @@ static grpc_error* hs_mutate_op(grpc_call_element* elem, op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } + if (op->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready = + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + } + if (op->send_trailing_metadata) { grpc_error* error = hs_filter_outgoing_metadata( elem, op->payload->send_trailing_metadata.send_trailing_metadata); @@ -389,6 +416,9 @@ static grpc_error* hs_init_call_elem(grpc_call_element* elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&calld->recv_message_ready, hs_recv_message_ready, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + hs_recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } @@ -397,6 +427,7 @@ static void hs_destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = static_cast(elem->call_data); + GRPC_ERROR_UNREF(calld->recv_initial_metadata_ready_error); if (calld->have_read_stream) { calld->read_stream->Orphan(); } diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index c7fc3f2e62..deb5ae70ec 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -99,10 +99,15 @@ struct call_data { // recv_message_ready up-call on transport_stream_op, and remember to // call our next_recv_message_ready member after handling it. grpc_closure recv_message_ready; + grpc_closure recv_trailing_metadata_ready; + // The error caused by a message that is too large, or GRPC_ERROR_NONE + grpc_error* error; // Used by recv_message_ready. grpc_core::OrphanablePtr* recv_message; // Original recv_message_ready callback, invoked after our own. grpc_closure* next_recv_message_ready; + // Original recv_trailing_metadata callback, invoked after our own. + grpc_closure* next_recv_trailing_metadata_ready; }; struct channel_data { @@ -130,12 +135,13 @@ static void recv_message_ready(void* user_data, grpc_error* error) { grpc_error* new_error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_ERROR_UNREF(calld->error); if (error == GRPC_ERROR_NONE) { error = new_error; } else { error = grpc_error_add_child(error, new_error); - GRPC_ERROR_UNREF(new_error); } + calld->error = GRPC_ERROR_REF(error); gpr_free(message_string); } else { GRPC_ERROR_REF(error); @@ -144,6 +150,26 @@ static void recv_message_ready(void* user_data, grpc_error* error) { GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error); } +// Callback invoked on completion of recv_trailing_metadata +// Notifies the recv_trailing_metadata batch of any message size failures +static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { + grpc_call_element* elem = static_cast(user_data); + call_data* calld = static_cast(elem->call_data); + if (calld->error != GRPC_ERROR_NONE) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_REF(calld->error); + } else if (error != calld->error) { + error = grpc_error_add_child(error, GRPC_ERROR_REF(calld->error)); + } else { + error = GRPC_ERROR_REF(error); + } + } else { + error = GRPC_ERROR_REF(error); + } + // Invoke the next callback. + GRPC_CLOSURE_RUN(calld->next_recv_trailing_metadata_ready, error); +} + // Start transport stream op. static void start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* op) { @@ -172,6 +198,13 @@ static void start_transport_stream_op_batch( calld->recv_message = op->payload->recv_message.recv_message; op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } + // Inject callback for receiving trailing metadata. + if (op->recv_trailing_metadata) { + calld->next_recv_trailing_metadata_ready = + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; + } // Chain to the next filter. grpc_call_next_op(elem, op); } @@ -183,8 +216,13 @@ static grpc_error* init_call_elem(grpc_call_element* elem, call_data* calld = static_cast(elem->call_data); calld->call_combiner = args->call_combiner; calld->next_recv_message_ready = nullptr; + calld->next_recv_trailing_metadata_ready = nullptr; + calld->error = GRPC_ERROR_NONE; GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); // Get max sizes from channel data, then merge in per-method config values. // Note: Per-method config is only available on the client, so we // apply the max request size to the send limit and the max response @@ -213,7 +251,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem, // Destructor for call_data. static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = (call_data*)elem->call_data; + GRPC_ERROR_UNREF(calld->error); +} static int default_size(const grpc_channel_args* args, int without_minimal_stack) { -- cgit v1.2.3 From aa8043b7c8df2182e57cc561f5477fc3a35c6d95 Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Wed, 25 Jul 2018 10:59:06 -0700 Subject: changes --- src/core/lib/surface/call.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 43dd795ce7..1ee3667a58 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -512,7 +512,7 @@ static void destroy_call(void* call, grpc_error* error) { grpc_slice slice = grpc_empty_slice(); grpc_error_get_status(c->status_error, c->send_deadline, &c->final_info.final_status, &slice, nullptr, - c->final_info.error_string); + &c->final_info.error_string); GRPC_ERROR_UNREF(c->status_error); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); -- cgit v1.2.3 From 1482d3f25c75d1773fd3ab7320403f93d173fc7f Mon Sep 17 00:00:00 2001 From: kpayson64 Date: Fri, 27 Jul 2018 12:21:05 -0700 Subject: PR Feedback Changes --- .../ext/filters/http/server/http_server_filter.cc | 15 ++++--------- .../filters/message_size/message_size_filter.cc | 12 +---------- src/core/lib/iomgr/error.cc | 14 ++++++++++++ src/core/lib/iomgr/error.h | 7 ++++++ .../lib/security/transport/server_auth_filter.cc | 25 +++++++++++++++++++++- src/core/lib/surface/call.cc | 2 +- src/core/lib/surface/server.cc | 23 ++++++++++++++++++-- 7 files changed, 72 insertions(+), 26 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 01e5aa445e..c66f531a89 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -321,17 +321,8 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) { static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); - if (calld->recv_initial_metadata_ready_error != GRPC_ERROR_NONE) { - if (err == GRPC_ERROR_NONE) { - err = GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error); - } else if (err != calld->recv_initial_metadata_ready_error) { - err = grpc_error_add_child(err, calld->recv_initial_metadata_ready_error); - } else { - err = GRPC_ERROR_REF(err); - } - } else { - err = GRPC_ERROR_REF(err); - } + err = + grpc_error_maybe_add_child(err, calld->recv_initial_metadata_ready_error); GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); } @@ -382,6 +373,8 @@ static grpc_error* hs_mutate_op(grpc_call_element* elem, if (op->recv_trailing_metadata) { calld->original_recv_trailing_metadata_ready = op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; } if (op->send_trailing_metadata) { diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index deb5ae70ec..b5ca1be804 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -155,17 +155,7 @@ static void recv_message_ready(void* user_data, grpc_error* error) { static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); - if (calld->error != GRPC_ERROR_NONE) { - if (error == GRPC_ERROR_NONE) { - error = GRPC_ERROR_REF(calld->error); - } else if (error != calld->error) { - error = grpc_error_add_child(error, GRPC_ERROR_REF(calld->error)); - } else { - error = GRPC_ERROR_REF(error); - } - } else { - error = GRPC_ERROR_REF(error); - } + error = grpc_error_maybe_add_child(error, calld->error); // Invoke the next callback. GRPC_CLOSURE_RUN(calld->next_recv_trailing_metadata_ready, error); } diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 90ed34da11..226b44c46d 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -518,6 +518,20 @@ grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) { return new_err; } +grpc_error* grpc_error_maybe_add_child(grpc_error* src, grpc_error* child) { + if (src != GRPC_ERROR_NONE) { + if (child == GRPC_ERROR_NONE) { + return GRPC_ERROR_REF(src); + } else if (child != src) { + return grpc_error_add_child(src, GRPC_ERROR_REF(child)); + } else { + return GRPC_ERROR_REF(src); + } + } else { + return GRPC_ERROR_REF(child); + } +} + static const char* no_error_string = "\"No Error\""; static const char* oom_error_string = "\"Out of memory\""; static const char* cancelled_error_string = "\"Cancelled\""; diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 27c4d22fd1..4f65c447fd 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -187,6 +187,13 @@ bool grpc_error_get_str(grpc_error* error, grpc_error_strs which, /// child error. grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) GRPC_MUST_USE_RESULT; +/// Produce an error that is a combination of both src and child. +// If src or child, is GRPC_ERROR_NONE, a new reference to the other error is +// returned. Otherwise, a new error with src as the parent and child as the +// child is returned. +grpc_error* grpc_error_maybe_add_child(grpc_error* src, + grpc_error* child) GRPC_MUST_USE_RESULT; + grpc_error* grpc_os_error(const char* file, int line, int err, const char* call_name) GRPC_MUST_USE_RESULT; diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc index 2dbefdf131..7732b695b3 100644 --- a/src/core/lib/security/transport/server_auth_filter.cc +++ b/src/core/lib/security/transport/server_auth_filter.cc @@ -41,6 +41,9 @@ struct call_data { grpc_transport_stream_op_batch* recv_initial_metadata_batch; grpc_closure* original_recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready; + grpc_error* error; + grpc_closure recv_trailing_metadata_ready; + grpc_closure* original_recv_trailing_metadata_ready; grpc_metadata_array md; const grpc_metadata* consumed_md; size_t num_consumed_md; @@ -111,6 +114,7 @@ static void on_md_processing_done_inner(grpc_call_element* elem, batch->payload->recv_initial_metadata.recv_initial_metadata, remove_consumed_md, elem, "Response metadata filtering error"); } + calld->error = GRPC_ERROR_REF(error); GRPC_CLOSURE_SCHED(calld->original_recv_initial_metadata_ready, error); } @@ -186,6 +190,13 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { GRPC_ERROR_REF(error)); } +static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) { + grpc_call_element* elem = static_cast(user_data); + call_data* calld = static_cast(elem->call_data); + err = grpc_error_maybe_add_child(err, calld->error); + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); +} + static void auth_start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { call_data* calld = static_cast(elem->call_data); @@ -197,6 +208,12 @@ static void auth_start_transport_stream_op_batch( batch->payload->recv_initial_metadata.recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; } + if (batch->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; + } grpc_call_next_op(elem, batch); } @@ -210,6 +227,9 @@ static grpc_error* init_call_elem(grpc_call_element* elem, GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); // Create server security context. Set its auth context from channel // data and save it in the call context. grpc_server_security_context* server_ctx = @@ -229,7 +249,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem, /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = static_cast(elem->call_data); + GRPC_ERROR_UNREF(calld->error); +} /* Constructor for channel_data */ static grpc_error* init_channel_elem(grpc_channel_element* elem, diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 1ee3667a58..151cf6c852 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -512,7 +512,7 @@ static void destroy_call(void* call, grpc_error* error) { grpc_slice slice = grpc_empty_slice(); grpc_error_get_status(c->status_error, c->send_deadline, &c->final_info.final_status, &slice, nullptr, - &c->final_info.error_string); + &(c->final_info.error_string)); GRPC_ERROR_UNREF(c->status_error); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index cb34def740..4403caf044 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -149,6 +149,9 @@ struct call_data { grpc_closure server_on_recv_initial_metadata; grpc_closure kill_zombie_closure; grpc_closure* on_done_recv_initial_metadata; + grpc_closure recv_trailing_metadata_ready; + grpc_error* error; + grpc_closure* original_recv_trailing_metadata_ready; grpc_closure publish; @@ -730,6 +733,14 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) { GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error); } +static void server_recv_trailing_metadata_ready(void* user_data, + grpc_error* err) { + grpc_call_element* elem = static_cast(user_data); + call_data* calld = static_cast(elem->call_data); + err = grpc_error_maybe_add_child(err, calld->error); + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); +} + static void server_mutate_op(grpc_call_element* elem, grpc_transport_stream_op_batch* op) { call_data* calld = static_cast(elem->call_data); @@ -745,6 +756,12 @@ static void server_mutate_op(grpc_call_element* elem, op->payload->recv_initial_metadata.recv_flags = &calld->recv_initial_metadata_flags; } + if (op->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready = + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready; + } } static void server_start_transport_stream_op_batch( @@ -828,7 +845,9 @@ static grpc_error* init_call_elem(grpc_call_element* elem, GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata, server_on_recv_initial_metadata, elem, grpc_schedule_on_exec_ctx); - + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + server_recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); server_ref(chand->server); return GRPC_ERROR_NONE; } @@ -840,7 +859,7 @@ static void destroy_call_elem(grpc_call_element* elem, call_data* calld = static_cast(elem->call_data); GPR_ASSERT(calld->state != PENDING); - + GRPC_ERROR_UNREF(calld->error); if (calld->host_set) { grpc_slice_unref_internal(calld->host); } -- cgit v1.2.3 From 85ff14dd16524b408bd30eb76d790be126aebb90 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 21 Aug 2018 10:11:53 -0700 Subject: Modify existing error child manipulation --- src/core/ext/filters/http/client/http_client_filter.cc | 9 ++------- src/core/ext/filters/http/server/http_server_filter.cc | 5 +++-- src/core/ext/filters/message_size/message_size_filter.cc | 3 ++- src/core/lib/iomgr/error.cc | 16 ++++++---------- src/core/lib/iomgr/error.h | 6 ------ src/core/lib/security/transport/server_auth_filter.cc | 2 +- src/core/lib/surface/call.cc | 4 +--- src/core/lib/surface/server.cc | 2 +- 8 files changed, 16 insertions(+), 31 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 04ac4ac947..f44dc032a7 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -164,13 +164,8 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { } else { GRPC_ERROR_REF(error); } - if (calld->recv_initial_metadata_error != GRPC_ERROR_NONE) { - if (error == GRPC_ERROR_NONE) { - error = GRPC_ERROR_REF(calld->recv_initial_metadata_error); - } else if (error != calld->recv_initial_metadata_error) { - error = grpc_error_add_child(error, calld->recv_initial_metadata_error); - } - } + error = grpc_error_add_child( + error, GRPC_ERROR_REF(calld->recv_initial_metadata_error)); GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); } diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index c66f531a89..926afeec84 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -321,8 +321,9 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) { static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); - err = - grpc_error_maybe_add_child(err, calld->recv_initial_metadata_ready_error); + err = grpc_error_add_child( + GRPC_ERROR_REF(err), + GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error)); GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); } diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index b5ca1be804..a5f5f8e2ff 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -155,7 +155,8 @@ static void recv_message_ready(void* user_data, grpc_error* error) { static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); - error = grpc_error_maybe_add_child(error, calld->error); + error = + grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error)); // Invoke the next callback. GRPC_CLOSURE_RUN(calld->next_recv_trailing_metadata_ready, error); } diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 226b44c46d..dfd063a695 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -513,22 +513,18 @@ bool grpc_error_get_str(grpc_error* err, grpc_error_strs which, grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) { GPR_TIMER_SCOPE("grpc_error_add_child", 0); - grpc_error* new_err = copy_error_and_unref(src); - internal_add_error(&new_err, child); - return new_err; -} - -grpc_error* grpc_error_maybe_add_child(grpc_error* src, grpc_error* child) { if (src != GRPC_ERROR_NONE) { if (child == GRPC_ERROR_NONE) { - return GRPC_ERROR_REF(src); + return src; } else if (child != src) { - return grpc_error_add_child(src, GRPC_ERROR_REF(child)); + grpc_error* new_err = copy_error_and_unref(src); + internal_add_error(&new_err, child); + return new_err; } else { - return GRPC_ERROR_REF(src); + return src; } } else { - return GRPC_ERROR_REF(child); + return child; } } diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 4f65c447fd..e5369695c9 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -187,12 +187,6 @@ bool grpc_error_get_str(grpc_error* error, grpc_error_strs which, /// child error. grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) GRPC_MUST_USE_RESULT; -/// Produce an error that is a combination of both src and child. -// If src or child, is GRPC_ERROR_NONE, a new reference to the other error is -// returned. Otherwise, a new error with src as the parent and child as the -// child is returned. -grpc_error* grpc_error_maybe_add_child(grpc_error* src, - grpc_error* child) GRPC_MUST_USE_RESULT; grpc_error* grpc_os_error(const char* file, int line, int err, const char* call_name) GRPC_MUST_USE_RESULT; diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc index 5f2ad261d2..552e70130a 100644 --- a/src/core/lib/security/transport/server_auth_filter.cc +++ b/src/core/lib/security/transport/server_auth_filter.cc @@ -191,7 +191,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); - err = grpc_error_maybe_add_child(err, calld->error); + err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error)); GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); } diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 56c4562952..d2c14571d3 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -686,6 +686,7 @@ static void cancel_with_status(grpc_call* c, grpc_status_code status, } static void set_final_status(grpc_call* call, grpc_error* error) { + gpr_log(GPR_INFO, "set final status"); if (grpc_call_error_trace.enabled()) { gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR"); gpr_log(GPR_DEBUG, "%s", grpc_error_string(error)); @@ -1650,9 +1651,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, grpc_slice_ref_internal( *op->data.send_status_from_server.status_details)); call->send_extra_metadata_count++; - char* msg = grpc_slice_to_c_string( - GRPC_MDVALUE(call->send_extra_metadata[1].md)); - gpr_free(msg); } grpc_error* status_error = op->data.send_status_from_server.status == GRPC_STATUS_OK diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 4403caf044..521825ca69 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -737,7 +737,7 @@ static void server_recv_trailing_metadata_ready(void* user_data, grpc_error* err) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); - err = grpc_error_maybe_add_child(err, calld->error); + err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error)); GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); } -- cgit v1.2.3 From 97ceb5962cc5317b262aa60b8863c7df7799f640 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 21 Aug 2018 10:42:22 -0700 Subject: Remove unwanted logs and also add status details to status error --- src/core/lib/surface/call.cc | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index d2c14571d3..d0f418220b 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -686,7 +686,6 @@ static void cancel_with_status(grpc_call* c, grpc_status_code status, } static void set_final_status(grpc_call* call, grpc_error* error) { - gpr_log(GPR_INFO, "set final status"); if (grpc_call_error_trace.enabled()) { gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR"); gpr_log(GPR_DEBUG, "%s", grpc_error_string(error)); @@ -1645,13 +1644,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->send_extra_metadata_count = 1; call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( call->channel, op->data.send_status_from_server.status); - if (op->data.send_status_from_server.status_details != nullptr) { - call->send_extra_metadata[1].md = grpc_mdelem_from_slices( - GRPC_MDSTR_GRPC_MESSAGE, - grpc_slice_ref_internal( - *op->data.send_status_from_server.status_details)); - call->send_extra_metadata_count++; - } grpc_error* status_error = op->data.send_status_from_server.status == GRPC_STATUS_OK ? GRPC_ERROR_NONE @@ -1661,6 +1653,22 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, GRPC_ERROR_INT_GRPC_STATUS, static_cast( op->data.send_status_from_server.status)); + if (op->data.send_status_from_server.status_details != nullptr) { + call->send_extra_metadata[1].md = grpc_mdelem_from_slices( + GRPC_MDSTR_GRPC_MESSAGE, + grpc_slice_ref_internal( + *op->data.send_status_from_server.status_details)); + call->send_extra_metadata_count++; + if (status_error != GRPC_ERROR_NONE) { + char* msg = grpc_slice_to_c_string( + GRPC_MDVALUE(call->send_extra_metadata[1].md)); + status_error = + grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_slice_from_copied_string(msg)); + gpr_free(msg); + } + } + call->status_error = status_error; if (!prepare_application_metadata( call, -- cgit v1.2.3 From 1cfd81a604699d6ab8095c90e7da5fc588bb3048 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 21 Aug 2018 16:25:40 -0700 Subject: Explain the newer semantics of grpc_error_add_child --- src/core/lib/iomgr/error.cc | 5 +++++ src/core/lib/iomgr/error.h | 7 +++++++ test/core/iomgr/error_test.cc | 11 ----------- 3 files changed, 12 insertions(+), 11 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index dfd063a695..13bc69ffb6 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -515,15 +515,20 @@ grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) { GPR_TIMER_SCOPE("grpc_error_add_child", 0); if (src != GRPC_ERROR_NONE) { if (child == GRPC_ERROR_NONE) { + /* \a child is empty. Simply return the ref to \a src */ return src; } else if (child != src) { grpc_error* new_err = copy_error_and_unref(src); internal_add_error(&new_err, child); return new_err; } else { + /* \a src and \a child are the same. Drop one of the references and return + * the other */ + GRPC_ERROR_UNREF(child); return src; } } else { + /* \a src is empty. Simply return the ref to \a child */ return child; } } diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index e5369695c9..49f4029bc2 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -185,6 +185,13 @@ bool grpc_error_get_str(grpc_error* error, grpc_error_strs which, /// error occurring. Allows root causing high level errors from lower level /// errors that contributed to them. The src error takes ownership of the /// child error. +/// +/// Edge Conditions - +/// 1) If either of \a src or \a child is GRPC_ERROR_NONE, returns a reference +/// to the other argument. 2) If both \a src and \a child are GRPC_ERROR_NONE, +/// returns GRPC_ERROR_NONE. 3) If \a src and \a child point to the same error, +/// returns a single reference. (Note that, 2 references should have been +/// received to the error in this case.) grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) GRPC_MUST_USE_RESULT; diff --git a/test/core/iomgr/error_test.cc b/test/core/iomgr/error_test.cc index a1628a1f71..d78a8c2af3 100644 --- a/test/core/iomgr/error_test.cc +++ b/test/core/iomgr/error_test.cc @@ -187,16 +187,6 @@ static void test_os_error() { GRPC_ERROR_UNREF(error); } -static void test_special() { - grpc_error* error = GRPC_ERROR_NONE; - error = grpc_error_add_child( - error, GRPC_ERROR_CREATE_FROM_STATIC_STRING("test child")); - intptr_t i; - GPR_ASSERT(grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &i)); - GPR_ASSERT(i == GRPC_STATUS_OK); - GRPC_ERROR_UNREF(error); -} - static void test_overflow() { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Overflow"); @@ -235,7 +225,6 @@ int main(int argc, char** argv) { test_os_error(); test_create_referencing(); test_create_referencing_many(); - test_special(); test_overflow(); grpc_shutdown(); -- cgit v1.2.3 From 622f1a893466b1cef9e73a9616a5493c90d6ba01 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 21 Aug 2018 17:23:45 -0700 Subject: Reviewer comment --- src/core/ext/filters/message_size/message_size_filter.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index a5f5f8e2ff..c17df86f3d 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -107,7 +107,7 @@ struct call_data { // Original recv_message_ready callback, invoked after our own. grpc_closure* next_recv_message_ready; // Original recv_trailing_metadata callback, invoked after our own. - grpc_closure* next_recv_trailing_metadata_ready; + grpc_closure* original_recv_trailing_metadata_ready; }; struct channel_data { @@ -158,7 +158,7 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { error = grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error)); // Invoke the next callback. - GRPC_CLOSURE_RUN(calld->next_recv_trailing_metadata_ready, error); + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); } // Start transport stream op. @@ -191,7 +191,7 @@ static void start_transport_stream_op_batch( } // Inject callback for receiving trailing metadata. if (op->recv_trailing_metadata) { - calld->next_recv_trailing_metadata_ready = + calld->original_recv_trailing_metadata_ready = op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = &calld->recv_trailing_metadata_ready; @@ -207,7 +207,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem, call_data* calld = static_cast(elem->call_data); calld->call_combiner = args->call_combiner; calld->next_recv_message_ready = nullptr; - calld->next_recv_trailing_metadata_ready = nullptr; + calld->original_recv_trailing_metadata_ready = nullptr; calld->error = GRPC_ERROR_NONE; GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); -- cgit v1.2.3 From 986fa0ed58661e13b71e04fd9c6a62f9d84493f3 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 21 Aug 2018 18:55:05 -0700 Subject: reviewer comments --- src/core/lib/surface/call.cc | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index d0f418220b..fb3161aeaa 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -693,12 +693,10 @@ static void set_final_status(grpc_call* call, grpc_error* error) { grpc_core::channelz::ChannelNode* channelz_channel = grpc_channel_get_channelz_node(call->channel); if (call->is_client) { - const char** error_string = call->final_op.client.error_string; - grpc_status_code code; grpc_slice slice = grpc_empty_slice(); - grpc_error_get_status(error, call->send_deadline, &code, &slice, nullptr, - error_string); - *call->final_op.client.status = code; + grpc_error_get_status(error, call->send_deadline, + call->final_op.client.status, &slice, nullptr, + call->final_op.client.error_string); *call->final_op.client.status_details = grpc_slice_ref_internal(slice); call->status_error = error; if (channelz_channel != nullptr) { -- cgit v1.2.3 From 497532027dca770da60ac23e44f60ecf619cc2c5 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 23 Aug 2018 14:00:49 -0700 Subject: Reviewer comments --- src/core/lib/surface/call.cc | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index fb3161aeaa..0b86a74340 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -510,9 +510,8 @@ static void destroy_call(void* call, grpc_error* error) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } - grpc_slice slice = grpc_empty_slice(); grpc_error_get_status(c->status_error, c->send_deadline, - &c->final_info.final_status, &slice, nullptr, + &c->final_info.final_status, nullptr, nullptr, &(c->final_info.error_string)); GRPC_ERROR_UNREF(c->status_error); c->final_info.stats.latency = @@ -690,8 +689,6 @@ static void set_final_status(grpc_call* call, grpc_error* error) { gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR"); gpr_log(GPR_DEBUG, "%s", grpc_error_string(error)); } - grpc_core::channelz::ChannelNode* channelz_channel = - grpc_channel_get_channelz_node(call->channel); if (call->is_client) { grpc_slice slice = grpc_empty_slice(); grpc_error_get_status(error, call->send_deadline, @@ -699,6 +696,8 @@ static void set_final_status(grpc_call* call, grpc_error* error) { call->final_op.client.error_string); *call->final_op.client.status_details = grpc_slice_ref_internal(slice); call->status_error = error; + grpc_core::channelz::ChannelNode* channelz_channel = + grpc_channel_get_channelz_node(call->channel); if (channelz_channel != nullptr) { if (*call->final_op.client.status != GRPC_STATUS_OK) { channelz_channel->RecordCallFailed(); @@ -709,13 +708,14 @@ static void set_final_status(grpc_call* call, grpc_error* error) { } else { *call->final_op.server.cancelled = error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE; - if (channelz_channel != nullptr) { + /* TODO(ncteisen) : Update channelz handling for server + if (channelz_channel != nullptr) { if (*call->final_op.server.cancelled) { channelz_channel->RecordCallFailed(); } else { channelz_channel->RecordCallSucceeded(); } - } + } */ GRPC_ERROR_UNREF(error); } } @@ -992,7 +992,7 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b, grpc_error* batch_error) { grpc_call* call = static_cast(args); if (batch_error != GRPC_ERROR_NONE) { - set_final_status(call, GRPC_ERROR_REF(batch_error)); + set_final_status(call, batch_error); } else if (b->idx.named.grpc_status != nullptr) { grpc_status_code status_code = grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md); @@ -1025,7 +1025,6 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN)); } publish_app_metadata(call, b, true); - GRPC_ERROR_UNREF(batch_error); } gpr_arena* grpc_call_get_arena(grpc_call* call) { return call->arena; } -- cgit v1.2.3 From d60a4b0f6b0d0d92944385bc9b629bb12aea24af Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 24 Aug 2018 09:44:37 -0700 Subject: Reviewer comments --- src/core/lib/surface/call.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 0b86a74340..e568ff3db4 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -690,11 +690,12 @@ static void set_final_status(grpc_call* call, grpc_error* error) { gpr_log(GPR_DEBUG, "%s", grpc_error_string(error)); } if (call->is_client) { - grpc_slice slice = grpc_empty_slice(); grpc_error_get_status(error, call->send_deadline, - call->final_op.client.status, &slice, nullptr, + call->final_op.client.status, + call->final_op.client.status_details, nullptr, call->final_op.client.error_string); - *call->final_op.client.status_details = grpc_slice_ref_internal(slice); + // explicitly take a ref + grpc_slice_ref_internal(*call->final_op.client.status_details); call->status_error = error; grpc_core::channelz::ChannelNode* channelz_channel = grpc_channel_get_channelz_node(call->channel); -- cgit v1.2.3 From 2529cfe30a54503246b94a3f0e4d7c3971edf8c3 Mon Sep 17 00:00:00 2001 From: Hope Casey-Allen Date: Sun, 26 Aug 2018 19:04:01 -0700 Subject: Light refactoring of some transport code --- .../transport/chttp2/transport/chttp2_transport.cc | 465 +++++++++++---------- src/core/ext/transport/chttp2/transport/internal.h | 2 + 2 files changed, 244 insertions(+), 223 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 027a57d606..f269c252c6 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -230,35 +230,165 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); } static const grpc_transport_vtable* get_vtable(void); -static void init_transport(grpc_chttp2_transport* t, - const grpc_channel_args* channel_args, - grpc_endpoint* ep, bool is_client) { +/* Returns whether bdp is enabled */ +static bool read_channel_args(grpc_chttp2_transport* t, + const grpc_channel_args* channel_args, + bool is_client) { + bool enable_bdp = true; size_t i; int j; - GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == - GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); - - t->base.vtable = get_vtable(); - t->ep = ep; - /* one ref is for destroy */ - gpr_ref_init(&t->refs, 1); - t->combiner = grpc_combiner_create(); - t->peer_string = grpc_endpoint_get_peer(ep); - t->endpoint_reading = 1; - t->next_stream_id = is_client ? 1 : 2; - t->is_client = is_client; - t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; - t->is_first_frame = true; - grpc_connectivity_state_init( - &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, - is_client ? "client_transport" : "server_transport"); - - grpc_slice_buffer_init(&t->qbuf); - - grpc_slice_buffer_init(&t->outbuf); - grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); + for (i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) { + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { + if ((t->next_stream_id & 1) != (value & 1)) { + gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", + GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1, + is_client ? "client" : "server"); + } else { + t->next_stream_id = static_cast(value); + } + } + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) { + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { + grpc_chttp2_hpack_compressor_set_max_usable_size( + &t->hpack_compressor, static_cast(value)); + } + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) { + t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer( + &channel_args->args[i], + {g_default_max_pings_without_data, 0, INT_MAX}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MAX_PING_STRIKES)) { + t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer( + &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX}); + } else if (0 == + strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) { + t->ping_policy.min_sent_ping_interval_without_data = + grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{ + g_default_min_sent_ping_interval_without_data_ms, 0, + INT_MAX}); + } else if (0 == + strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) { + t->ping_policy.min_recv_ping_interval_without_data = + grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{ + g_default_min_recv_ping_interval_without_data_ms, 0, + INT_MAX}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { + t->write_buffer_size = static_cast(grpc_channel_arg_get_integer( + &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE})); + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { + enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true); + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{t->is_client + ? g_default_client_keepalive_time_ms + : g_default_server_keepalive_time_ms, + 1, INT_MAX}); + t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{t->is_client + ? g_default_client_keepalive_timeout_ms + : g_default_server_keepalive_timeout_ms, + 0, INT_MAX}); + t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { + t->keepalive_permit_without_calls = static_cast( + grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1})); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_OPTIMIZATION_TARGET)) { + if (channel_args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "%s should be a string", + GRPC_ARG_OPTIMIZATION_TARGET); + } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == + strcmp(channel_args->args[i].value.string, "throughput")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT; + } else { + gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'", + GRPC_ARG_OPTIMIZATION_TARGET, + channel_args->args[i].value.string); + } + } else { + static const struct { + const char* channel_arg_name; + grpc_chttp2_setting_id setting_id; + grpc_integer_options integer_options; + bool availability[2] /* server, client */; + } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS, + GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + {-1, 0, INT32_MAX}, + {true, false}}, + {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, + GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, + {-1, 0, INT32_MAX}, + {true, true}}, + {GRPC_ARG_MAX_METADATA_SIZE, + GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + {-1, 0, INT32_MAX}, + {true, true}}, + {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, + GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + {-1, 16384, 16777215}, + {true, true}}, + {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY, + GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, + {1, 0, 1}, + {true, true}}, + {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + {-1, 5, INT32_MAX}, + {true, true}}}; + for (j = 0; j < static_cast GPR_ARRAY_SIZE(settings_map); j++) { + if (0 == strcmp(channel_args->args[i].key, + settings_map[j].channel_arg_name)) { + if (!settings_map[j].availability[is_client]) { + gpr_log(GPR_DEBUG, "%s is not available on %s", + settings_map[j].channel_arg_name, + is_client ? "clients" : "servers"); + } else { + int value = grpc_channel_arg_get_integer( + &channel_args->args[i], settings_map[j].integer_options); + if (value >= 0) { + queue_setting_update(t, settings_map[j].setting_id, + static_cast(value)); + } + } + break; + } + } + } + } + return enable_bdp; +} +static void init_transport_closures(grpc_chttp2_transport* t) { GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, @@ -286,6 +416,79 @@ static void init_transport(grpc_chttp2_transport* t, GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, keepalive_watchdog_fired_locked, t, grpc_combiner_scheduler(t->combiner)); +} + +static void init_transport_keepalive_settings(grpc_chttp2_transport* t) { + if (t->is_client) { + t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_client_keepalive_time_ms; + t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_client_keepalive_timeout_ms; + t->keepalive_permit_without_calls = + g_default_client_keepalive_permit_without_calls; + } else { + t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_server_keepalive_time_ms; + t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_server_keepalive_timeout_ms; + t->keepalive_permit_without_calls = + g_default_server_keepalive_permit_without_calls; + } +} + +static void configure_transport_ping_policy(grpc_chttp2_transport* t) { + t->ping_policy.max_pings_without_data = g_default_max_pings_without_data; + t->ping_policy.min_sent_ping_interval_without_data = + g_default_min_sent_ping_interval_without_data_ms; + t->ping_policy.max_ping_strikes = g_default_max_ping_strikes; + t->ping_policy.min_recv_ping_interval_without_data = + g_default_min_recv_ping_interval_without_data_ms; +} + +static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) { + if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init(&t->keepalive_ping_timer, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + &t->init_keepalive_ping_locked); + } else { + /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no + inflight keeaplive timers */ + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; + } +} + +static void init_transport(grpc_chttp2_transport* t, + const grpc_channel_args* channel_args, + grpc_endpoint* ep, bool is_client) { + GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == + GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); + + t->base.vtable = get_vtable(); + t->ep = ep; + /* one ref is for destroy */ + gpr_ref_init(&t->refs, 1); + t->combiner = grpc_combiner_create(); + t->peer_string = grpc_endpoint_get_peer(ep); + t->endpoint_reading = 1; + t->next_stream_id = is_client ? 1 : 2; + t->is_client = is_client; + t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; + t->is_first_frame = true; + grpc_connectivity_state_init( + &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, + is_client ? "client_transport" : "server_transport"); + + grpc_slice_buffer_init(&t->qbuf); + grpc_slice_buffer_init(&t->outbuf); + grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); + + init_transport_closures(t); t->goaway_error = GRPC_ERROR_NONE; grpc_chttp2_goaway_parser_init(&t->goaway_parser); @@ -301,6 +504,8 @@ static void init_transport(grpc_chttp2_transport* t, grpc_chttp2_stream_map_init(&t->stream_map, 8); /* copy in initial settings to all setting sets */ + size_t i; + int j; for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) { for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) { t->settings[j][i] = grpc_chttp2_settings_parameters[i].default_value; @@ -328,191 +533,14 @@ static void init_transport(grpc_chttp2_transport* t, queue_setting_update(t, GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1); - t->ping_policy.max_pings_without_data = g_default_max_pings_without_data; - t->ping_policy.min_sent_ping_interval_without_data = - g_default_min_sent_ping_interval_without_data_ms; - t->ping_policy.max_ping_strikes = g_default_max_ping_strikes; - t->ping_policy.min_recv_ping_interval_without_data = - g_default_min_recv_ping_interval_without_data_ms; - - /* Keepalive setting */ - if (t->is_client) { - t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX - ? GRPC_MILLIS_INF_FUTURE - : g_default_client_keepalive_time_ms; - t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX - ? GRPC_MILLIS_INF_FUTURE - : g_default_client_keepalive_timeout_ms; - t->keepalive_permit_without_calls = - g_default_client_keepalive_permit_without_calls; - } else { - t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX - ? GRPC_MILLIS_INF_FUTURE - : g_default_server_keepalive_time_ms; - t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX - ? GRPC_MILLIS_INF_FUTURE - : g_default_server_keepalive_timeout_ms; - t->keepalive_permit_without_calls = - g_default_server_keepalive_permit_without_calls; - } + configure_transport_ping_policy(t); + init_transport_keepalive_settings(t); t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; bool enable_bdp = true; - if (channel_args) { - for (i = 0; i < channel_args->num_args; i++) { - if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) { - const grpc_integer_options options = {-1, 0, INT_MAX}; - const int value = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - if (value >= 0) { - if ((t->next_stream_id & 1) != (value & 1)) { - gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", - GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, - t->next_stream_id & 1, is_client ? "client" : "server"); - } else { - t->next_stream_id = static_cast(value); - } - } - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) { - const grpc_integer_options options = {-1, 0, INT_MAX}; - const int value = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - if (value >= 0) { - grpc_chttp2_hpack_compressor_set_max_usable_size( - &t->hpack_compressor, static_cast(value)); - } - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) { - t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer( - &channel_args->args[i], - {g_default_max_pings_without_data, 0, INT_MAX}); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_MAX_PING_STRIKES)) { - t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer( - &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX}); - } else if (0 == - strcmp( - channel_args->args[i].key, - GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) { - t->ping_policy.min_sent_ping_interval_without_data = - grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{ - g_default_min_sent_ping_interval_without_data_ms, 0, - INT_MAX}); - } else if (0 == - strcmp( - channel_args->args[i].key, - GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) { - t->ping_policy.min_recv_ping_interval_without_data = - grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{ - g_default_min_recv_ping_interval_without_data_ms, 0, - INT_MAX}); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { - t->write_buffer_size = - static_cast(grpc_channel_arg_get_integer( - &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE})); - } else if (0 == - strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { - enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_KEEPALIVE_TIME_MS)) { - const int value = grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{t->is_client - ? g_default_client_keepalive_time_ms - : g_default_server_keepalive_time_ms, - 1, INT_MAX}); - t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { - const int value = grpc_channel_arg_get_integer( - &channel_args->args[i], - grpc_integer_options{t->is_client - ? g_default_client_keepalive_timeout_ms - : g_default_server_keepalive_timeout_ms, - 0, INT_MAX}); - t->keepalive_timeout = - value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { - t->keepalive_permit_without_calls = static_cast( - grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1})); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_OPTIMIZATION_TARGET)) { - if (channel_args->args[i].type != GRPC_ARG_STRING) { - gpr_log(GPR_ERROR, "%s should be a string", - GRPC_ARG_OPTIMIZATION_TARGET); - } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) { - t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; - } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) { - t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; - } else if (0 == - strcmp(channel_args->args[i].value.string, "throughput")) { - t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT; - } else { - gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'", - GRPC_ARG_OPTIMIZATION_TARGET, - channel_args->args[i].value.string); - } - } else { - static const struct { - const char* channel_arg_name; - grpc_chttp2_setting_id setting_id; - grpc_integer_options integer_options; - bool availability[2] /* server, client */; - } settings_map[] = { - {GRPC_ARG_MAX_CONCURRENT_STREAMS, - GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, - {-1, 0, INT32_MAX}, - {true, false}}, - {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, - GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, - {-1, 0, INT32_MAX}, - {true, true}}, - {GRPC_ARG_MAX_METADATA_SIZE, - GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, - {-1, 0, INT32_MAX}, - {true, true}}, - {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, - GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, - {-1, 16384, 16777215}, - {true, true}}, - {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY, - GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, - {1, 0, 1}, - {true, true}}, - {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES, - GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, - {-1, 5, INT32_MAX}, - {true, true}}}; - for (j = 0; j < static_cast GPR_ARRAY_SIZE(settings_map); j++) { - if (0 == strcmp(channel_args->args[i].key, - settings_map[j].channel_arg_name)) { - if (!settings_map[j].availability[is_client]) { - gpr_log(GPR_DEBUG, "%s is not available on %s", - settings_map[j].channel_arg_name, - is_client ? "clients" : "servers"); - } else { - int value = grpc_channel_arg_get_integer( - &channel_args->args[i], settings_map[j].integer_options); - if (value >= 0) { - queue_setting_update(t, settings_map[j].setting_id, - static_cast(value)); - } - } - break; - } - } - } - } + enable_bdp = read_channel_args(t, channel_args, is_client); } if (g_flow_control_enabled) { @@ -531,23 +559,11 @@ static void init_transport(grpc_chttp2_transport* t, t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; t->ping_recv_state.ping_strikes = 0; - /* Start keepalive pings */ - if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping_locked); - } else { - /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no - inflight keeaplive timers */ - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; - } + init_keepalive_pings_if_enabled(t); if (enable_bdp) { GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); schedule_bdp_ping_locked(t); - grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t, nullptr); } @@ -2887,17 +2903,20 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint, } } +void Chttp2IncomingByteStream::EnsureStreamDecompressionCtxExists() { + if (!stream_->stream_decompression_ctx) { + stream_->stream_decompression_ctx = grpc_stream_compression_context_create( + stream_->stream_decompression_method); + } +} + grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) { GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0); grpc_error* error; if (stream_->unprocessed_incoming_frames_buffer.length > 0) { if (!stream_->unprocessed_incoming_frames_decompressed) { bool end_of_context; - if (!stream_->stream_decompression_ctx) { - stream_->stream_decompression_ctx = - grpc_stream_compression_context_create( - stream_->stream_decompression_method); - } + EnsureStreamDecompressionCtxExists(); if (!grpc_stream_decompress(stream_->stream_decompression_ctx, &stream_->unprocessed_incoming_frames_buffer, &stream_->decompressed_data_buffer, nullptr, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index ca6e715978..0f66faec31 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -246,6 +246,8 @@ class Chttp2IncomingByteStream : public ByteStream { static void NextLocked(void* arg, grpc_error* error_ignored); static void OrphanLocked(void* arg, grpc_error* error_ignored); + void EnsureStreamDecompressionCtxExists(); + grpc_chttp2_transport* transport_; // Immutable. grpc_chttp2_stream* stream_; // Immutable. -- cgit v1.2.3 From e3e1840efb3c491fc823c80dd92f41a53aa39143 Mon Sep 17 00:00:00 2001 From: Moiz Haidry Date: Tue, 28 Aug 2018 14:36:20 -0700 Subject: Porting fix from CL- https://critique.corp.google.com/#review/210573936 --- src/core/ext/filters/max_age/max_age_filter.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc index 1fe8288bd0..07ed417c95 100644 --- a/src/core/ext/filters/max_age/max_age_filter.cc +++ b/src/core/ext/filters/max_age/max_age_filter.cc @@ -430,7 +430,7 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, : DEFAULT_MAX_CONNECTION_IDLE_MS; chand->idle_state = MAX_IDLE_STATE_INIT; gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis, - GRPC_MILLIS_INF_PAST); + GPR_ATM_MIN); for (size_t i = 0; i < args->channel_args->num_args; ++i) { if (0 == strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_CONNECTION_AGE_MS)) { -- cgit v1.2.3 From 12b9e0beb591c2718f950c044405c870828dac75 Mon Sep 17 00:00:00 2001 From: Moiz Haidry Date: Tue, 28 Aug 2018 15:44:39 -0700 Subject: Fixed formatting --- src/core/ext/filters/max_age/max_age_filter.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc index 07ed417c95..431472609e 100644 --- a/src/core/ext/filters/max_age/max_age_filter.cc +++ b/src/core/ext/filters/max_age/max_age_filter.cc @@ -429,8 +429,7 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, ? GRPC_MILLIS_INF_FUTURE : DEFAULT_MAX_CONNECTION_IDLE_MS; chand->idle_state = MAX_IDLE_STATE_INIT; - gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis, - GPR_ATM_MIN); + gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis, GPR_ATM_MIN); for (size_t i = 0; i < args->channel_args->num_args; ++i) { if (0 == strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_CONNECTION_AGE_MS)) { -- cgit v1.2.3 From f8a4aae1197e586d5c34f1acff450eac4d764c4b Mon Sep 17 00:00:00 2001 From: ncteisen Date: Wed, 1 Aug 2018 08:36:03 -0700 Subject: Fix all instances of bugprone-undefined-memory-manipulation --- .clang-tidy | 4 ++-- src/core/ext/filters/client_channel/client_channel.cc | 1 - src/core/lib/surface/channel.cc | 1 - src/core/tsi/alts_transport_security.cc | 4 +++- test/.clang-tidy | 6 ++++++ test/core/end2end/fixtures/http_proxy_fixture.cc | 18 ++++++++++++------ test/core/end2end/fixtures/proxy.cc | 17 +++++++++++++---- test/cpp/util/.clang-tidy | 6 ------ 8 files changed, 36 insertions(+), 21 deletions(-) create mode 100644 test/.clang-tidy delete mode 100644 test/cpp/util/.clang-tidy (limited to 'src/core') diff --git a/.clang-tidy b/.clang-tidy index fbf0b6541e..d217441792 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -1,6 +1,6 @@ --- -Checks: 'modernize-use-nullptr,google-build-namespaces,google-build-explicit-make-pair,readability-function-size,performance-*' -WarningsAsErrors: 'modernize-use-nullptr,google-build-namespaces,google-build-explicit-make-pair,readability-function-size,performance-*' +Checks: 'modernize-use-nullptr,google-build-namespaces,google-build-explicit-make-pair,readability-function-size,performance-*,bugprone-*' +WarningsAsErrors: 'modernize-use-nullptr,google-build-namespaces,google-build-explicit-make-pair,readability-function-size,performance-*,bugprone-*' CheckOptions: - key: readability-function-size.StatementThreshold value: '450' diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index b06f09d8c7..b2275441e5 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -457,7 +457,6 @@ get_service_config_from_resolver_result_locked(channel_data* chand) { grpc_uri* uri = grpc_uri_parse(server_uri, true); GPR_ASSERT(uri->path[0] != '\0'); service_config_parsing_state parsing_state; - memset(&parsing_state, 0, sizeof(parsing_state)); parsing_state.server_name = uri->path[0] == '/' ? uri->path + 1 : uri->path; service_config->ParseGlobalParams(parse_retry_throttle_params, diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 82635d3c21..e3dde51d69 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -100,7 +100,6 @@ grpc_channel* grpc_channel_create_with_builder( return channel; } - memset(channel, 0, sizeof(*channel)); channel->target = target; channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type); size_t channel_tracer_max_nodes = 0; // default to off diff --git a/src/core/tsi/alts_transport_security.cc b/src/core/tsi/alts_transport_security.cc index 2fd408103b..dac23bbf7a 100644 --- a/src/core/tsi/alts_transport_security.cc +++ b/src/core/tsi/alts_transport_security.cc @@ -45,7 +45,9 @@ void grpc_tsi_alts_signal_for_cq_destroy() { } void grpc_tsi_alts_init() { - memset(&g_alts_resource, 0, sizeof(alts_shared_resource)); + g_alts_resource.channel = nullptr; + g_alts_resource.cq = nullptr; + g_alts_resource.is_cq_drained = false; gpr_mu_init(&g_alts_resource.mu); gpr_cv_init(&g_alts_resource.cv); } diff --git a/test/.clang-tidy b/test/.clang-tidy new file mode 100644 index 0000000000..f951ee5fea --- /dev/null +++ b/test/.clang-tidy @@ -0,0 +1,6 @@ +--- +Checks: 'modernize-use-nullptr,google-build-namespaces,google-build-explicit-make-pair,readability-function-size,performance-*,-performance-unnecessary-copy-initialization' +WarningsAsErrors: 'modernize-use-nullptr,google-build-namespaces,google-build-explicit-make-pair,readability-function-size,performance-*,-performance-unnecessary-copy-initialization' +CheckOptions: + - key: readability-function-size.StatementThreshold + value: '450' diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index f02fa9d998..6f444bf66a 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -52,6 +52,16 @@ #include "test/core/util/port.h" struct grpc_end2end_http_proxy { + grpc_end2end_http_proxy() + : proxy_name(nullptr), + server(nullptr), + channel_args(nullptr), + mu(nullptr), + pollset(nullptr), + combiner(nullptr) { + gpr_ref_init(&users, 1); + combiner = grpc_combiner_create(); + } char* proxy_name; grpc_core::Thread thd; grpc_tcp_server* server; @@ -519,11 +529,7 @@ static void thread_main(void* arg) { grpc_end2end_http_proxy* grpc_end2end_http_proxy_create( grpc_channel_args* args) { grpc_core::ExecCtx exec_ctx; - grpc_end2end_http_proxy* proxy = - static_cast(gpr_malloc(sizeof(*proxy))); - memset(proxy, 0, sizeof(*proxy)); - proxy->combiner = grpc_combiner_create(); - gpr_ref_init(&proxy->users, 1); + grpc_end2end_http_proxy* proxy = grpc_core::New(); // Construct proxy address. const int proxy_port = grpc_pick_unused_port_or_die(); gpr_join_host_port(&proxy->proxy_name, "localhost", proxy_port); @@ -573,7 +579,7 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { GRPC_CLOSURE_CREATE(destroy_pollset, proxy->pollset, grpc_schedule_on_exec_ctx)); GRPC_COMBINER_UNREF(proxy->combiner, "test"); - gpr_free(proxy); + grpc_core::Delete(proxy); } const char* grpc_end2end_http_proxy_get_proxy_name( diff --git a/test/core/end2end/fixtures/proxy.cc b/test/core/end2end/fixtures/proxy.cc index 042c858b4c..869b6e846d 100644 --- a/test/core/end2end/fixtures/proxy.cc +++ b/test/core/end2end/fixtures/proxy.cc @@ -30,6 +30,17 @@ #include "test/core/util/port.h" struct grpc_end2end_proxy { + grpc_end2end_proxy() + : proxy_port(nullptr), + server_port(nullptr), + cq(nullptr), + server(nullptr), + client(nullptr), + shutdown(false), + new_call(nullptr) { + memset(&new_call_details, 0, sizeof(new_call_details)); + memset(&new_call_metadata, 0, sizeof(new_call_metadata)); + } grpc_core::Thread thd; char* proxy_port; char* server_port; @@ -79,9 +90,7 @@ grpc_end2end_proxy* grpc_end2end_proxy_create(const grpc_end2end_proxy_def* def, int proxy_port = grpc_pick_unused_port_or_die(); int server_port = grpc_pick_unused_port_or_die(); - grpc_end2end_proxy* proxy = - static_cast(gpr_malloc(sizeof(*proxy))); - memset(proxy, 0, sizeof(*proxy)); + grpc_end2end_proxy* proxy = grpc_core::New(); gpr_join_host_port(&proxy->proxy_port, "localhost", proxy_port); gpr_join_host_port(&proxy->server_port, "localhost", server_port); @@ -128,7 +137,7 @@ void grpc_end2end_proxy_destroy(grpc_end2end_proxy* proxy) { grpc_channel_destroy(proxy->client); grpc_completion_queue_destroy(proxy->cq); grpc_call_details_destroy(&proxy->new_call_details); - gpr_free(proxy); + grpc_core::Delete(proxy); } static void unrefpc(proxy_call* pc, const char* reason) { diff --git a/test/cpp/util/.clang-tidy b/test/cpp/util/.clang-tidy deleted file mode 100644 index f951ee5fea..0000000000 --- a/test/cpp/util/.clang-tidy +++ /dev/null @@ -1,6 +0,0 @@ ---- -Checks: 'modernize-use-nullptr,google-build-namespaces,google-build-explicit-make-pair,readability-function-size,performance-*,-performance-unnecessary-copy-initialization' -WarningsAsErrors: 'modernize-use-nullptr,google-build-namespaces,google-build-explicit-make-pair,readability-function-size,performance-*,-performance-unnecessary-copy-initialization' -CheckOptions: - - key: readability-function-size.StatementThreshold - value: '450' -- cgit v1.2.3 From d68e8b4f6267a1aa16bf5c8253f39b1be6f47188 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 31 Aug 2018 12:12:35 -0700 Subject: %s/state_op_done[OP_FAILED]/state_callback_received[OP_FAILED] --- src/core/ext/transport/cronet/transport/cronet_transport.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 4a252d972d..81e2634e3a 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -1287,7 +1287,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { grpc_error* error = GRPC_ERROR_NONE; if (stream_state->state_op_done[OP_CANCEL_ERROR]) { error = GRPC_ERROR_REF(stream_state->cancel_error); - } else if (stream_state->state_op_done[OP_FAILED]) { + } else if (stream_state->state_callback_received[OP_FAILED]) { error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."); } else if (oas->s->state.rs.trailing_metadata_valid) { grpc_chttp2_incoming_metadata_buffer_publish( -- cgit v1.2.3 From 0b59c106d7d225995c2adb59c5c945f7b5319613 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 31 Aug 2018 14:51:39 -0700 Subject: use grpc_slice_unref_internal inside grpc --- src/core/tsi/alts/handshaker/alts_handshaker_client.cc | 9 +++++---- .../tsi/alts/handshaker/alts_handshaker_service_api_util.cc | 4 +++- src/core/tsi/alts/handshaker/alts_tsi_event.cc | 6 ++++-- src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc | 11 ++++++----- src/core/tsi/alts/handshaker/alts_tsi_utils.cc | 4 +++- .../alts_grpc_privacy_integrity_record_protocol.cc | 4 ++-- src/core/tsi/ssl/session_cache/ssl_session_cache.cc | 3 ++- 7 files changed, 25 insertions(+), 16 deletions(-) (limited to 'src/core') diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc index b5268add0d..17e8026096 100644 --- a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc +++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc @@ -24,6 +24,7 @@ #include #include +#include "src/core/lib/slice/slice_internal.h" #include "src/core/tsi/alts/handshaker/alts_handshaker_service_api.h" const int kHandshakerClientOpNum = 4; @@ -109,7 +110,7 @@ static grpc_byte_buffer* get_serialized_start_client(alts_tsi_event* event) { if (ok) { buffer = grpc_raw_byte_buffer_create(&slice, 1 /* number of slices */); } - grpc_slice_unref(slice); + grpc_slice_unref_internal(slice); gpr_free(target_name); grpc_gcp_handshaker_req_destroy(req); return buffer; @@ -157,7 +158,7 @@ static grpc_byte_buffer* get_serialized_start_server( if (ok) { buffer = grpc_raw_byte_buffer_create(&req_slice, 1 /* number of slices */); } - grpc_slice_unref(req_slice); + grpc_slice_unref_internal(req_slice); grpc_gcp_handshaker_req_destroy(req); return buffer; } @@ -195,7 +196,7 @@ static grpc_byte_buffer* get_serialized_next(grpc_slice* bytes_received) { if (ok) { buffer = grpc_raw_byte_buffer_create(&req_slice, 1 /* number of slices */); } - grpc_slice_unref(req_slice); + grpc_slice_unref_internal(req_slice); grpc_gcp_handshaker_req_destroy(req); return buffer; } @@ -258,7 +259,7 @@ alts_handshaker_client* alts_grpc_handshaker_client_create( grpc_slice_from_static_string(ALTS_SERVICE_METHOD), &slice, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); client->base.vtable = &vtable; - grpc_slice_unref(slice); + grpc_slice_unref_internal(slice); return &client->base; } diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc b/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc index e0e4184686..d63d3538c5 100644 --- a/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc +++ b/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc @@ -20,6 +20,8 @@ #include "src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.h" +#include "src/core/lib/slice/slice_internal.h" + void add_repeated_field(repeated_field** head, const void* data) { repeated_field* field = static_cast(gpr_zalloc(sizeof(*field))); @@ -67,7 +69,7 @@ grpc_slice* create_slice(const char* data, size_t size) { void destroy_slice(grpc_slice* slice) { if (slice != nullptr) { - grpc_slice_unref(*slice); + grpc_slice_unref_internal(*slice); gpr_free(slice); } } diff --git a/src/core/tsi/alts/handshaker/alts_tsi_event.cc b/src/core/tsi/alts/handshaker/alts_tsi_event.cc index ec0bf12b95..cb36d5ebd1 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_event.cc +++ b/src/core/tsi/alts/handshaker/alts_tsi_event.cc @@ -24,6 +24,8 @@ #include #include +#include "src/core/lib/slice/slice_internal.h" + tsi_result alts_tsi_event_create(alts_tsi_handshaker* handshaker, tsi_handshaker_on_next_done_cb cb, void* user_data, @@ -66,8 +68,8 @@ void alts_tsi_event_destroy(alts_tsi_event* event) { grpc_byte_buffer_destroy(event->recv_buffer); grpc_metadata_array_destroy(&event->initial_metadata); grpc_metadata_array_destroy(&event->trailing_metadata); - grpc_slice_unref(event->details); - grpc_slice_unref(event->target_name); + grpc_slice_unref_internal(event->details); + grpc_slice_unref_internal(event->target_name); grpc_alts_credentials_options_destroy(event->options); gpr_free(event); } diff --git a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc index 1df1021bb1..34608a3de1 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc +++ b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc @@ -31,6 +31,7 @@ #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/tsi/alts/frame_protector/alts_frame_protector.h" #include "src/core/tsi/alts/handshaker/alts_handshaker_client.h" #include "src/core/tsi/alts/handshaker/alts_tsi_utils.h" @@ -182,7 +183,7 @@ static void handshaker_result_destroy(tsi_handshaker_result* self) { gpr_free(result->peer_identity); gpr_free(result->key_data); gpr_free(result->unused_bytes); - grpc_slice_unref(result->rpc_versions); + grpc_slice_unref_internal(result->rpc_versions); gpr_free(result); } @@ -269,12 +270,12 @@ static tsi_result handshaker_next( handshaker->has_sent_start_message = true; } else { if (!GRPC_SLICE_IS_EMPTY(handshaker->recv_bytes)) { - grpc_slice_unref(handshaker->recv_bytes); + grpc_slice_unref_internal(handshaker->recv_bytes); } handshaker->recv_bytes = grpc_slice_ref(slice); ok = alts_handshaker_client_next(handshaker->client, event, &slice); } - grpc_slice_unref(slice); + grpc_slice_unref_internal(slice); if (ok != TSI_OK) { gpr_log(GPR_ERROR, "Failed to schedule ALTS handshaker requests"); return ok; @@ -299,8 +300,8 @@ static void handshaker_destroy(tsi_handshaker* self) { alts_tsi_handshaker* handshaker = reinterpret_cast(self); alts_handshaker_client_destroy(handshaker->client); - grpc_slice_unref(handshaker->recv_bytes); - grpc_slice_unref(handshaker->target_name); + grpc_slice_unref_internal(handshaker->recv_bytes); + grpc_slice_unref_internal(handshaker->target_name); grpc_alts_credentials_options_destroy(handshaker->options); gpr_free(handshaker->buffer); gpr_free(handshaker); diff --git a/src/core/tsi/alts/handshaker/alts_tsi_utils.cc b/src/core/tsi/alts/handshaker/alts_tsi_utils.cc index d9b5e6c945..1747f1ad04 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_utils.cc +++ b/src/core/tsi/alts/handshaker/alts_tsi_utils.cc @@ -22,6 +22,8 @@ #include +#include "src/core/lib/slice/slice_internal.h" + tsi_result alts_tsi_utils_convert_to_tsi_result(grpc_status_code code) { switch (code) { case GRPC_STATUS_OK: @@ -47,7 +49,7 @@ grpc_gcp_handshaker_resp* alts_tsi_utils_deserialize_response( grpc_slice slice = grpc_byte_buffer_reader_readall(&bbr); grpc_gcp_handshaker_resp* resp = grpc_gcp_handshaker_resp_create(); bool ok = grpc_gcp_handshaker_resp_decode(slice, resp); - grpc_slice_unref(slice); + grpc_slice_unref_internal(slice); grpc_byte_buffer_reader_destroy(&bbr); if (!ok) { grpc_gcp_handshaker_resp_destroy(resp); diff --git a/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc b/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc index d4fd88d1e2..e7890903d5 100644 --- a/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc +++ b/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc @@ -61,7 +61,7 @@ static tsi_result alts_grpc_privacy_integrity_protect( if (status != GRPC_STATUS_OK) { gpr_log(GPR_ERROR, "Failed to protect, %s", error_details); gpr_free(error_details); - grpc_slice_unref(protected_slice); + grpc_slice_unref_internal(protected_slice); return TSI_INTERNAL_ERROR; } grpc_slice_buffer_add(protected_slices, protected_slice); @@ -106,7 +106,7 @@ static tsi_result alts_grpc_privacy_integrity_unprotect( if (status != GRPC_STATUS_OK) { gpr_log(GPR_ERROR, "Failed to unprotect, %s", error_details); gpr_free(error_details); - grpc_slice_unref(unprotected_slice); + grpc_slice_unref_internal(unprotected_slice); return TSI_INTERNAL_ERROR; } grpc_slice_buffer_reset_and_unref_internal(&rp->header_sb); diff --git a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc index ce74fde343..f9184bcc34 100644 --- a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc +++ b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc @@ -19,6 +19,7 @@ #include #include "src/core/lib/gprpp/mutex_lock.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/tsi/ssl/session_cache/ssl_session.h" #include "src/core/tsi/ssl/session_cache/ssl_session_cache.h" @@ -53,7 +54,7 @@ class SslSessionLRUCache::Node { SetSession(std::move(session)); } - ~Node() { grpc_slice_unref(key_); } + ~Node() { grpc_slice_unref_internal(key_); } // Not copyable nor movable. Node(const Node&) = delete; -- cgit v1.2.3 From 85721e22b40323c767cdcaf0f1be6c12e1755d9d Mon Sep 17 00:00:00 2001 From: Hope Casey-Allen Date: Fri, 31 Aug 2018 12:54:55 -0700 Subject: Change method name for consistency --- src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 4 ++-- src/core/ext/transport/chttp2/transport/internal.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index f269c252c6..26cad2cc9a 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -2903,7 +2903,7 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint, } } -void Chttp2IncomingByteStream::EnsureStreamDecompressionCtxExists() { +void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() { if (!stream_->stream_decompression_ctx) { stream_->stream_decompression_ctx = grpc_stream_compression_context_create( stream_->stream_decompression_method); @@ -2916,7 +2916,7 @@ grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) { if (stream_->unprocessed_incoming_frames_buffer.length > 0) { if (!stream_->unprocessed_incoming_frames_decompressed) { bool end_of_context; - EnsureStreamDecompressionCtxExists(); + MaybeCreateStreamDecompressionCtx(); if (!grpc_stream_decompress(stream_->stream_decompression_ctx, &stream_->unprocessed_incoming_frames_buffer, &stream_->decompressed_data_buffer, nullptr, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 0f66faec31..6b5309bab4 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -246,7 +246,7 @@ class Chttp2IncomingByteStream : public ByteStream { static void NextLocked(void* arg, grpc_error* error_ignored); static void OrphanLocked(void* arg, grpc_error* error_ignored); - void EnsureStreamDecompressionCtxExists(); + void MaybeCreateStreamDecompressionCtx(); grpc_chttp2_transport* transport_; // Immutable. grpc_chttp2_stream* stream_; // Immutable. -- cgit v1.2.3 From e361d0f6910db66e081d125e45e58b96e22d3e11 Mon Sep 17 00:00:00 2001 From: jiangtaoli2016 Date: Fri, 31 Aug 2018 11:30:28 -0700 Subject: Turn loading system root certificate as default --- .../lib/security/security_connector/security_connector.cc | 14 +++++++------- test/core/security/linux_system_roots_test.cc | 6 ------ test/core/security/security_connector_test.cc | 1 + 3 files changed, 8 insertions(+), 13 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/security/security_connector/security_connector.cc b/src/core/lib/security/security_connector/security_connector.cc index 04b4c87c71..6246613e7b 100644 --- a/src/core/lib/security/security_connector/security_connector.cc +++ b/src/core/lib/security/security_connector/security_connector.cc @@ -59,8 +59,8 @@ static const char* installed_roots_path = /** Environment variable used as a flag to enable/disable loading system root certificates from the OS trust store. */ -#ifndef GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR -#define GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR "GRPC_USE_SYSTEM_SSL_ROOTS" +#ifndef GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR +#define GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR "GRPC_NOT_USE_SYSTEM_SSL_ROOTS" #endif #ifndef TSI_OPENSSL_ALPN_SUPPORT @@ -1192,10 +1192,10 @@ const char* DefaultSslRootStore::GetPemRootCerts() { grpc_slice DefaultSslRootStore::ComputePemRootCerts() { grpc_slice result = grpc_empty_slice(); - char* use_system_roots_env_value = - gpr_getenv(GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR); - const bool use_system_roots = gpr_is_true(use_system_roots_env_value); - gpr_free(use_system_roots_env_value); + char* not_use_system_roots_env_value = + gpr_getenv(GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR); + const bool not_use_system_roots = gpr_is_true(not_use_system_roots_env_value); + gpr_free(not_use_system_roots_env_value); // First try to load the roots from the environment. char* default_root_certs_path = gpr_getenv(GRPC_DEFAULT_SSL_ROOTS_FILE_PATH_ENV_VAR); @@ -1218,7 +1218,7 @@ grpc_slice DefaultSslRootStore::ComputePemRootCerts() { gpr_free(pem_root_certs); } // Try loading roots from OS trust store if flag is enabled. - if (GRPC_SLICE_IS_EMPTY(result) && use_system_roots) { + if (GRPC_SLICE_IS_EMPTY(result) && !not_use_system_roots) { result = LoadSystemRootCerts(); } // Fallback to roots manually shipped with gRPC. diff --git a/test/core/security/linux_system_roots_test.cc b/test/core/security/linux_system_roots_test.cc index fce9c8dcc5..24d446de35 100644 --- a/test/core/security/linux_system_roots_test.cc +++ b/test/core/security/linux_system_roots_test.cc @@ -41,10 +41,6 @@ #include "gtest/gtest.h" -#ifndef GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR -#define GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR "GRPC_USE_SYSTEM_SSL_ROOTS" -#endif - namespace grpc { namespace { @@ -68,7 +64,6 @@ TEST(CreateRootCertsBundleTest, ReturnsEmpty) { } TEST(CreateRootCertsBundleTest, BundlesCorrectly) { - gpr_setenv(GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR, "true"); // Test that CreateRootCertsBundle returns a correct slice. grpc_slice roots_bundle = grpc_empty_slice(); GRPC_LOG_IF_ERROR( @@ -81,7 +76,6 @@ TEST(CreateRootCertsBundleTest, BundlesCorrectly) { char* bundle_str = grpc_slice_to_c_string(roots_bundle); EXPECT_STREQ(result_str, bundle_str); // Clean up. - unsetenv(GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR); gpr_free(result_str); gpr_free(bundle_str); grpc_slice_unref(roots_bundle); diff --git a/test/core/security/security_connector_test.cc b/test/core/security/security_connector_test.cc index 82d77eef8b..9dd37b975b 100644 --- a/test/core/security/security_connector_test.cc +++ b/test/core/security/security_connector_test.cc @@ -415,6 +415,7 @@ static void test_default_ssl_roots(void) { /* Now setup a permanent failure for the overridden roots and we should get an empty slice. */ + gpr_setenv("GRPC_NOT_USE_SYSTEM_SSL_ROOTS", "true"); grpc_set_ssl_roots_override_callback(override_roots_permanent_failure); roots = grpc_core::TestDefaultSslRootStore::ComputePemRootCertsForTesting(); GPR_ASSERT(GRPC_SLICE_IS_EMPTY(roots)); -- cgit v1.2.3 From dd95194a086b81966fd94726c04f53d279e247d8 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 4 Sep 2018 16:26:14 -0700 Subject: Prefer grpc status over http status test --- src/core/ext/transport/chttp2/transport/parsing.cc | 2 ++ src/core/lib/surface/call.cc | 4 ++-- test/core/end2end/tests/filter_status_code.cc | 20 +++++++++++++++++++- 3 files changed, 23 insertions(+), 3 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 1e491d2ef8..205fb8c370 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -393,6 +393,7 @@ error_handler: static void free_timeout(void* p) { gpr_free(p); } static void on_initial_header(void* tp, grpc_mdelem md) { + gpr_log(GPR_INFO, "on initial header"); GPR_TIMER_SCOPE("on_initial_header", 0); grpc_chttp2_transport* t = static_cast(tp); @@ -475,6 +476,7 @@ static void on_initial_header(void* tp, grpc_mdelem md) { } static void on_trailing_header(void* tp, grpc_mdelem md) { + gpr_log(GPR_INFO, "on_trailing_header"); GPR_TIMER_SCOPE("on_trailing_header", 0); grpc_chttp2_transport* t = static_cast(tp); diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index b07c4d6c10..496de1150a 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -685,10 +685,10 @@ static void cancel_with_status(grpc_call* c, grpc_status_code status, } static void set_final_status(grpc_call* call, grpc_error* error) { - if (grpc_call_error_trace.enabled()) { + //if (grpc_call_error_trace.enabled()) { gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR"); gpr_log(GPR_DEBUG, "%s", grpc_error_string(error)); - } + //} if (call->is_client) { grpc_error_get_status(error, call->send_deadline, call->final_op.client.status, diff --git a/test/core/end2end/tests/filter_status_code.cc b/test/core/end2end/tests/filter_status_code.cc index ba3cbfa6d1..447ff520ee 100644 --- a/test/core/end2end/tests/filter_status_code.cc +++ b/test/core/end2end/tests/filter_status_code.cc @@ -249,6 +249,24 @@ typedef struct final_status_data { grpc_call_stack* call; } final_status_data; +static void start_transport_stream_op_batch(grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { + auto* data = static_cast(elem->call_data); + if(data->call == g_server_call_stack) { + gpr_log(GPR_INFO, "here"); + } + if(op->send_initial_metadata) { + auto *batch = op->payload->send_initial_metadata.send_initial_metadata; + gpr_log(GPR_INFO, "init %p %p", batch->idx.named.status, batch->idx.named.grpc_status); + grpc_metadata_batch_substitute(batch, batch->idx.named.status, GRPC_MDELEM_STATUS_404); + } + if(op->send_trailing_metadata) { + auto *batch = op->payload->send_trailing_metadata.send_trailing_metadata; + gpr_log(GPR_INFO, "trai %p %p", batch->idx.named.status, batch->idx.named.grpc_status); + } + grpc_call_next_op(elem, op); +} + static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { final_status_data* data = static_cast(elem->call_data); @@ -307,7 +325,7 @@ static const grpc_channel_filter test_client_filter = { "client_filter_status_code"}; static const grpc_channel_filter test_server_filter = { - grpc_call_next_op, + start_transport_stream_op_batch, grpc_channel_next_op, sizeof(final_status_data), init_call_elem, -- cgit v1.2.3 From 3a41245e465e176dc2cae642cf701f5b476188b6 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 4 Sep 2018 19:18:15 -0700 Subject: Rectify the condition and add a test --- .../ext/filters/http/client/http_client_filter.cc | 7 ++++- src/core/ext/transport/chttp2/transport/parsing.cc | 2 -- src/core/lib/surface/call.cc | 4 +-- test/core/end2end/tests/filter_status_code.cc | 34 +++++++++++++--------- 4 files changed, 28 insertions(+), 19 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index f44dc032a7..91fa163fec 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -79,7 +79,12 @@ struct channel_data { static grpc_error* client_filter_incoming_metadata(grpc_call_element* elem, grpc_metadata_batch* b) { if (b->idx.named.status != nullptr) { - if (grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) { + /* If both gRPC status and HTTP status are provided in the response, we + * should prefer the gRPC status code, as mentioned in + * https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. + */ + if (b->idx.named.grpc_status != nullptr || + grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) { grpc_metadata_batch_remove(b, b->idx.named.status); } else { char* val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.status->md), diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 205fb8c370..1e491d2ef8 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -393,7 +393,6 @@ error_handler: static void free_timeout(void* p) { gpr_free(p); } static void on_initial_header(void* tp, grpc_mdelem md) { - gpr_log(GPR_INFO, "on initial header"); GPR_TIMER_SCOPE("on_initial_header", 0); grpc_chttp2_transport* t = static_cast(tp); @@ -476,7 +475,6 @@ static void on_initial_header(void* tp, grpc_mdelem md) { } static void on_trailing_header(void* tp, grpc_mdelem md) { - gpr_log(GPR_INFO, "on_trailing_header"); GPR_TIMER_SCOPE("on_trailing_header", 0); grpc_chttp2_transport* t = static_cast(tp); diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 496de1150a..b07c4d6c10 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -685,10 +685,10 @@ static void cancel_with_status(grpc_call* c, grpc_status_code status, } static void set_final_status(grpc_call* call, grpc_error* error) { - //if (grpc_call_error_trace.enabled()) { + if (grpc_call_error_trace.enabled()) { gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR"); gpr_log(GPR_DEBUG, "%s", grpc_error_string(error)); - //} + } if (call->is_client) { grpc_error_get_status(error, call->send_deadline, call->final_op.client.status, diff --git a/test/core/end2end/tests/filter_status_code.cc b/test/core/end2end/tests/filter_status_code.cc index 447ff520ee..5ffc3d00a3 100644 --- a/test/core/end2end/tests/filter_status_code.cc +++ b/test/core/end2end/tests/filter_status_code.cc @@ -16,6 +16,14 @@ * */ +/* This test verifies - + * 1) grpc_call_final_info passed to the filters on destroying a call contains + * the proper status. + * 2) If the response has both an HTTP status code and a gRPC status code, then + * we should prefer the gRPC status code as mentioned in + * https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md + */ + #include "test/core/end2end/end2end_tests.h" #include @@ -249,20 +257,18 @@ typedef struct final_status_data { grpc_call_stack* call; } final_status_data; -static void start_transport_stream_op_batch(grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { +static void server_start_transport_stream_op_batch( + grpc_call_element* elem, grpc_transport_stream_op_batch* op) { auto* data = static_cast(elem->call_data); - if(data->call == g_server_call_stack) { - gpr_log(GPR_INFO, "here"); - } - if(op->send_initial_metadata) { - auto *batch = op->payload->send_initial_metadata.send_initial_metadata; - gpr_log(GPR_INFO, "init %p %p", batch->idx.named.status, batch->idx.named.grpc_status); - grpc_metadata_batch_substitute(batch, batch->idx.named.status, GRPC_MDELEM_STATUS_404); - } - if(op->send_trailing_metadata) { - auto *batch = op->payload->send_trailing_metadata.send_trailing_metadata; - gpr_log(GPR_INFO, "trai %p %p", batch->idx.named.status, batch->idx.named.grpc_status); + if (data->call == g_server_call_stack) { + if (op->send_initial_metadata) { + auto* batch = op->payload->send_initial_metadata.send_initial_metadata; + if (batch->idx.named.status != nullptr) { + /* Replace the HTTP status with 404 */ + grpc_metadata_batch_substitute(batch, batch->idx.named.status, + GRPC_MDELEM_STATUS_404); + } + } } grpc_call_next_op(elem, op); } @@ -325,7 +331,7 @@ static const grpc_channel_filter test_client_filter = { "client_filter_status_code"}; static const grpc_channel_filter test_server_filter = { - start_transport_stream_op_batch, + server_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(final_status_data), init_call_elem, -- cgit v1.2.3 From be8844bcdb704cff6a70507f5093e4bb26320ea3 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Fri, 7 Sep 2018 15:44:59 -0700 Subject: reviewer feedback --- .../ext/filters/client_channel/client_channel.cc | 133 ++++++++++----------- src/core/lib/channel/channelz.h | 7 +- src/core/lib/channel/connected_channel.cc | 6 +- src/core/lib/surface/call.cc | 8 +- test/core/channel/channel_trace_test.cc | 4 +- test/core/channel/channelz_test.cc | 4 +- 6 files changed, 78 insertions(+), 84 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index d015ceb335..000cf82c6c 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -937,7 +937,7 @@ typedef struct client_channel_call_data { // state needed to support channelz interception of recv trailing metadata. grpc_closure recv_trailing_metadata_ready_channelz; grpc_closure* original_recv_trailing_metadata; - grpc_transport_stream_op_batch* recv_trailing_metadata_batch; + grpc_metadata_batch* recv_trailing_metadata_batch; grpc_polling_entity* pollent; bool pollent_added_to_interested_parties; @@ -1000,14 +1000,8 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem); static void on_complete(void* arg, grpc_error* error); static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); static void start_pick_locked(void* arg, grpc_error* ignored); -template -static pending_batch* pending_batch_find(grpc_call_element* elem, - const char* log_message, - Predicate predicate); -static void get_call_status(grpc_call_element* elem, - grpc_metadata_batch* md_batch, grpc_error* error, - grpc_status_code* status, - grpc_mdelem** server_pushback_md); +static void maybe_intercept_metadata_for_channelz( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch); // // send op data caching @@ -1282,66 +1276,6 @@ static void resume_pending_batch_in_call_combiner(void* arg, grpc_subchannel_call_process_op(subchannel_call, batch); } -static void recv_trailing_metadata_ready_channelz(void* arg, - grpc_error* error) { - grpc_call_element* elem = static_cast(arg); - channel_data* chand = static_cast(elem->channel_data); - call_data* calld = static_cast(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, " - "error=%s", - chand, calld, grpc_error_string(error)); - } - GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr); - grpc_status_code status = GRPC_STATUS_OK; - grpc_metadata_batch* md_batch = - calld->recv_trailing_metadata_batch->payload->recv_trailing_metadata - .recv_trailing_metadata; - get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); - grpc_core::channelz::SubchannelNode* channelz_subchannel = - calld->pick.connected_subchannel->channelz_subchannel(); - GPR_ASSERT(channelz_subchannel != nullptr); - if (status == GRPC_STATUS_OK) { - channelz_subchannel->RecordCallSucceeded(); - } else { - channelz_subchannel->RecordCallFailed(); - } - calld->recv_trailing_metadata_batch = nullptr; - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error); -} - -// If channelz is enabled, intercept recv_trailing so that we may check the -// status and associate it to a subchannel. -// Returns true if callback was intercepted, false otherwise. -static void maybe_intercept_recv_trailing_for_channelz( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { - call_data* calld = static_cast(elem->call_data); - // only intercept payloads with recv trailing. - if (!batch->recv_trailing_metadata) { - return; - } - // only add interceptor is channelz is enabled. - if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) { - return; - } - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "calld=%p batch=%p: intercepting recv trailing for channelz", calld, - batch); - } - GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, - recv_trailing_metadata_ready_channelz, elem, - grpc_schedule_on_exec_ctx); - // save some state needed for the interception callback. - GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr); - calld->recv_trailing_metadata_batch = batch; - calld->original_recv_trailing_metadata = - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &calld->recv_trailing_metadata_ready_channelz; -} - // This is called via the call combiner, so access to calld is synchronized. static void pending_batches_resume(grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); @@ -1366,7 +1300,7 @@ static void pending_batches_resume(grpc_call_element* elem) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { - maybe_intercept_recv_trailing_for_channelz(elem, batch); + maybe_intercept_metadata_for_channelz(elem, batch); batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, resume_pending_batch_in_call_combiner, batch, @@ -2736,6 +2670,65 @@ static void pick_done(void* arg, grpc_error* error) { } } +static void recv_trailing_metadata_ready_channelz(void* arg, + grpc_error* error) { + grpc_call_element* elem = static_cast(arg); + channel_data* chand = static_cast(elem->channel_data); + call_data* calld = static_cast(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, " + "error=%s", + chand, calld, grpc_error_string(error)); + } + GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr); + grpc_status_code status = GRPC_STATUS_OK; + grpc_metadata_batch* md_batch = calld->recv_trailing_metadata_batch; + get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + calld->pick.connected_subchannel->channelz_subchannel(); + GPR_ASSERT(channelz_subchannel != nullptr); + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + calld->recv_trailing_metadata_batch = nullptr; + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error); +} + +// If channelz is enabled, intercept recv_trailing so that we may check the +// status and associate it to a subchannel. +// Returns true if callback was intercepted, false otherwise. +static void maybe_intercept_metadata_for_channelz( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + call_data* calld = static_cast(elem->call_data); + // only intercept payloads with recv trailing. + if (!batch->recv_trailing_metadata) { + return; + } + // only add interceptor is channelz is enabled. + if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) { + return; + } + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "calld=%p batch=%p: intercepting recv trailing for channelz", calld, + batch); + } + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, + recv_trailing_metadata_ready_channelz, elem, + grpc_schedule_on_exec_ctx); + // save some state needed for the interception callback. + GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr); + calld->recv_trailing_metadata_batch = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + calld->original_recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->recv_trailing_metadata_ready_channelz; +} + static void maybe_add_call_to_channel_interested_parties_locked( grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index bd2735929c..db5d05140d 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -79,7 +79,7 @@ class BaseNode : public RefCounted { const intptr_t uuid_; }; -// This class is a helper class for channelz entities that deal with Channels +// This class is a helper class for channelz entities that deal with Channels, // Subchannels, and Servers, since those have similar proto definitions. // This class has the ability to: // - track calls_{started,succeeded,failed} @@ -133,6 +133,9 @@ class ChannelNode : public BaseNode { // so it leaves these implementations blank. // // This is utilizing the template method design pattern. + // + // TODO(ncteisen): remove these template methods in favor of manual traversal + // and mutation of the grpc_json object. virtual void PopulateConnectivityState(grpc_json* json) {} virtual void PopulateChildRefs(grpc_json* json) {} @@ -158,7 +161,7 @@ class ChannelNode : public BaseNode { void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); } private: - // to allow the channel trace test to access trace(); + // to allow the channel trace test to access trace_. friend class testing::ChannelNodePeer; grpc_channel* channel_ = nullptr; UniquePtr target_; diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index 90a0254663..4a4f0e49d0 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -104,18 +104,18 @@ static void con_start_transport_stream_op_batch( if (batch->recv_initial_metadata) { callback_state* state = &calld->recv_initial_metadata_ready; intercept_callback( - calld, state, false, "connected_recv_initial_metadata_ready", + calld, state, false, "recv_initial_metadata_ready", &batch->payload->recv_initial_metadata.recv_initial_metadata_ready); } if (batch->recv_message) { callback_state* state = &calld->recv_message_ready; - intercept_callback(calld, state, false, "connected_recv_message_ready", + intercept_callback(calld, state, false, "recv_message_ready", &batch->payload->recv_message.recv_message_ready); } if (batch->recv_trailing_metadata) { callback_state* state = &calld->recv_trailing_metadata_ready; intercept_callback( - calld, state, false, "connected_recv_trailing_metadata_ready", + calld, state, false, "recv_trailing_metadata_ready", &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready); } if (batch->cancel_stream) { diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 3d69db4f83..eb7e67233b 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1425,7 +1425,7 @@ static void receiving_stream_ready_in_call_combiner(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, "call_recv_message_ready"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready"); receiving_stream_ready(bctlp, error); } @@ -1510,8 +1510,7 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, - "call_recv_initial_metadata_ready"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready"); add_batch_error(bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { @@ -1562,8 +1561,7 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast(bctlp); grpc_call* call = bctl->call; - GRPC_CALL_COMBINER_STOP(&call->call_combiner, - "call_recv_trailing_metadata_ready"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); add_batch_error(bctl, GRPC_ERROR_REF(error), false); grpc_metadata_batch* md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; diff --git a/test/core/channel/channel_trace_test.cc b/test/core/channel/channel_trace_test.cc index e33277753b..d594445bfb 100644 --- a/test/core/channel/channel_trace_test.cc +++ b/test/core/channel/channel_trace_test.cc @@ -44,8 +44,8 @@ namespace testing { // testing peer to access channel internals class ChannelNodePeer { public: - ChannelNodePeer(ChannelNode* node) : node_(node) {} - ChannelTrace* trace() { return &node_->trace_; } + explicit ChannelNodePeer(ChannelNode* node) : node_(node) {} + ChannelTrace* trace() const { return &node_->trace_; } private: ChannelNode* node_; diff --git a/test/core/channel/channelz_test.cc b/test/core/channel/channelz_test.cc index 8fa46a18da..09189fa36d 100644 --- a/test/core/channel/channelz_test.cc +++ b/test/core/channel/channelz_test.cc @@ -46,8 +46,8 @@ namespace testing { // testing peer to access channel internals class CallCountingHelperPeer { public: - CallCountingHelperPeer(CallCountingHelper* node) : node_(node) {} - grpc_millis last_call_started_millis() { + explicit CallCountingHelperPeer(CallCountingHelper* node) : node_(node) {} + grpc_millis last_call_started_millis() const { return (grpc_millis)gpr_atm_no_barrier_load( &node_->last_call_started_millis_); } -- cgit v1.2.3